@@ -2719,6 +2719,10 @@ pub struct ChannelManager<
2719
2719
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2720
2720
pending_events_processor: AtomicBool,
2721
2721
2722
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2723
+ /// [`Self::process_pending_htlc_forwards`].
2724
+ pending_htlc_forwards_processor: AtomicBool,
2725
+
2722
2726
/// If we are running during init (either directly during the deserialization method or in
2723
2727
/// block connection methods which run after deserialization but before normal operation) we
2724
2728
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3795,6 +3799,7 @@ where
3795
3799
3796
3800
pending_events: Mutex::new(VecDeque::new()),
3797
3801
pending_events_processor: AtomicBool::new(false),
3802
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3798
3803
pending_background_events: Mutex::new(Vec::new()),
3799
3804
total_consistency_lock: RwLock::new(()),
3800
3805
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6325,9 +6330,19 @@ where
6325
6330
///
6326
6331
/// Will regularly be called by the background processor.
6327
6332
pub fn process_pending_htlc_forwards(&self) {
6333
+ if self
6334
+ .pending_htlc_forwards_processor
6335
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6336
+ .is_err()
6337
+ {
6338
+ return;
6339
+ }
6340
+
6328
6341
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6329
6342
self.internal_process_pending_htlc_forwards()
6330
6343
});
6344
+
6345
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6331
6346
}
6332
6347
6333
6348
// Returns whether or not we need to re-persist.
@@ -16296,6 +16311,7 @@ where
16296
16311
16297
16312
pending_events: Mutex::new(pending_events_read),
16298
16313
pending_events_processor: AtomicBool::new(false),
16314
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16299
16315
pending_background_events: Mutex::new(pending_background_events),
16300
16316
total_consistency_lock: RwLock::new(()),
16301
16317
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments