diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index ec34fa8a88d..5a515fbab22 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -13,9 +13,13 @@ edition = "2021" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[features] +tokio = ["dep:tokio"] + [dependencies] bitcoin = "0.32.2" lightning = { version = "0.2.0", path = "../lightning" } +tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] } [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 5fac0cce617..917551c21cf 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -8,9 +8,18 @@ use std::collections::HashMap; use std::fs; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +#[cfg(feature = "tokio")] +use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +#[cfg(feature = "tokio")] +use core::future::Future; +#[cfg(feature = "tokio")] +use core::pin::Pin; +#[cfg(feature = "tokio")] +use lightning::util::persist::KVStore; + #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -30,19 +39,29 @@ fn path_to_windows_str>(path: &T) -> Vec { path.as_ref().encode_wide().chain(Some(0)).collect() } -// The number of read/write/remove/list operations after which we clean up our `locks` HashMap. -const GC_LOCK_INTERVAL: usize = 25; - // The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; -/// A [`KVStoreSync`] implementation that writes to and reads from the file system. -pub struct FilesystemStore { +struct FilesystemStoreInner { data_dir: PathBuf, tmp_file_counter: AtomicUsize, - gc_counter: AtomicUsize, - locks: Mutex>>>, + + // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the + // latest written version per key. + locks: Mutex>>>, +} + +/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. +/// +/// [`KVStore`]: lightning::util::persist::KVStore +pub struct FilesystemStore { + inner: Arc, + + // Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove + // operations aren't sensitive to the order of execution. + #[cfg(feature = "tokio")] + version_counter: AtomicU64, } impl FilesystemStore { @@ -50,27 +69,46 @@ impl FilesystemStore { pub fn new(data_dir: PathBuf) -> Self { let locks = Mutex::new(HashMap::new()); let tmp_file_counter = AtomicUsize::new(0); - let gc_counter = AtomicUsize::new(1); - Self { data_dir, tmp_file_counter, gc_counter, locks } + Self { + inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }), + #[cfg(feature = "tokio")] + version_counter: AtomicU64::new(0), + } } /// Returns the data directory. pub fn get_data_dir(&self) -> PathBuf { - self.data_dir.clone() + self.inner.data_dir.clone() + } +} + +impl KVStoreSync for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + self.inner.read(primary_namespace, secondary_namespace, key) } - fn garbage_collect_locks(&self) { - let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel); + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Result<(), lightning::io::Error> { + self.inner.write_version(primary_namespace, secondary_namespace, key, buf, None) + } - if gc_counter % GC_LOCK_INTERVAL == 0 { - // Take outer lock for the cleanup. - let mut outer_lock = self.locks.lock().unwrap(); + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + self.inner.remove(primary_namespace, secondary_namespace, key, lazy) + } - // Garbage collect all lock entries that are not referenced anymore. - outer_lock.retain(|_, v| Arc::strong_count(&v) > 1); - } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + self.inner.list(primary_namespace, secondary_namespace) } +} +impl FilesystemStoreInner { fn get_dest_dir_path( &self, primary_namespace: &str, secondary_namespace: &str, ) -> std::io::Result { @@ -94,9 +132,7 @@ impl FilesystemStore { Ok(dest_dir_path) } -} -impl KVStoreSync for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { @@ -117,13 +153,14 @@ impl KVStoreSync for FilesystemStore { f.read_to_end(&mut buf)?; } - self.garbage_collect_locks(); - Ok(buf) } - fn write( + /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function + /// returns early without writing. + fn write_version( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + version: Option, ) -> lightning::io::Result<()> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; @@ -157,7 +194,18 @@ impl KVStoreSync for FilesystemStore { let mut outer_lock = self.locks.lock().unwrap(); Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) }; - let _guard = inner_lock_ref.write().unwrap(); + let mut last_written_version = inner_lock_ref.write().unwrap(); + + // If a version is provided, we check if we already have a newer version written. This is used in async + // contexts to realize eventual consistency. + if let Some(version) = version { + if version <= *last_written_version { + // If the version is not greater, we don't write the file. + return Ok(()); + } + + *last_written_version = version; + } #[cfg(not(target_os = "windows"))] { @@ -204,8 +252,6 @@ impl KVStoreSync for FilesystemStore { } }; - self.garbage_collect_locks(); - res } @@ -299,8 +345,6 @@ impl KVStoreSync for FilesystemStore { } } - self.garbage_collect_locks(); - Ok(()) } @@ -351,12 +395,93 @@ impl KVStoreSync for FilesystemStore { break 'retry_list; } - self.garbage_collect_locks(); - Ok(keys) } } +#[cfg(feature = "tokio")] +impl KVStore for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let this = Arc::clone(&self.inner); + + Box::pin(async move { + tokio::task::spawn_blocking(move || { + this.read(&primary_namespace, &secondary_namespace, &key) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Pin> + 'static + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let buf = buf.to_vec(); + let this = Arc::clone(&self.inner); + + // Obtain a version number to retain the call sequence. + let version = self.version_counter.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStore version counter overflowed"); + } + + Box::pin(async move { + tokio::task::spawn_blocking(move || { + this.write_version( + &primary_namespace, + &secondary_namespace, + &key, + &buf, + Some(version), + ) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + 'static + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let this = Arc::clone(&self.inner); + + Box::pin(async move { + tokio::task::spawn_blocking(move || { + this.remove(&primary_namespace, &secondary_namespace, &key, lazy) + }) + .await + .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let this = Arc::clone(&self.inner); + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace)) + .await + .unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); if let Some(ext) = p.extension() { @@ -447,7 +572,7 @@ fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result Result, lightning::io::Error> { - let prefixed_dest = &self.data_dir; + let prefixed_dest = &self.inner.data_dir; if !prefixed_dest.exists() { return Ok(Vec::new()); } @@ -534,7 +659,7 @@ mod tests { fn drop(&mut self) { // We test for invalid directory names, so it's OK if directory removal // fails. - match fs::remove_dir_all(&self.data_dir) { + match fs::remove_dir_all(&self.inner.data_dir) { Err(e) => println!("Failed to remove test persister directory: {}", e), _ => {}, } @@ -549,6 +674,48 @@ mod tests { do_read_write_remove_list_persist(&fs_store); } + #[cfg(feature = "tokio")] + #[tokio::test] + async fn read_write_remove_list_persist_async() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStore; + use std::sync::Arc; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_async"); + let fs_store: Arc = Arc::new(FilesystemStore::new(temp_path)); + + let data1 = [42u8; 32]; + let data2 = [43u8; 32]; + + let primary_namespace = "testspace"; + let secondary_namespace = "testsubspace"; + let key = "testkey"; + + // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure + // that eventual consistency works. + let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, &data1); + let fut2 = fs_store.write(primary_namespace, secondary_namespace, key, &data2); + + fut2.await.unwrap(); + fut1.await.unwrap(); + + // Test list. + let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], key); + + // Test read. We expect to read data2, as the write call was initiated later. + let read_data = fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap(); + assert_eq!(data2, &*read_data); + + // Test remove. + fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap(); + + let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir();