Skip to content

Commit 9cf8375

Browse files
committed
Use async kv store with OutputSweeper
1 parent d4bea6a commit 9cf8375

File tree

3 files changed

+376
-111
lines changed

3 files changed

+376
-111
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43+
use lightning::util::persist::KVStore;
4344
use lightning::util::persist::KVStoreSync;
4445
use lightning::util::persist::Persister;
4546
use lightning::util::persist::PersisterSync;
4647
use lightning::util::sweep::OutputSweeper;
4748
#[cfg(feature = "std")]
4849
use lightning::util::sweep::OutputSweeperSync;
50+
use lightning::util::sweep::OutputSweeperSyncKVStore;
4951
#[cfg(feature = "std")]
5052
use lightning::util::wakers::Sleeper;
5153
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -698,7 +700,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
698700
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
699701
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
700702
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
701-
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
703+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
702704
///
703705
/// # struct Node<
704706
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
@@ -842,7 +844,7 @@ where
842844
LM::Target: ALiquidityManager,
843845
O::Target: 'static + OutputSpender,
844846
D::Target: 'static + ChangeDestinationSource,
845-
K::Target: 'static + KVStoreSync,
847+
K::Target: 'static + KVStore,
846848
{
847849
let mut should_break = false;
848850
let async_event_handler = |event| {
@@ -1018,7 +1020,7 @@ pub async fn process_events_async<
10181020
D: 'static + Deref,
10191021
O: 'static + Deref,
10201022
K: 'static + Deref,
1021-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1023+
OS: 'static + Deref<Target = OutputSweeperSyncKVStore<T, D, F, CF, K, L, O>>,
10221024
S: 'static + Deref<Target = SC> + Send + Sync,
10231025
SC: for<'b> WriteableScore<'b>,
10241026
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1048,6 +1050,7 @@ where
10481050
K::Target: 'static + KVStoreSync,
10491051
{
10501052
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1053+
let sweeper = sweeper.map(|s| s.sweeper_async());
10511054
process_events_full_async(
10521055
persister,
10531056
event_handler,
@@ -1300,6 +1303,7 @@ impl Drop for BackgroundProcessor {
13001303
#[cfg(all(feature = "std", test))]
13011304
mod tests {
13021305
use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1306+
use crate::PersisterSyncWrapper;
13031307
use bitcoin::constants::{genesis_block, ChainHash};
13041308
use bitcoin::hashes::Hash;
13051309
use bitcoin::locktime::absolute::LockTime;
@@ -2253,11 +2257,12 @@ mod tests {
22532257
open_channel!(nodes[0], nodes[1], 100000);
22542258

22552259
let data_dir = nodes[0].kv_store.get_data_dir();
2256-
let persister = Arc::new(
2260+
let persister_sync = Arc::new(
22572261
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
22582262
);
2263+
let persister = PersisterSyncWrapper::new(persister_sync);
22592264

2260-
let bp_future = super::process_events_async(
2265+
let bp_future = super::process_events_full_async(
22612266
persister,
22622267
|_: _| async { Ok(()) },
22632268
Arc::clone(&nodes[0].chain_monitor),
@@ -2764,11 +2769,12 @@ mod tests {
27642769
let (_, nodes) =
27652770
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
27662771
let data_dir = nodes[0].kv_store.get_data_dir();
2767-
let persister =
2772+
let persister_sync =
27682773
Arc::new(PersisterSync::new(data_dir).with_graph_persistence_notifier(sender));
2774+
let persister = PersisterSyncWrapper::new(persister_sync);
27692775

27702776
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2771-
let bp_future = super::process_events_async(
2777+
let bp_future = super::process_events_full_async(
27722778
persister,
27732779
|_: _| async { Ok(()) },
27742780
Arc::clone(&nodes[0].chain_monitor),
@@ -2981,11 +2987,12 @@ mod tests {
29812987

29822988
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
29832989
let data_dir = nodes[0].kv_store.get_data_dir();
2984-
let persister = Arc::new(PersisterSync::new(data_dir));
2990+
let persister_sync = Arc::new(PersisterSync::new(data_dir));
2991+
let persister = PersisterSyncWrapper::new(persister_sync);
29852992

29862993
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
29872994

2988-
let bp_future = super::process_events_async(
2995+
let bp_future = super::process_events_full_async(
29892996
persister,
29902997
event_handler,
29912998
Arc::clone(&nodes[0].chain_monitor),

lightning/src/util/persist.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,58 @@ pub trait KVStoreSync {
121121
) -> Result<Vec<String>, io::Error>;
122122
}
123123

124+
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
125+
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
126+
where
127+
K::Target: KVStoreSync;
128+
129+
impl<K: Deref> Deref for KVStoreSyncWrapper<K>
130+
where
131+
K::Target: KVStoreSync,
132+
{
133+
type Target = Self;
134+
fn deref(&self) -> &Self {
135+
self
136+
}
137+
}
138+
139+
impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
140+
where
141+
K::Target: KVStoreSync,
142+
{
143+
fn read(
144+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
145+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
146+
let res = self.0.read(primary_namespace, secondary_namespace, key);
147+
148+
Box::pin(async move { res })
149+
}
150+
151+
fn write(
152+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
153+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
154+
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
155+
156+
Box::pin(async move { res })
157+
}
158+
159+
fn remove(
160+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
161+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
162+
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
163+
164+
Box::pin(async move { res })
165+
}
166+
167+
fn list(
168+
&self, primary_namespace: &str, secondary_namespace: &str,
169+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
170+
let res = self.0.list(primary_namespace, secondary_namespace);
171+
172+
Box::pin(async move { res })
173+
}
174+
}
175+
124176
/// A trait that provides a key-value store interface for persisting data.
125177
pub trait KVStore {
126178
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and

0 commit comments

Comments
 (0)