Async FilesystemStore #3931

Draft · wants to merge 4 commits into base: main

258 changes: 202 additions & 56 deletions lightning-background-processor/src/lib.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions lightning-persister/Cargo.toml
@@ -17,6 +17,9 @@ rustdoc-args = ["--cfg", "docsrs"]
bitcoin = "0.32.2"
lightning = { version = "0.2.0", path = "../lightning" }

# TODO: Make conditional?
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util" ] }

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

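One way the "TODO: Make conditional?" above could go: gate tokio behind an off-by-default Cargo feature, so sync-only users don't pull it in. A sketch under that assumption (the feature name is illustrative, not part of this PR):

[features]
# Hypothetical off-by-default feature gating the async store.
tokio = ["dep:tokio"]

[dependencies]
tokio = { version = "1.35", optional = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util"] }

The `fs_store_async` module in lib.rs would then be declared behind `#[cfg(feature = "tokio")]`.
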
110 changes: 54 additions & 56 deletions lightning-persister/src/fs_store.rs
@@ -2,7 +2,7 @@
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};

use lightning::types::string::PrintableString;
use lightning::util::persist::{KVStore, MigratableKVStore};
use lightning::util::persist::{KVStoreSync, MigratableKVStore};

use std::collections::HashMap;
use std::fs;
@@ -30,43 +30,29 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
path.as_ref().encode_wide().chain(Some(0)).collect()
}

// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
const GC_LOCK_INTERVAL: usize = 25;

/// A [`KVStore`] implementation that writes to and reads from the file system.
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
pub struct FilesystemStore {
data_dir: PathBuf,
tmp_file_counter: AtomicUsize,
gc_counter: AtomicUsize,
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,

// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
// latest written version per key.
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
}

impl FilesystemStore {
/// Constructs a new [`FilesystemStore`].
pub fn new(data_dir: PathBuf) -> Self {
let locks = Mutex::new(HashMap::new());
let tmp_file_counter = AtomicUsize::new(0);
let gc_counter = AtomicUsize::new(1);
Self { data_dir, tmp_file_counter, gc_counter, locks }
Self { data_dir, tmp_file_counter, locks }
}

/// Returns the data directory.
pub fn get_data_dir(&self) -> PathBuf {
self.data_dir.clone()
}

fn garbage_collect_locks(&self) {
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);

if gc_counter % GC_LOCK_INTERVAL == 0 {
// Take outer lock for the cleanup.
let mut outer_lock = self.locks.lock().unwrap();

// Garbage collect all lock entries that are not referenced anymore.
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
}
}

fn get_dest_dir_path(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> std::io::Result<PathBuf> {
@@ -90,36 +76,12 @@ impl FilesystemStore {

Ok(dest_dir_path)
}
}

impl KVStore for FilesystemStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;

let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
dest_file_path.push(key);

let mut buf = Vec::new();
{
let inner_lock_ref = {
let mut outer_lock = self.locks.lock().unwrap();
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
};
let _guard = inner_lock_ref.read().unwrap();

let mut f = fs::File::open(dest_file_path)?;
f.read_to_end(&mut buf)?;
}

self.garbage_collect_locks();

Ok(buf)
}

fn write(
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
/// returns early without writing.
pub(crate) fn write_version(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
version: Option<u64>,
) -> lightning::io::Result<()> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;

@@ -153,7 +115,18 @@ impl KVStore for FilesystemStore {
let mut outer_lock = self.locks.lock().unwrap();
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
};
let _guard = inner_lock_ref.write().unwrap();
let mut last_written_version = inner_lock_ref.write().unwrap();

// If a version is provided, we check if we already have a newer version written. This is used in async
// contexts to realize eventual consistency.
if let Some(version) = version {
if version <= *last_written_version {
// If the version is not greater, we don't write the file.
return Ok(());
}

*last_written_version = version;
}

#[cfg(not(target_os = "windows"))]
{
@@ -200,10 +173,39 @@ impl KVStore for FilesystemStore {
}
};

self.garbage_collect_locks();

res
}
}

impl KVStoreSync for FilesystemStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;

let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
dest_file_path.push(key);

let mut buf = Vec::new();
{
let inner_lock_ref = {
let mut outer_lock = self.locks.lock().unwrap();
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
};
let _guard = inner_lock_ref.read().unwrap();

let mut f = fs::File::open(dest_file_path)?;
f.read_to_end(&mut buf)?;
}

Ok(buf)
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> lightning::io::Result<()> {
self.write_version(primary_namespace, secondary_namespace, key, buf, None)
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
@@ -295,8 +297,6 @@ impl KVStore for FilesystemStore {
}
}

self.garbage_collect_locks();

Ok(())
}

@@ -325,8 +325,6 @@ impl KVStore for FilesystemStore {
keys.push(key);
}

self.garbage_collect_locks();

Ok(keys)
}
}
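To make the `write_version` semantics concrete: versions are assigned in call order, so even when the underlying writes complete out of order, a key converges on the data of the latest call. A minimal sketch of the intended usage (crate-internal, since `write_version` is `pub(crate)`; the namespaces, key, and data are illustrative):

use crate::fs_store::FilesystemStore;
use lightning::util::persist::KVStoreSync;

fn versioning_sketch(store: &FilesystemStore) -> lightning::io::Result<()> {
    store.write_version("ns", "subns", "key", b"v1", Some(1))?;
    store.write_version("ns", "subns", "key", b"v2", Some(2))?;
    // A stale write arriving late is silently dropped instead of clobbering newer data.
    store.write_version("ns", "subns", "key", b"v1", Some(1))?; // no-op, returns Ok(())
    let read = KVStoreSync::read(store, "ns", "subns", "key")?;
    assert_eq!(b"v2", &*read);
    Ok(())
}
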
155 changes: 155 additions & 0 deletions lightning-persister/src/fs_store_async.rs
@@ -0,0 +1,155 @@
//! Objects related to [`FilesystemStoreAsync`] live here.

use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use crate::fs_store::FilesystemStore;
use core::future::Future;
use core::pin::Pin;
use lightning::util::persist::{KVStore, KVStoreSync};

/// An asynchronous extension of [`FilesystemStore`], implementing the [`KVStore`] trait for async operations. It is
/// shaped as a wrapper around an existing [`FilesystemStore`] so that the same locks are used. This allows the sync
/// and async interfaces to be used simultaneously.
pub struct FilesystemStoreAsync {
inner: Arc<FilesystemStore>,

// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
// operations aren't sensitive to the order of execution.
version_counter: AtomicU64,
}

impl FilesystemStoreAsync {
/// Creates a new instance of [`FilesystemStoreAsync`].
pub fn new(inner: Arc<FilesystemStore>) -> Self {
// Start the counter at 1 so the first write's version exceeds the per-key default of 0 in
// [`FilesystemStore::write_version`]; otherwise the very first versioned write would be dropped.
Self { inner, version_counter: AtomicU64::new(1) }
}
}

impl KVStore for FilesystemStoreAsync {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let this = Arc::clone(&self.inner);

Box::pin(async move {
tokio::task::spawn_blocking(move || {
--- Review thread ---

Contributor:

Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible."). Maybe there are other designs that we should at least consider before moving forward with this approach. For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?

If we use spawn_blocking, can we give the user control over exactly which runtime this will be spawned on? Also, rather than just wrapping, should we be using tokio::fs?

Contributor Author:

> Mhh, so I'm not sure if spawning blocking tasks for every IO call is the way to go (see for example https://docs.rs/tokio/latest/tokio/fs/index.html#tuning-your-file-io: "To get good performance with file IO on Tokio, it is recommended to batch your operations into as few spawn_blocking calls as possible.")

If we should batch operations, I think the current approach is better than using tokio::fs, because it already batches the various operations inside KVStoreSync::write.

Further batching probably needs to happen at a higher level in LDK, and might be a bigger change. I'm not sure that is worth it just for FilesystemStore, especially when that store is not the preferred store for real-world usage.

> For example, we could create a dedicated pool of longer-lived worker task(s) that process a queue?

Isn't Tokio doing that already when a task is spawned?

> If we use spawn_blocking, can we give the user control over exactly which runtime this will be spawned on? Also, rather than just wrapping, should we be using tokio::fs?

With tokio::fs, the current runtime is used. I'd think that is then also sufficient if we spawn ourselves, without a need to specify which runtime exactly.

More generally, I think the main purpose of this PR is to show how an async KVStore could be implemented, and to have something for testing potentially. Additionally, if there are users that really want to use this type of store in production, they could. But I don't think it is something to spend too much time on. A remote database is probably the more important target to design for.

--- End review thread ---

this.read(&primary_namespace, &secondary_namespace, &key)
})
.await
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
})
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let buf = buf.to_vec();
let this = Arc::clone(&self.inner);

// Obtain a version number to retain the call sequence.
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);

Box::pin(async move {
tokio::task::spawn_blocking(move || {
this.write_version(
&primary_namespace,
&secondary_namespace,
&key,
&buf,
Some(version),
)
})
.await
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
})
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let this = Arc::clone(&self.inner);

Box::pin(async move {
tokio::task::spawn_blocking(move || {
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
})
.await
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
})
}

fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
let primary_namespace = primary_namespace.to_string();
let secondary_namespace = secondary_namespace.to_string();
let this = Arc::clone(&self.inner);

Box::pin(async move {
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
.await
.unwrap_or_else(|e| {
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
})
})
}
}

#[cfg(test)]
mod test {
use crate::{fs_store::FilesystemStore, fs_store_async::FilesystemStoreAsync};
use lightning::util::persist::KVStore;
use std::sync::Arc;

#[tokio::test]
async fn read_write_remove_list_persist() {
let mut temp_path = std::env::temp_dir();
temp_path.push("test_read_write_remove_list_persist");
let fs_store = Arc::new(FilesystemStore::new(temp_path));
let fs_store_async = FilesystemStoreAsync::new(Arc::clone(&fs_store));

let data1 = [42u8; 32];
let data2 = [43u8; 32];

let primary_namespace = "testspace";
let secondary_namespace = "testsubspace";
let key = "testkey";

// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
// that eventual consistency works.
let fut1 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data1);
let fut2 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data2);

fut2.await.unwrap();
fut1.await.unwrap();

// Test list.
let listed_keys =
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
assert_eq!(listed_keys.len(), 1);
assert_eq!(listed_keys[0], key);

// Test read. We expect to read data2, as the write call was initiated later.
let read_data =
fs_store_async.read(primary_namespace, secondary_namespace, key).await.unwrap();
assert_eq!(data2, &*read_data);

// Test remove.
fs_store_async.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();

let listed_keys =
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
assert_eq!(listed_keys.len(), 0);
}
}
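
For the review thread above: a minimal sketch of the alternative the reviewer floats, a single longer-lived blocking worker draining a job queue, so the `spawn_blocking` cost is paid once rather than per IO call. Nothing here is part of the PR; all names are hypothetical:

use tokio::sync::{mpsc, oneshot};

/// A job is any blocking closure; each job reports back over its own oneshot channel.
type Job = Box<dyn FnOnce() + Send>;

/// Spawns one long-lived blocking worker that drains the job queue.
fn spawn_io_worker() -> mpsc::Sender<Job> {
    let (tx, mut rx) = mpsc::channel::<Job>(64);
    tokio::task::spawn_blocking(move || {
        // A single blocking task, amortized over many IO operations.
        while let Some(job) = rx.blocking_recv() {
            job();
        }
    });
    tx
}

/// Example: run a blocking filesystem read on the worker and await its result.
async fn read_via_worker(
    tx: &mpsc::Sender<Job>, path: std::path::PathBuf,
) -> std::io::Result<Vec<u8>> {
    let (done_tx, done_rx) = oneshot::channel();
    tx.send(Box::new(move || {
        // Runs on the worker thread; IO errors are forwarded inside the result.
        let _ = done_tx.send(std::fs::read(&path));
    }))
    .await
    .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "io worker gone"))?;
    done_rx
        .await
        .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "io worker dropped job"))?
}

Whether this buys anything over Tokio's own blocking-thread pool is exactly the open question in the thread; the sketch only shows the shape of the queue-based design.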
1 change: 1 addition & 0 deletions lightning-persister/src/lib.rs
@@ -9,6 +9,7 @@
extern crate criterion;

pub mod fs_store;
pub mod fs_store_async;

mod utils;

6 changes: 3 additions & 3 deletions lightning-persister/src/test_utils.rs
@@ -4,15 +4,15 @@ use lightning::ln::functional_test_utils::{
create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
};
use lightning::util::persist::{
migrate_kv_store_data, read_channel_monitors, KVStore, MigratableKVStore,
migrate_kv_store_data, read_channel_monitors, KVStoreSync, MigratableKVStore,
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
};
use lightning::util::test_utils;
use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};

use std::panic::RefUnwindSafe;

pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_store: &K) {
pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(kv_store: &K) {
let data = [42u8; 32];

let primary_namespace = "testspace";
@@ -113,7 +113,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>

// Integration-test the given KVStore implementation. Test relaying a few payments and check that
// the persisted data is updated the appropriate number of times.
pub(crate) fn do_test_store<K: KVStore + Sync>(store_0: &K, store_1: &K) {
pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
let chanmon_cfgs = create_chanmon_cfgs(2);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
5 changes: 2 additions & 3 deletions lightning/src/ln/channelmanager.rs
@@ -1886,7 +1886,7 @@ where
/// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly
/// every minute
/// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a
/// [`Persister`] such as a [`KVStore`] implementation
/// [`KVStoreSync`] implementation
/// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation
///
/// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining
@@ -2468,8 +2468,7 @@
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
/// [`timer_tick_occurred`]: Self::timer_tick_occurred
/// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence
/// [`Persister`]: crate::util::persist::Persister
/// [`KVStore`]: crate::util::persist::KVStore
/// [`KVStoreSync`]: crate::util::persist::KVStoreSync
/// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future
/// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync
/// [`lightning-transaction-sync`]: https://docs.rs/lightning_transaction_sync/latest/lightning_transaction_sync