Skip to content

Commit a11f18d

Browse files
committed
Convert process_events_async to take an asynchronous Persister
Also provide a wrapper to allow a sync kvstore to be used.
1 parent caeacf6 commit a11f18d

File tree

2 files changed

+274
-58
lines changed

2 files changed

+274
-58
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 173 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStoreSync, PersisterSync};
43+
#[cfg(feature = "std")]
44+
use lightning::util::persist::KVStoreSync;
45+
use lightning::util::persist::PersisterSync;
46+
use lightning::util::persist::{KVStore, Persister};
4447
use lightning::util::sweep::OutputSweeper;
4548
#[cfg(feature = "std")]
4649
use lightning::util::sweep::OutputSweeperSync;
@@ -50,7 +53,9 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
5053

5154
use lightning_liquidity::ALiquidityManager;
5255

56+
use core::future::Future;
5357
use core::ops::Deref;
58+
use core::pin::Pin;
5459
use core::time::Duration;
5560

5661
#[cfg(feature = "std")]
@@ -311,6 +316,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311316
true
312317
}
313318

319+
macro_rules! maybe_await {
320+
(true, $e:expr) => {
321+
$e.await
322+
};
323+
(false, $e:expr) => {
324+
$e
325+
};
326+
}
327+
314328
macro_rules! define_run_body {
315329
(
316330
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +333,7 @@ macro_rules! define_run_body {
319333
$peer_manager: ident, $gossip_sync: ident,
320334
$process_sweeper: expr,
321335
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
336+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323337
) => { {
324338
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325339
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +389,7 @@ macro_rules! define_run_body {
375389

376390
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377391
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
392+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
379393
log_trace!($logger, "Done persisting ChannelManager.");
380394
}
381395
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +450,7 @@ macro_rules! define_run_body {
436450
log_trace!($logger, "Persisting network graph.");
437451
}
438452

439-
if let Err(e) = $persister.persist_graph(network_graph) {
453+
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
440454
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441455
}
442456

@@ -464,7 +478,7 @@ macro_rules! define_run_body {
464478
} else {
465479
log_trace!($logger, "Persisting scorer");
466480
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
481+
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
468482
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469483
}
470484
}
@@ -487,16 +501,16 @@ macro_rules! define_run_body {
487501
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488502
// some races where users quit while channel updates were in-flight, with
489503
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
504+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
491505

492506
// Persist Scorer on exit
493507
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
508+
maybe_await!($async, $persister.persist_scorer(&scorer))?;
495509
}
496510

497511
// Persist NetworkGraph on exit
498512
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
513+
maybe_await!($async, $persister.persist_graph(network_graph))?;
500514
}
501515

502516
Ok(())
@@ -643,22 +657,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643657
/// ```
644658
/// # use lightning::io;
645659
/// # use lightning::events::ReplayEvent;
646-
/// # use lightning::util::sweep::OutputSweeper;
647660
/// # use std::sync::{Arc, RwLock};
648661
/// # use std::sync::atomic::{AtomicBool, Ordering};
649662
/// # use std::time::SystemTime;
650-
/// # use lightning_background_processor::{process_events_async, GossipSync};
663+
/// # use lightning_background_processor::{process_events_full_async, GossipSync};
664+
/// # use core::future::Future;
665+
/// # use core::pin::Pin;
651666
/// # struct Logger {}
652667
/// # impl lightning::util::logger::Logger for Logger {
653668
/// # fn log(&self, _record: lightning::util::logger::Record) {}
654669
/// # }
655-
/// # struct Store {}
656-
/// # impl lightning::util::persist::KVStore for Store {
670+
/// # struct StoreSync {}
671+
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
657672
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658673
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659674
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660675
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661676
/// # }
677+
/// # struct Store {}
678+
/// # impl lightning::util::persist::KVStore for Store {
679+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
680+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
681+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
682+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
683+
/// # }
662684
/// # struct EventHandler {}
663685
/// # impl EventHandler {
664686
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +691,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669691
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670692
/// # fn disconnect_socket(&mut self) {}
671693
/// # }
672-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>, Arc<lightning::sign::KeysManager>>;
694+
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
673695
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674696
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675697
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676698
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677699
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678700
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679-
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680-
/// #
701+
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
702+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
703+
///
681704
/// # struct Node<
682705
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683706
/// # F: lightning::chain::Filter + Send + Sync + 'static,
684707
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685708
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686709
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688710
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689711
/// # > {
690712
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +719,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697719
/// # persister: Arc<Store>,
698720
/// # logger: Arc<Logger>,
699721
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
722+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
701723
/// # }
702724
/// #
703725
/// # async fn setup_background_processing<
@@ -706,10 +728,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706728
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707729
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708730
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710731
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
712-
/// let background_persister = Arc::clone(&node.persister);
732+
/// # >(node: Node<B, F, FE, UL, D, O>) {
733+
/// let background_persister = Arc::new(Arc::clone(&node.persister));
713734
/// let background_event_handler = Arc::clone(&node.event_handler);
714735
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715736
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -744,7 +765,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
744765
doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
745766
)]
746767
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
747-
/// process_events_async(
768+
/// process_events_full_async(
748769
/// background_persister,
749770
/// |e| background_event_handler.handle_event(e),
750771
/// background_chain_mon,
@@ -769,7 +790,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
769790
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
770791
/// # }
771792
///```
772-
pub async fn process_events_async<
793+
pub async fn process_events_full_async<
773794
'a,
774795
UL: 'static + Deref,
775796
CF: 'static + Deref,
@@ -814,7 +835,7 @@ where
814835
F::Target: 'static + FeeEstimator,
815836
L::Target: 'static + Logger,
816837
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
838+
PS::Target: 'static + Persister<'a, CM, L, S>,
818839
ES::Target: 'static + EntropySource,
819840
CM::Target: AChannelManager,
820841
OM::Target: AOnionMessenger,
@@ -841,7 +862,7 @@ where
841862
if let Some(duration_since_epoch) = fetch_time() {
842863
if update_scorer(scorer, &event, duration_since_epoch) {
843864
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
865+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845866
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846867
// We opt not to abort early on persistence failure here as persisting
847868
// the scorer is non-critical and we still hope that it will have
@@ -919,7 +940,134 @@ where
919940
},
920941
mobile_interruptable_platform,
921942
fetch_time,
943+
true,
944+
)
945+
}
946+
947+
/// A wrapper that turns a synchronous [`PersisterSync`] into an async [`Persister`].
948+
struct PersisterSyncWrapper<'a, PS, CM, L, S> {
949+
inner: PS,
950+
_phantom: core::marker::PhantomData<(&'a (), CM, L, S)>,
951+
}
952+
953+
impl<'a, PS, CM, L, S> Deref for PersisterSyncWrapper<'_, PS, CM, L, S> {
954+
type Target = Self;
955+
fn deref(&self) -> &Self {
956+
self
957+
}
958+
}
959+
960+
impl<'a, PS, CM, L, S> PersisterSyncWrapper<'a, PS, CM, L, S> {
961+
/// Constructs a new [`PersisterSyncWrapper`] from the given sync persister.
962+
pub fn new(inner: PS) -> Self {
963+
Self { inner, _phantom: core::marker::PhantomData }
964+
}
965+
}
966+
967+
impl<'a, PS: Deref, CM: Deref + 'static, L: Deref + 'static, S: Deref + 'static>
968+
Persister<'a, CM, L, S> for PersisterSyncWrapper<'a, PS, CM, L, S>
969+
where
970+
PS::Target: PersisterSync<'a, CM, L, S>,
971+
CM::Target: AChannelManager,
972+
L::Target: Logger,
973+
S::Target: WriteableScore<'a>,
974+
{
975+
fn persist_manager(
976+
&self, channel_manager: &CM,
977+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
978+
let res = self.inner.persist_manager(&channel_manager);
979+
Box::pin(async move { res })
980+
}
981+
982+
fn persist_graph(
983+
&self, network_graph: &NetworkGraph<L>,
984+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
985+
let res = self.inner.persist_graph(&network_graph);
986+
Box::pin(async move { res })
987+
}
988+
989+
fn persist_scorer(
990+
&self, scorer: &S,
991+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
992+
let res = self.inner.persist_scorer(&scorer);
993+
Box::pin(async move { res })
994+
}
995+
}
996+
997+
/// Async events processor the is based on [`process_events_async`] but allows for [`PersisterSync`] to be used for
998+
/// synchronous background persistence.
999+
pub async fn process_events_async<
1000+
UL: 'static + Deref,
1001+
CF: 'static + Deref,
1002+
T: 'static + Deref,
1003+
F: 'static + Deref,
1004+
G: 'static + Deref<Target = NetworkGraph<L>>,
1005+
L: 'static + Deref + Send + Sync,
1006+
P: 'static + Deref,
1007+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1008+
EventHandler: Fn(Event) -> EventHandlerFuture,
1009+
PS: 'static + Deref + Send + Sync,
1010+
ES: 'static + Deref + Send,
1011+
M: 'static
1012+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1013+
+ Send
1014+
+ Sync,
1015+
CM: 'static + Deref + Send + Sync,
1016+
OM: 'static + Deref,
1017+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1018+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1019+
PM: 'static + Deref,
1020+
LM: 'static + Deref,
1021+
D: 'static + Deref,
1022+
O: 'static + Deref,
1023+
K: 'static + Deref,
1024+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1025+
S: 'static + Deref<Target = SC> + Send + Sync,
1026+
SC: for<'b> WriteableScore<'b>,
1027+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1028+
Sleeper: Fn(Duration) -> SleepFuture,
1029+
FetchTime: Fn() -> Option<Duration>,
1030+
>(
1031+
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1032+
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1033+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1034+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1035+
) -> Result<(), lightning::io::Error>
1036+
where
1037+
UL::Target: 'static + UtxoLookup,
1038+
CF::Target: 'static + chain::Filter,
1039+
T::Target: 'static + BroadcasterInterface,
1040+
F::Target: 'static + FeeEstimator,
1041+
L::Target: 'static + Logger,
1042+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1043+
PS::Target: 'static + PersisterSync<'static, CM, L, S>,
1044+
ES::Target: 'static + EntropySource,
1045+
CM::Target: AChannelManager,
1046+
OM::Target: AOnionMessenger,
1047+
PM::Target: APeerManager,
1048+
LM::Target: ALiquidityManager,
1049+
O::Target: 'static + OutputSpender,
1050+
D::Target: 'static + ChangeDestinationSource,
1051+
K::Target: 'static + KVStoreSync,
1052+
{
1053+
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1054+
process_events_full_async(
1055+
persister,
1056+
event_handler,
1057+
chain_monitor,
1058+
channel_manager,
1059+
onion_messenger,
1060+
gossip_sync,
1061+
peer_manager,
1062+
liquidity_manager,
1063+
sweeper,
1064+
logger,
1065+
scorer,
1066+
sleeper,
1067+
mobile_interruptable_platform,
1068+
fetch_time,
9221069
)
1070+
.await
9231071
}
9241072

9251073
#[cfg(feature = "std")]
@@ -1098,6 +1246,7 @@ impl BackgroundProcessor {
10981246
.expect("Time should be sometime after 1970"),
10991247
)
11001248
},
1249+
false,
11011250
)
11021251
});
11031252
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

0 commit comments

Comments
 (0)