
Commit cce1291

Persist ChannelMonitors after new blocks are connected
This resolves several user complaints (and issues in the sample node) where startup is substantially delayed because we are always waiting for the chain data to sync.

Further, in an upcoming PR, we'll be reloading pending payments from ChannelMonitors on restart, at which point we'll need the change here, which avoids handling events until after the user has confirmed the `ChannelMonitor` has been persisted to disk. It avoids a race where we:

* send a payment/HTLC (persisting the monitor to disk with the HTLC pending),
* force-close the channel, removing the channel entry from the ChannelManager entirely,
* persist the ChannelManager,
* connect a block which contains a fulfill of the HTLC, generating a claim event,
* handle the claim event while the `ChannelMonitor` is being persisted,
* persist the ChannelManager (before the ChannelMonitor is persisted fully),
* restart, reloading the HTLC as a pending payment in the ChannelManager, which now has no references to it except from the ChannelMonitor which still has the pending HTLC,
* replay the block connection, generating a duplicate PaymentSent event.
1 parent 7aa2cac commit cce1291


4 files changed: +108 −20 lines changed


lightning-persister/src/lib.rs

Lines changed: 12 additions & 0 deletions
```diff
@@ -159,6 +159,12 @@ impl FilesystemPersister {
 }
 
 impl<ChannelSigner: Sign> channelmonitor::Persist<ChannelSigner> for FilesystemPersister {
+	// TODO: We really need a way for the persister to inform the user that its time to crash/shut
+	// down once these start returning failure.
+	// A PermanentFailure implies we need to shut down since we're force-closing channels without
+	// even broadcasting, and sync_persisted_channel's docs are even more explicit that its time to
+	// shut down!
+
 	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
 		let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
 		util::write_to_file(self.path_to_monitor_data(), filename, monitor)
@@ -170,6 +176,12 @@ impl<ChannelSigner: Sign> channelmonitor::Persist<ChannelSigner> for FilesystemP
 		util::write_to_file(self.path_to_monitor_data(), filename, monitor)
 			.map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)
 	}
+
+	fn sync_persisted_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ()> {
+		let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+		util::write_to_file(self.path_to_monitor_data(), filename, monitor)
+			.map_err(|_| ())
+	}
 }
 
 #[cfg(test)]
```
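The TODO above notes that a persister currently has no way to tell the application it is time to shut down. A minimal sketch of one way a user could wire that up today, by wrapping `FilesystemPersister` and latching a flag the application's main loop polls, is shown below. The `ShutdownSignallingPersister` type and its `shutdown_requested` field are hypothetical and not part of this commit; only the `Persist` trait methods and the `FilesystemPersister` behaviour come from the code above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Persist};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;
use lightning_persister::FilesystemPersister;

/// Wraps a FilesystemPersister and remembers whether any persistence call has ever failed.
pub struct ShutdownSignallingPersister {
	inner: FilesystemPersister,
	/// Polled by the application's main loop; once true the node should wind down promptly.
	pub shutdown_requested: AtomicBool,
}

impl ShutdownSignallingPersister {
	/// Passes the result through unchanged, latching the shutdown flag on any error.
	fn flag_on_err<T, E>(&self, res: Result<T, E>) -> Result<T, E> {
		if res.is_err() {
			self.shutdown_requested.store(true, Ordering::Release);
		}
		res
	}
}

impl<ChannelSigner: Sign> Persist<ChannelSigner> for ShutdownSignallingPersister {
	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		self.flag_on_err(self.inner.persist_new_channel(funding_txo, monitor))
	}
	fn update_persisted_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		self.flag_on_err(self.inner.update_persisted_channel(funding_txo, update, monitor))
	}
	fn sync_persisted_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ()> {
		self.flag_on_err(self.inner.sync_persisted_channel(funding_txo, monitor))
	}
}
```

An application could check the flag after each event-processing round and begin an orderly shutdown once it is set.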

lightning/src/chain/chainmonitor.rs

Lines changed: 64 additions & 20 deletions
```diff
@@ -39,7 +39,7 @@ use util::events::EventHandler;
 use ln::channelmanager::ChannelDetails;
 
 use prelude::*;
-use sync::RwLock;
+use sync::{RwLock, Mutex};
 use core::ops::Deref;
 
 /// An implementation of [`chain::Watch`] for monitoring channels.
@@ -60,6 +60,18 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 {
 	/// The monitors
 	pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
+	/// Beyond the synchronization of `monitors` itself, we cannot handle user events until after
+	/// any chain updates have been stored on disk. This mutex is used to provide mutual exclusion
+	/// of event-processing/block-/transaction-connection.
+	/// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
+	/// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
+	/// the pending payment entry, and then reloading before the monitor is persisted, resulting in
+	/// the ChannelManager re-adding the same payment entry, before the same block is replayed,
+	/// resulting in a duplicate PaymentSent event.
+	///
+	/// Note that this is set to true if any persistence fails, at which point *no events must be
+	/// processed* (and the user has indicated they will shut down very very soon).
+	event_mutex: Mutex<bool>,
 	chain_source: Option<C>,
 	broadcaster: T,
 	logger: L,
@@ -88,26 +100,43 @@ where C::Target: chain::Filter,
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
 		let mut dependent_txdata = Vec::new();
-		let monitors = self.monitors.read().unwrap();
-		for monitor in monitors.values() {
-			let mut txn_outputs = process(monitor, txdata);
+		{
+			let monitors = self.monitors.write().unwrap();
+			for (funding_outpoint, monitor) in monitors.iter() {
+				let mut txn_outputs;
+				{
+					let mut ev_lock = self.event_mutex.lock().unwrap();
+					txn_outputs = process(monitor, txdata);
+					log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+					if let Err(()) = self.persister.sync_persisted_channel(*funding_outpoint, monitor) {
+						// If we fail to persist a monitor, stop processing events, assuming we'll
+						// be shutting down soon (and the events can be re-generated on chain
+						// replay).
+						*ev_lock = true;
+						log_error!(self.logger, "Failed to sync Channel Monitor for channel {}!", log_funding_info!(monitor));
+						log_error!(self.logger, "    The LDK-based application should now be shutting down!");
+					} else {
+						log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+					}
+				}
 
-			// Register any new outputs with the chain source for filtering, storing any dependent
-			// transactions from within the block that previously had not been included in txdata.
-			if let Some(ref chain_source) = self.chain_source {
-				let block_hash = header.block_hash();
-				for (txid, mut outputs) in txn_outputs.drain(..) {
-					for (idx, output) in outputs.drain(..) {
-						// Register any new outputs with the chain source for filtering and recurse
-						// if it indicates that there are dependent transactions within the block
-						// that had not been previously included in txdata.
-						let output = WatchedOutput {
-							block_hash: Some(block_hash),
-							outpoint: OutPoint { txid, index: idx as u16 },
-							script_pubkey: output.script_pubkey,
-						};
-						if let Some(tx) = chain_source.register_output(output) {
-							dependent_txdata.push(tx);
+				// Register any new outputs with the chain source for filtering, storing any dependent
+				// transactions from within the block that previously had not been included in txdata.
+				if let Some(ref chain_source) = self.chain_source {
+					let block_hash = header.block_hash();
+					for (txid, mut outputs) in txn_outputs.drain(..) {
+						for (idx, output) in outputs.drain(..) {
+							// Register any new outputs with the chain source for filtering and recurse
+							// if it indicates that there are dependent transactions within the block
+							// that had not been previously included in txdata.
+							let output = WatchedOutput {
+								block_hash: Some(block_hash),
+								outpoint: OutPoint { txid, index: idx as u16 },
+								script_pubkey: output.script_pubkey,
+							};
+							if let Some(tx) = chain_source.register_output(output) {
+								dependent_txdata.push(tx);
+							}
 						}
 					}
 				}
@@ -133,6 +162,7 @@ where C::Target: chain::Filter,
 	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
 		Self {
 			monitors: RwLock::new(HashMap::new()),
+			event_mutex: Mutex::new(false),
 			chain_source,
 			broadcaster,
 			logger,
@@ -331,6 +361,13 @@ where C::Target: chain::Filter,
 	}
 
 	fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+		let ev_lock = self.event_mutex.lock().unwrap();
+		if *ev_lock {
+			log_error!(self.logger, "Failed to sync a Channel Monitor, refusing to provide monitor events!");
+			log_error!(self.logger, "    The LDK-based application should now be shutting down!");
+			return Vec::new();
+		}
+
 		let mut pending_monitor_events = Vec::new();
 		for monitor in self.monitors.read().unwrap().values() {
 			pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
@@ -353,6 +390,13 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
 	///
 	/// [`SpendableOutputs`]: events::Event::SpendableOutputs
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+		let ev_lock = self.event_mutex.lock().unwrap();
+		if *ev_lock {
+			log_error!(self.logger, "Failed to sync a Channel Monitor, refusing to provide monitor events!");
+			log_error!(self.logger, "    The LDK-based application should now be shutting down!");
+			return;
+		}
+
 		let mut pending_events = Vec::new();
 		for monitor in self.monitors.read().unwrap().values() {
 			pending_events.append(&mut monitor.get_and_clear_pending_events());
```
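To make the locking discipline introduced above easier to see in isolation, here is a small self-contained sketch (not LDK code) of the same pattern: chain connection generates events and persists the monitor while holding the mutex that event processing also takes, and a failed persist latches a flag so no further events are released. All names here (`EventGate`, `connect_block`, `release_events`) are illustrative only.

```rust
use std::sync::Mutex;

/// Gates event delivery on successful persistence of the state that generated the events.
struct EventGate<E> {
	/// Latched to true after any failed persist; mirrors `event_mutex` above.
	persist_failed: Mutex<bool>,
	/// Events produced by chain processing but not yet handed to the application.
	pending: Mutex<Vec<E>>,
}

impl<E> EventGate<E> {
	/// Block/transaction connection: generate events and persist while holding `persist_failed`,
	/// so a concurrent event handler cannot observe the new events before the state that
	/// generated them is durable on disk.
	fn connect_block(&self, process: impl FnOnce() -> Vec<E>, persist: impl FnOnce() -> Result<(), ()>) {
		let mut failed = self.persist_failed.lock().unwrap();
		let mut events = process();
		if persist().is_err() {
			// The application is expected to shut down; the events will be regenerated when
			// the block is replayed after restart, so withholding delivery loses nothing.
			*failed = true;
		}
		self.pending.lock().unwrap().append(&mut events);
	}

	/// Event processing: once a persist has failed, refuse to release anything, mirroring
	/// `release_pending_monitor_events` and `process_pending_events` in the diff above.
	fn release_events(&self) -> Vec<E> {
		let failed = self.persist_failed.lock().unwrap();
		if *failed {
			return Vec::new();
		}
		std::mem::take(&mut *self.pending.lock().unwrap())
	}
}
```

Because the block can be replayed after a restart, withholding the events rather than buffering them indefinitely is safe: they are simply regenerated once a working persister confirms the monitor is on disk.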

lightning/src/chain/channelmonitor.rs

Lines changed: 29 additions & 0 deletions
```diff
@@ -685,6 +685,14 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
 
 	payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
 
+	// Note that events MUST NOT be generated during update processing, only generated during chain
+	// data processing. This prevents a race in ChainMonitor::update_channel (and presumably user
+	// implementations thereof as well) where we update the in-memory channel object, then before
+	// the persistence finishes (as its all under a read-lock), we return pending events to the
+	// user or to the relevant ChannelManager. This could cause duplicate events.
+	// Note that because the `event_lock` in `ChainMonitor` is only taken in
+	// block/transaction-connected events and *not* during block/transaction-disconnected events,
+	// we further MUST NOT generate events during block/transaction-disconnection.
 	pending_monitor_events: Vec<MonitorEvent>,
 	pending_events: Vec<Event>,
 
@@ -2920,6 +2928,27 @@ pub trait Persist<ChannelSigner: Sign> {
 	/// [`ChannelMonitorUpdate::write`] for writing out an update, and
 	/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
 	fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+
+	/// Update one channel's data synchronously without a [`ChannelMonitorUpdate`].
+	///
+	/// This is called during block/transaction connection, and is a good time to synchronously
+	/// remove all pending [`ChannelMonitorUpdate`]s which may have been persisted separately as an
+	/// intent log.
+	///
+	/// Note that returning an error here irrevocably disables some processing in [`ChainMonitor`],
+	/// preventing continued normal operation. Errors here are largely only useful to continue
+	/// operation long enough to shut down.
+	///
+	/// Failures here do not imply the channel will be force-closed, however any future calls to
+	/// [`update_persisted_channel`] after an error is returned here MUST either persist the full,
+	/// updated [`ChannelMonitor`] provided to [`update_persisted_channel`] or return
+	/// [`ChannelMonitorUpdateErr::PermanentFailure`], force-closing the channel. In other words,
+	/// any future calls to [`update_persisted_channel`] after an error here MUST NOT persist the
+	/// [`ChannelMonitorUpdate`] alone.
+	///
+	/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+	/// [`update_persisted_channel`]: Persist::update_persisted_channel
+	fn sync_persisted_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ()>;
 }
 
 impl<Signer: Sign, T: Deref, F: Deref, L: Deref> chain::Listen for (ChannelMonitor<Signer>, T, F, L)
```
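The new documentation describes an "intent log" persistence strategy: each `ChannelMonitorUpdate` is persisted on its own as it arrives, and `sync_persisted_channel` is the point at which the full monitor is written out and the accumulated updates can be discarded. Below is a hedged sketch of such a persister under those rules, including the requirement that updates must never be persisted alone once a full sync has failed. The `LogPersister` type, its file layout, and the `full_sync_failed` flag are hypothetical; a real log would also need to length-prefix entries so they can be read back.

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};

use bitcoin::hashes::hex::ToHex;
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Persist};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;
use lightning::util::ser::Writeable;

struct LogPersister {
	dir: PathBuf,
	/// Once a full sync has failed, updates may no longer be persisted on their own.
	full_sync_failed: AtomicBool,
}

impl LogPersister {
	fn monitor_path(&self, funding_txo: OutPoint) -> PathBuf {
		self.dir.join(format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index))
	}
	fn write_full_monitor<ChannelSigner: Sign>(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> std::io::Result<()> {
		fs::write(self.monitor_path(funding_txo), monitor.encode())
	}
}

impl<ChannelSigner: Sign> Persist<ChannelSigner> for LogPersister {
	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		self.write_full_monitor(funding_txo, monitor).map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)
	}

	fn update_persisted_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		if self.full_sync_failed.load(Ordering::Acquire) {
			// Per the docs above: after sync_persisted_channel has failed we must either persist
			// the full monitor or return PermanentFailure, never the update alone.
			return self.write_full_monitor(funding_txo, monitor).map_err(|_| ChannelMonitorUpdateErr::PermanentFailure);
		}
		// Normal case: append the serialized update to the per-channel intent log.
		let mut log = OpenOptions::new().create(true).append(true)
			.open(self.monitor_path(funding_txo).with_extension("updates"))
			.map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)?;
		log.write_all(&update.encode()).map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)
	}

	fn sync_persisted_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ()> {
		if self.write_full_monitor(funding_txo, monitor).is_err() {
			self.full_sync_failed.store(true, Ordering::Release);
			return Err(());
		}
		// The full monitor now reflects everything in the intent log, so the log can be dropped.
		let _ = fs::remove_file(self.monitor_path(funding_txo).with_extension("updates"));
		Ok(())
	}
}
```

On restart, such a persister would load the base monitor and re-apply any remaining logged updates before handing the monitor back to LDK.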

lightning/src/util/test_utils.rs

Lines changed: 3 additions & 0 deletions
```diff
@@ -203,6 +203,9 @@ impl<Signer: keysinterface::Sign> channelmonitor::Persist<Signer> for TestPersis
 	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<Signer>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
 		self.update_ret.lock().unwrap().clone()
 	}
+	fn sync_persisted_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>) -> Result<(), ()> {
+		self.update_ret.lock().unwrap().clone().map_err(|_| ())
+	}
 }
 
 pub struct TestBroadcaster {
```
