@@ -2720,6 +2720,10 @@ pub struct ChannelManager<
2720
2720
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2721
2721
pending_events_processor: AtomicBool,
2722
2722
2723
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2724
+ /// [`Self::process_pending_htlc_forwards`].
2725
+ pending_htlc_forwards_processor: AtomicBool,
2726
+
2723
2727
/// If we are running during init (either directly during the deserialization method or in
2724
2728
/// block connection methods which run after deserialization but before normal operation) we
2725
2729
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3796,6 +3800,7 @@ where
3796
3800
3797
3801
pending_events: Mutex::new(VecDeque::new()),
3798
3802
pending_events_processor: AtomicBool::new(false),
3803
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3799
3804
pending_background_events: Mutex::new(Vec::new()),
3800
3805
total_consistency_lock: RwLock::new(()),
3801
3806
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6338,9 +6343,19 @@ where
6338
6343
/// Users implementing their own background processing logic should call this in irregular,
6339
6344
/// randomly-distributed intervals.
6340
6345
pub fn process_pending_htlc_forwards(&self) {
6346
+ if self
6347
+ .pending_htlc_forwards_processor
6348
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6349
+ .is_err()
6350
+ {
6351
+ return;
6352
+ }
6353
+
6341
6354
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6342
6355
self.internal_process_pending_htlc_forwards()
6343
6356
});
6357
+
6358
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6344
6359
}
6345
6360
6346
6361
// Returns whether or not we need to re-persist.
@@ -16474,6 +16489,7 @@ where
16474
16489
16475
16490
pending_events: Mutex::new(pending_events_read),
16476
16491
pending_events_processor: AtomicBool::new(false),
16492
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16477
16493
pending_background_events: Mutex::new(pending_background_events),
16478
16494
total_consistency_lock: RwLock::new(()),
16479
16495
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments