Skip to content

Commit 0491e66

Browse files
committed
[exporterhelper] Fix potential deadlocks in BatcherSender shutdown
Fixes #10255
1 parent eaab76e commit 0491e66

File tree

3 files changed

+107
-20
lines changed

3 files changed

+107
-20
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix potential deadlocks in BatcherSender shutdown
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10255]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
# Default: '[user]'
20+
change_logs: [user]

exporter/exporterhelper/batch_sender.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,24 @@ type batchSender struct {
4040

4141
logger *zap.Logger
4242

43-
shutdownCh chan struct{}
44-
stopped *atomic.Bool
43+
shutdownCh chan struct{}
44+
shutdownCompleteCh chan struct{}
45+
stopped *atomic.Bool
4546
}
4647

4748
// newBatchSender returns a new batch consumer component.
4849
func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
4950
mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender {
5051
bs := &batchSender{
51-
activeBatch: newEmptyBatch(),
52-
cfg: cfg,
53-
logger: set.Logger,
54-
mergeFunc: mf,
55-
mergeSplitFunc: msf,
56-
shutdownCh: make(chan struct{}),
57-
stopped: &atomic.Bool{},
58-
resetTimerCh: make(chan struct{}),
52+
activeBatch: newEmptyBatch(),
53+
cfg: cfg,
54+
logger: set.Logger,
55+
mergeFunc: mf,
56+
mergeSplitFunc: msf,
57+
shutdownCh: make(chan struct{}),
58+
shutdownCompleteCh: make(chan struct{}),
59+
stopped: &atomic.Bool{},
60+
resetTimerCh: make(chan struct{}),
5961
}
6062
return bs
6163
}
@@ -66,14 +68,19 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
6668
for {
6769
select {
6870
case <-bs.shutdownCh:
69-
bs.mu.Lock()
70-
if bs.activeBatch.request != nil {
71-
bs.exportActiveBatch()
71+
// There is a minimal chance that another request is added after the shutdown signal.
72+
// This loop will handle that case.
73+
for bs.activeRequests.Load() > 0 {
74+
bs.mu.Lock()
75+
if bs.activeBatch.request != nil {
76+
bs.exportActiveBatch()
77+
}
78+
bs.mu.Unlock()
7279
}
73-
bs.mu.Unlock()
7480
if !timer.Stop() {
7581
<-timer.C
7682
}
83+
close(bs.shutdownCompleteCh)
7784
return
7885
case <-timer.C:
7986
bs.mu.Lock()
@@ -118,6 +125,12 @@ func (bs *batchSender) exportActiveBatch() {
118125
bs.activeBatch = newEmptyBatch()
119126
}
120127

128+
func (bs *batchSender) resetTimer() {
129+
if !bs.stopped.Load() {
130+
bs.resetTimerCh <- struct{}{}
131+
}
132+
}
133+
121134
// isActiveBatchReady returns true if the active batch is ready to be exported.
122135
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
123136
// Caller must hold the lock.
@@ -154,7 +167,7 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
154167
batch := bs.activeBatch
155168
if bs.isActiveBatchReady() || len(reqs) > 1 {
156169
bs.exportActiveBatch()
157-
bs.resetTimerCh <- struct{}{}
170+
bs.resetTimer()
158171
}
159172
bs.mu.Unlock()
160173
<-batch.done
@@ -194,7 +207,7 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
194207
batch := bs.activeBatch
195208
if bs.isActiveBatchReady() {
196209
bs.exportActiveBatch()
197-
bs.resetTimerCh <- struct{}{}
210+
bs.resetTimer()
198211
}
199212
bs.mu.Unlock()
200213
<-batch.done
@@ -215,9 +228,6 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) {
215228
func (bs *batchSender) Shutdown(context.Context) error {
216229
bs.stopped.Store(true)
217230
close(bs.shutdownCh)
218-
// Wait for the active requests to finish.
219-
for bs.activeRequests.Load() > 0 {
220-
time.Sleep(10 * time.Millisecond)
221-
}
231+
<-bs.shutdownCompleteCh
222232
return nil
223233
}

exporter/exporterhelper/batch_sender_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,63 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
439439
}
440440
}
441441

442+
// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being
443+
// merged.
444+
func TestBatchSender_ShutdownDeadlock(t *testing.T) {
445+
blockMerge := make(chan struct{})
446+
waitMerge := make(chan struct{}, 10)
447+
448+
// blockedBatchMergeFunc blocks until the blockMerge channel is closed
449+
blockedBatchMergeFunc := func(_ context.Context, r1 Request, _ Request) (Request, error) {
450+
waitMerge <- struct{}{}
451+
<-blockMerge
452+
return r1, nil
453+
}
454+
455+
bCfg := exporterbatcher.NewDefaultConfig()
456+
bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger
457+
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
458+
WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
459+
require.NoError(t, err)
460+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
461+
462+
sink := newFakeRequestSink()
463+
464+
// Send 10 concurrent requests and wait for them to start
465+
startWG := sync.WaitGroup{}
466+
for i := 0; i < 10; i++ {
467+
startWG.Add(1)
468+
go func() {
469+
startWG.Done()
470+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
471+
}()
472+
}
473+
startWG.Wait()
474+
475+
// Wait for at least one batch to enter the merge function
476+
<-waitMerge
477+
478+
// Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks,
479+
// then wait for the exporter to finish.
480+
startShutdown := make(chan struct{})
481+
doneShutdown := make(chan struct{})
482+
go func() {
483+
close(startShutdown)
484+
require.Nil(t, be.Shutdown(context.Background()))
485+
close(doneShutdown)
486+
}()
487+
<-startShutdown
488+
close(blockMerge)
489+
<-doneShutdown
490+
491+
// The exporter should have sent only one "merged" batch, in some cases it might send two if the shutdown
492+
// happens before the batch is fully merged.
493+
assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
494+
495+
// blockedBatchMergeFunc just returns the first request, so the items count should be 4 times the requests count.
496+
assert.Equal(t, sink.requestsCount.Load()*4, sink.itemsCount.Load())
497+
}
498+
442499
func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
443500
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
444501
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))

0 commit comments

Comments
 (0)