
Commit f9bf999

Move ChannelManager::monitor_updated to a MonitorEvent
In the next commit we'll need ChainMonitor to "see" when a monitor persistence completes, which means `monitor_updated` needs to move to `ChainMonitor`. The simplest way to then communicate that information to `ChannelManager` is via `MonitorEvent`s, which seems to line up ok, even if they're now constructed in multiple different places.
1 parent 8df37a9 commit f9bf999
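
At a high level, the change replaces a direct call into `ChannelManager` with a queue: `ChainMonitor::channel_monitor_updated` pushes a `MonitorEvent::UpdateCompleted`, and `ChannelManager` picks it up the next time it drains pending monitor events via `release_pending_monitor_events`. A minimal sketch of that queue-and-drain shape, using stand-in types rather than the real LDK definitions:

    use std::sync::Mutex;

    // Stand-ins for the real LDK types (illustration only).
    pub struct OutPoint { pub txid: [u8; 32], pub index: u16 }
    pub struct MonitorUpdated { pub funding_txo: OutPoint, pub monitor_update_id: u64 }
    pub enum MonitorEvent { UpdateCompleted(MonitorUpdated) }

    pub struct ChainMonitor { user_provided_events: Mutex<Vec<MonitorEvent>> }

    impl ChainMonitor {
        // Mirrors the method added in this commit: queue an event instead of
        // calling into ChannelManager directly.
        pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) {
            self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(
                MonitorUpdated { funding_txo, monitor_update_id: highest_applied_update_id }));
        }

        // ChannelManager later drains the queue (alongside per-monitor events)
        // when it polls for pending monitor events.
        pub fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
            self.user_provided_events.lock().unwrap().split_off(0)
        }
    }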

4 files changed: 83 additions & 49 deletions

lightning/src/chain/chainmonitor.rs

Lines changed: 28 additions & 3 deletions
@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, MonitorUpdated, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::logger::Logger;
@@ -38,7 +38,7 @@ use util::events::EventHandler;
 use ln::channelmanager::ChannelDetails;

 use prelude::*;
-use sync::{RwLock, RwLockReadGuard};
+use sync::{RwLock, RwLockReadGuard, Mutex};
 use core::ops::Deref;

 /// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -134,6 +134,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 	logger: L,
 	fee_estimator: F,
 	persister: P,
+	user_provided_events: Mutex<Vec<MonitorEvent>>,
 }

 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -207,6 +208,7 @@ where C::Target: chain::Filter,
 			logger,
 			fee_estimator: feeest,
 			persister,
+			user_provided_events: Mutex::new(Vec::new()),
 		}
 	}

@@ -257,6 +259,29 @@ where C::Target: chain::Filter,
 		self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
 	}

+	/// Indicates the persistence of a [`ChannelMonitor`] has completed after
+	/// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+	///
+	/// All ChannelMonitor updates up to and including highest_applied_update_id must have been
+	/// fully committed in every copy of the given channels' ChannelMonitors.
+	///
+	/// Note that there is no effect to calling with a highest_applied_update_id other than the
+	/// current latest ChannelMonitorUpdate and one call to this function after multiple
+	/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
+	/// exists largely only to prevent races between this and concurrent update_monitor calls.
+	///
+	/// Thus, the anticipated use is, at a high level:
+	///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
+	///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
+	///     returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+	///  2) once all remote copies are updated, you call this function with the update_id that
+	///     completed, and once it is the latest the Channel will be re-enabled.
+	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) {
+		self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(MonitorUpdated {
+			funding_txo, monitor_update_id: highest_applied_update_id
+		}));
+	}
+
 	#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
 	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
 		use util::events::EventsProvider;
@@ -426,7 +451,7 @@ where C::Target: chain::Filter,
 	}

 	fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
-		let mut pending_monitor_events = Vec::new();
+		let mut pending_monitor_events = self.user_provided_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
 		}
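
The `channel_monitor_updated` doc comment above anticipates an asynchronous persister: `Persist::update_persisted_channel` stores the update locally, kicks off replication to remote (e.g. watchtower/backup) copies, and returns `ChannelMonitorUpdateErr::TemporaryFailure`; once replication finishes, completion is reported back. A sketch of that shape under those assumptions — the worker and queue plumbing here is hypothetical, not LDK API:

    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;

    #[derive(Clone, Copy)]
    struct OutPoint { index: u16 } // stand-in for the real type

    struct ChainMonitor { user_provided_events: Mutex<Vec<(OutPoint, u64)>> }
    impl ChainMonitor {
        fn channel_monitor_updated(&self, funding_txo: OutPoint, update_id: u64) {
            self.user_provided_events.lock().unwrap().push((funding_txo, update_id));
        }
    }

    fn main() {
        let chain_monitor = Arc::new(ChainMonitor { user_provided_events: Mutex::new(Vec::new()) });
        let (tx, rx) = mpsc::channel::<(OutPoint, u64)>();

        // Background replication worker: once every remote copy has the
        // update, report the completed update_id back to the ChainMonitor.
        let cm = Arc::clone(&chain_monitor);
        let worker = thread::spawn(move || {
            for (funding_txo, update_id) in rx {
                // ... push the serialized update to remote copies here ...
                cm.channel_monitor_updated(funding_txo, update_id);
            }
        });

        // In update_persisted_channel one would write the update to disk,
        // enqueue the remote replication, and return TemporaryFailure so the
        // channel stays paused until the completion event arrives.
        tx.send((OutPoint { index: 0 }, 42)).unwrap();

        drop(tx); // no more updates; let the worker drain and exit
        worker.join().unwrap();
        assert_eq!(chain_monitor.user_provided_events.lock().unwrap().len(), 1);
    }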

lightning/src/chain/channelmonitor.rs

Lines changed: 28 additions & 2 deletions
@@ -123,6 +123,21 @@ impl Readable for ChannelMonitorUpdate {
 #[derive(Clone, Debug)]
 pub struct MonitorUpdateError(pub &'static str);

+/// A [`MonitorEvent`] which indicates a [`ChannelMonitor`] update has completed. See
+/// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used.
+#[derive(Clone, Debug, PartialEq)]
+pub struct MonitorUpdated {
+	/// The funding outpoint of the [`ChannelMonitor`] which was updated
+	pub funding_txo: OutPoint,
+	/// The Update ID of the update which was applied from [`ChannelMonitorUpdate::update_id`] or
+	/// [`ChannelMonitor::get_latest_update_id`].
+	pub monitor_update_id: u64,
+}
+impl_writeable_tlv_based!(MonitorUpdated, {
+	(0, funding_txo, required),
+	(2, monitor_update_id, required),
+});
+
 /// An event to be processed by the ChannelManager.
 #[derive(Clone, PartialEq)]
 pub enum MonitorEvent {
@@ -131,9 +146,15 @@ pub enum MonitorEvent {

 	/// A monitor event that the Channel's commitment transaction was confirmed.
 	CommitmentTxConfirmed(OutPoint),
+
+	/// Indicates a [`ChannelMonitor`] update has completed. See
+	/// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used.
+	UpdateCompleted(MonitorUpdated),
 }
 impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ;
 	(0, HTLCEvent),
+	// Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor
+	(1, UpdateCompleted),
 	(2, CommitmentTxConfirmed),
 );

@@ -854,14 +875,19 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
 			writer.write_all(&payment_preimage.0[..])?;
 		}

-		writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
+		writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev {
+			MonitorEvent::HTLCEvent(_) => true,
+			MonitorEvent::CommitmentTxConfirmed(_) => true,
+			_ => false,
+		}).count() as u64).to_be_bytes())?;
 		for event in self.pending_monitor_events.iter() {
 			match event {
 				MonitorEvent::HTLCEvent(upd) => {
 					0u8.write(writer)?;
 					upd.write(writer)?;
 				},
-				MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?
+				MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?,
+				_ => {}, // Covered in the TLV writes below
 			}
 		}
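
One subtlety in the `Writeable` change above: the legacy, fixed-format event list is length-prefixed, so the count must now exclude `UpdateCompleted`, which is carried only by the TLV-based enum under odd type 1 (under LDK's even/odd TLV convention, odd types can be safely skipped by older readers, and as the diff's comment notes the event is currently never written to disk anyway). A standalone sketch of the filtered-count pattern, with variant payloads reduced to placeholders:

    // Hypothetical, simplified MonitorEvent for illustration only.
    enum MonitorEvent {
        HTLCEvent(u64),
        CommitmentTxConfirmed(u64),
        UpdateCompleted(u64),
    }

    // Only variants the pre-TLV serialization format understands are counted
    // in the fixed-format section; anything newer rides in the TLV stream.
    fn legacy_event_count(events: &[MonitorEvent]) -> u64 {
        events.iter().filter(|ev| matches!(ev,
            MonitorEvent::HTLCEvent(_) | MonitorEvent::CommitmentTxConfirmed(_))).count() as u64
    }

    fn main() {
        let events = vec![MonitorEvent::HTLCEvent(0), MonitorEvent::UpdateCompleted(7)];
        // The legacy section claims a single event; UpdateCompleted stays
        // invisible to readers that predate the TLV variant.
        assert_eq!(legacy_event_count(&events), 1);
    }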

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 23 additions & 23 deletions
@@ -185,7 +185,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {

 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[0], 0);

 	let mut events_2 = nodes[0].node.get_and_clear_pending_msg_events();
@@ -342,7 +342,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
 	// Now fix monitor updating...
 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[0], 0);

 	macro_rules! disconnect_reconnect_peers { () => { {
@@ -642,7 +642,7 @@ fn test_monitor_update_fail_cs() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);
 	let responses = nodes[1].node.get_and_clear_pending_msg_events();
 	assert_eq!(responses.len(), 2);
@@ -676,7 +676,7 @@ fn test_monitor_update_fail_cs() {

 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[0], 0);

 	let final_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
@@ -739,7 +739,7 @@ fn test_monitor_update_fail_no_rebroadcast() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors!(nodes[1], 0);
 	expect_pending_htlcs_forwardable!(nodes[1]);
@@ -805,7 +805,7 @@ fn test_monitor_update_raa_while_paused() {

 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[0], 0);

 	let as_update_raa = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
@@ -938,7 +938,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
 	// update_add update.
 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);
 	expect_pending_htlcs_forwardable!(nodes[1]);
 	check_added_monitors!(nodes[1], 1);
@@ -1177,7 +1177,7 @@ fn test_monitor_update_fail_reestablish() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1263,7 +1263,7 @@ fn raa_no_response_awaiting_raa_state() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	// nodes[1] should be AwaitingRAA here!
 	check_added_monitors!(nodes[1], 0);
 	let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1385,7 +1385,7 @@ fn claim_while_disconnected_monitor_update_fail() {
 	// receiving the commitment update from A, and the resulting commitment dances.
 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	let bs_msgs = nodes[1].node.get_and_clear_pending_msg_events();
@@ -1496,7 +1496,7 @@ fn monitor_failed_no_reestablish_response() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);
 	let bs_responses = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());

@@ -1594,7 +1594,7 @@ fn first_message_on_recv_ordering() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	expect_pending_htlcs_forwardable!(nodes[1]);
@@ -1682,7 +1682,7 @@ fn test_monitor_update_fail_claim() {

 	// Now restore monitor updating on the 0<->1 channel and claim the funds on B.
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	let bs_fulfill_update = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1780,7 +1780,7 @@ fn test_monitor_update_on_pending_forwards() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_1.2).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
@@ -1844,7 +1844,7 @@ fn monitor_update_claim_fail_no_response() {

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

@@ -1902,7 +1902,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:
 	assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[0], 0);

 	let events = nodes[0].node.get_and_clear_pending_events();
@@ -1934,7 +1934,7 @@ fn do_during_funding_monitor_fail(confirm_a_first: bool, restore_b_before_conf:

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	check_added_monitors!(nodes[1], 0);

 	let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
@@ -2024,7 +2024,7 @@ fn test_path_paused_mpp() {
 	// And check that, after we successfully update the monitor for chan_2 we can pass the second
 	// HTLC along to nodes[3] and claim the whole payment back to nodes[0].
 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
 	assert_eq!(events.len(), 1);
 	pass_along_path(&nodes[0], &[&nodes[2], &nodes[3]], 200_000, payment_hash.clone(), Some(payment_secret), events.pop().unwrap(), true, None);
@@ -2367,7 +2367,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
 	// not occur prior to #756).
 	chanmon_cfgs[0].persister.set_update_ret(Ok(()));
 	let (funding_txo, mon_id) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&funding_txo, mon_id);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, mon_id);

 	// New outbound messages should be generated immediately upon a call to
 	// get_and_clear_pending_msg_events (but not before).
@@ -2567,14 +2567,14 @@ fn test_temporary_error_during_shutdown() {
 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));

 	let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);
 	nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()));

 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 	let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(outpoint, latest_update);

 	nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id()));
 	let (_, closing_signed_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
@@ -2663,10 +2663,10 @@ fn double_temp_error() {
 	chanmon_cfgs[1].persister.set_update_ret(Ok(()));

 	let (_, latest_update_2) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
-	nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_1);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_1);
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors!(nodes[1], 0);
-	nodes[1].node.channel_monitor_updated(&funding_tx, latest_update_2);
+	nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(funding_tx, latest_update_2);

 	// Complete the first HTLC.
 	let events = nodes[1].node.get_and_clear_pending_msg_events();
