diff --git a/Cargo.lock b/Cargo.lock index 301a89331dc..63f5a28aea9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8209,6 +8209,7 @@ name = "spacetimedb-fs-utils" version = "2.1.0" dependencies = [ "anyhow", + "chrono", "fs2", "hex", "rand 0.9.2", diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 226c2700d08..4ba103979ec 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -82,7 +82,7 @@ pub enum DatabaseError { } impl From for DatabaseError { - fn from(LockError { path, source }: LockError) -> Self { + fn from(LockError { path, source, .. }: LockError) -> Self { Self::DatabasedOpened(path, source.into()) } } diff --git a/crates/fs-utils/Cargo.toml b/crates/fs-utils/Cargo.toml index 56ebd4f8c32..3b69c08b104 100644 --- a/crates/fs-utils/Cargo.toml +++ b/crates/fs-utils/Cargo.toml @@ -8,6 +8,7 @@ description = "Assorted utilities for filesystem operations used in SpacetimeDB" [dependencies] anyhow.workspace = true +chrono = { workspace = true, features = ["alloc", "std"] } fs2.workspace = true hex.workspace = true rand.workspace = true diff --git a/crates/fs-utils/src/lockfile.rs b/crates/fs-utils/src/lockfile.rs index a874be6e151..cd1e058bd3f 100644 --- a/crates/fs-utils/src/lockfile.rs +++ b/crates/fs-utils/src/lockfile.rs @@ -91,24 +91,41 @@ impl Drop for Lockfile { } pub mod advisory { + use chrono::{DateTime, Utc}; use std::{ + error::Error as StdError, fmt, - fs::{self, File}, - io, + fs::File, + io::{self, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, + process, + time::SystemTime, }; - use fs2::FileExt as _; - use thiserror::Error; - use crate::create_parent_dir; + use fs2::FileExt as _; - #[derive(Debug, Error)] - #[error("failed to lock {}", path.display())] + #[derive(Debug)] pub struct LockError { pub path: PathBuf, - #[source] pub source: io::Error, + pub existing_contents: Option, + } + + impl fmt::Display for LockError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "failed to lock {}", self.path.display())?; + if let Some(contents) = &self.existing_contents { + write!(f, " (existing contents: {:?})", contents)?; + } + Ok(()) + } + } + + impl StdError for LockError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + Some(&self.source) + } } /// A file locked with an exclusive, filesystem-level lock. @@ -139,30 +156,94 @@ pub mod advisory { /// created. pub fn lock(path: impl AsRef) -> Result { let path = path.as_ref(); - Self::lock_inner(path).map_err(|source| LockError { - path: path.into(), - source, - }) + Self::lock_inner(path) + } + + /// Replace the lock file contents with `metadata` while holding the lock. + pub fn write_metadata(&mut self, metadata: impl AsRef<[u8]>) -> io::Result<()> { + self.lock.set_len(0)?; + self.lock.seek(SeekFrom::Start(0))?; + self.lock.write_all(metadata.as_ref())?; + self.lock.sync_data()?; + Ok(()) } - fn lock_inner(path: &Path) -> io::Result { - create_parent_dir(path)?; - let lock = File::create(path)?; + fn lock_inner(path: &Path) -> Result { + create_parent_dir(path).map_err(|source| LockError { + path: path.to_path_buf(), + source, + existing_contents: None, + })?; + // This will create the file if it doesn't already exist. + let mut lock = File::options() + .read(true) + .write(true) + .create(true) + .open(path) + .map_err(|source| LockError { + path: path.to_path_buf(), + source, + existing_contents: None, + })?; // TODO: Use `File::lock` (available since rust 1.89) instead? - lock.try_lock_exclusive()?; + if let Err(source) = lock.try_lock_exclusive() { + let existing_contents = if source.kind() == io::ErrorKind::WouldBlock { + Self::read_existing_contents(&mut lock).ok().flatten() + } else { + None + }; + return Err(LockError { + path: path.to_path_buf(), + source, + existing_contents, + }); + } + // Now that we own the lock, clear any content that may have been written by a previous holder. + lock.set_len(0).map_err(|source| LockError { + path: path.to_path_buf(), + source, + existing_contents: None, + })?; + lock.seek(SeekFrom::Start(0)).map_err(|source| LockError { + path: path.to_path_buf(), + source, + existing_contents: None, + })?; - Ok(Self { + let mut locked = Self { path: path.to_path_buf(), lock, - }) + }; + // Write the default metadata. + locked + .write_metadata(Self::default_metadata()) + .map_err(|source| LockError { + path: path.to_path_buf(), + source, + existing_contents: None, + })?; + + Ok(locked) } - /// Release the lock and optionally remove the locked file. - pub fn release(self, remove: bool) -> io::Result<()> { - if remove { - fs::remove_file(&self.path)?; + fn read_existing_contents(lock: &mut File) -> io::Result> { + lock.seek(SeekFrom::Start(0))?; + let mut bytes = Vec::new(); + lock.read_to_end(&mut bytes)?; + if bytes.is_empty() { + return Ok(None); } - Ok(()) + Ok(Some(String::from_utf8_lossy(&bytes).into_owned())) + } + + // Default contents of a lockfile, which has the pid and timestamp. + fn default_metadata() -> String { + let timestamp_ms = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + let timestamp = DateTime::::from_timestamp_millis(timestamp_ms).unwrap_or(DateTime::::UNIX_EPOCH); + format!("pid={};timestamp_utc={}", process::id(), timestamp.to_rfc3339()) } } @@ -172,3 +253,79 @@ pub mod advisory { } } } + +#[cfg(test)] +mod tests { + use std::{fs, io::ErrorKind}; + + use tempdir::TempDir; + + use super::advisory::LockedFile; + + #[test] + fn lockedfile_can_create_a_file() { + let tmp = TempDir::new("lockfile_test").unwrap(); + let path = tmp.path().join("db.lock"); + let _lock1 = LockedFile::lock(&path).unwrap(); + assert!(path.exists()); + let contents = fs::read_to_string(&path).unwrap(); + assert!(contents.contains(&format!("pid={}", std::process::id()))); + assert!(contents.contains("timestamp_utc=")); + } + + #[test] + fn lockedfile_can_create_a_directory_file() { + let tmp = TempDir::new("lockfile_test").unwrap(); + let path = tmp.path().join("new_dir").join("db.lock"); + let _lock1 = LockedFile::lock(&path).unwrap(); + assert!(path.exists()); + } + + #[test] + fn only_one_exclusive_lock_can_be_held() { + let tmp = TempDir::new("lockfile_test").unwrap(); + let path = tmp.path().join("db.lock"); + let _lock1 = LockedFile::lock(&path).unwrap(); + + assert!(LockedFile::lock(&path).is_err()); + } + + #[test] + fn lockedfile_can_handle_existing_file() { + let tmp = TempDir::new("locked_file_test").unwrap(); + let path = tmp.path().join("db.lock"); + let original = b"existing lock metadata"; + fs::write(&path, original).unwrap(); + + let _lock = LockedFile::lock(&path).unwrap(); + + // Previous metadata should be replaced when we acquire the lock. + let contents = fs::read_to_string(&path).unwrap(); + assert!(contents.contains(&format!("pid={}", std::process::id()))); + assert!(contents.contains("timestamp_utc=")); + } + + #[test] + fn lockedfile_can_store_metadata() { + let tmp = TempDir::new("locked_file_test").unwrap(); + let path = tmp.path().join("db.lock"); + let mut lock = LockedFile::lock(&path).unwrap(); + + lock.write_metadata("pid=1234").unwrap(); + + assert_eq!(fs::read_to_string(&path).unwrap(), "pid=1234"); + } + + #[test] + fn lock_error_includes_existing_contents_when_already_locked() { + let tmp = TempDir::new("locked_file_test").unwrap(); + let path = tmp.path().join("db.lock"); + let mut lock = LockedFile::lock(&path).unwrap(); + lock.write_metadata("pid=1234").unwrap(); + + let err = LockedFile::lock(&path).unwrap_err(); + assert_eq!(err.source.kind(), ErrorKind::WouldBlock); + assert_eq!(err.existing_contents.as_deref(), Some("pid=1234")); + assert!(err.to_string().contains("pid=1234")); + } +}