Commit 58a77db

[receiver/kafkareceiver] fix: Kafka receiver blocking shutdown (#35767)

#### Description

Fixes an issue where the Kafka receiver would block on shutdown. There was an earlier fix for this issue [here](#32720), but it was only applied to the traces receiver, not the logs or metrics receivers. The problem is this goroutine in the `Start()` functions for the logs and metrics receivers:

```go
go func() {
    if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
        componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
    }
}()
```

`consumeLoop()` returns a `context.Canceled` error when `Shutdown()` is called, which is expected. However, `componentstatus.ReportStatus()` blocks while attempting to report this error; the underlying bug is described [here](open-telemetry/opentelemetry-collector#9824). The previously mentioned PR fixed this for the traces receiver by checking whether the error returned by `consumeLoop()` is `context.Canceled`:

```go
go func() {
    if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
        componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
    }
}()
```

For reference, this is `consumeLoop()` for the traces receiver; the logs and metrics versions are identical:

```go
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
    for {
        // `Consume` should be called inside an infinite loop, when a
        // server-side rebalance happens, the consumer session will need to be
        // recreated to get the new claims
        if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
            c.settings.Logger.Error("Error from consumer", zap.Error(err))
        }
        // check if context was cancelled, signaling that the consumer should stop
        if ctx.Err() != nil {
            c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
            return ctx.Err()
        }
    }
}
```

The `errors.Is` check does fix the issue, but the only error `consumeLoop()` can return is a canceled context. The context and cancel function are created from `context.Background()`:

```go
ctx, cancel := context.WithCancel(context.Background())
```

This context is used only by `consumeLoop()`, and the cancel function is called only in `Shutdown()`. Because `consumeLoop()` can only return `context.Canceled`, this PR removes the now-unused error handling from the logs, metrics, and traces receivers. `consumeLoop()` still logs the `context.Canceled` error, but it no longer returns an error, and the goroutine simply calls `consumeLoop()`.

Additional motivation for removing the call to `componentstatus.ReportStatus()`: the underlying function it invokes, `componentstatus.Report()`, documents that it does not need to be called during `Shutdown()` or `Start()`, since the service already reports status for the component ([comment here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25)). Even if there were no bug causing this call to block, the component should not make it, because it would only ever be made during `Shutdown()`.

#### Link to tracking issue

Fixes #30789

#### Testing

Tested in a build of the collector with these changes, scraping logs from a Kafka instance. When the collector is stopped and `Shutdown()` is called, the receiver does not block and the collector stops gracefully as expected.
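
#### Sketch of the new lifecycle (illustrative)

The change replaces the error-returning consume loop and the `componentstatus.ReportStatus()` call with a `sync.WaitGroup` that `Shutdown()` waits on. Below is a minimal, self-contained model of that Start/consumeLoop/Shutdown flow, not the receiver's actual code: the `loopRunner` type and its sleep-based loop are hypothetical stand-ins for the receiver and the Sarama consumer group.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// loopRunner is a hypothetical stand-in for the receiver type; it models the
// Start/consumeLoop/Shutdown flow introduced by this change.
type loopRunner struct {
    cancelConsumeLoop context.CancelFunc
    consumeLoopWG     sync.WaitGroup
}

// Start registers the consume loop on the WaitGroup before launching it.
func (r *loopRunner) Start() {
    ctx, cancel := context.WithCancel(context.Background())
    r.cancelConsumeLoop = cancel
    r.consumeLoopWG.Add(1)
    go r.consumeLoop(ctx)
}

// consumeLoop mirrors the new signature: it logs cancellation and returns
// nothing instead of returning context.Canceled.
func (r *loopRunner) consumeLoop(ctx context.Context) {
    defer r.consumeLoopWG.Done()
    for {
        // Stand-in for consumerGroup.Consume(ctx, topics, handler).
        time.Sleep(10 * time.Millisecond)
        if ctx.Err() != nil {
            fmt.Println("consumer stopped:", ctx.Err())
            return
        }
    }
}

// Shutdown cancels the loop's context and waits for the goroutine to exit, so
// any cleanup that follows cannot race with the loop.
func (r *loopRunner) Shutdown() {
    r.cancelConsumeLoop()
    r.consumeLoopWG.Wait()
}

func main() {
    r := &loopRunner{}
    r.Start()
    time.Sleep(50 * time.Millisecond)
    r.Shutdown()
    fmt.Println("shut down cleanly")
}
```

The key property is that `Shutdown()` returns only after the consume goroutine has observed cancellation and exited, so closing the consumer group afterwards cannot race with it.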
1 parent 8d9f682 commit 58a77db

5 files changed: +68, -32 lines

Lines changed: 27 additions & 0 deletions

```diff
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fixes issue causing kafkareceiver to block during Shutdown().
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [30789]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
```

receiver/kafkareceiver/go.mod

Lines changed: 0 additions & 1 deletion

```diff
@@ -17,7 +17,6 @@ require (
 	github.com/openzipkin/zipkin-go v0.4.3
 	github.com/stretchr/testify v1.9.0
 	go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae
-	go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae
 	go.opentelemetry.io/collector/config/configtelemetry v0.111.1-0.20241008154146-ea48c09c31ae
 	go.opentelemetry.io/collector/config/configtls v1.17.1-0.20241008154146-ea48c09c31ae
 	go.opentelemetry.io/collector/confmap v1.17.1-0.20241008154146-ea48c09c31ae
```
receiver/kafkareceiver/go.sum

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 24 additions & 23 deletions

```diff
@@ -5,14 +5,12 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"strconv"
 	"sync"
 
 	"github.com/IBM/sarama"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/component/componentstatus"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -44,6 +42,7 @@ type kafkaTracesConsumer struct {
 	topics            []string
 	cancelConsumeLoop context.CancelFunc
 	unmarshaler       TracesUnmarshaler
+	consumeLoopWG     *sync.WaitGroup
 
 	settings         receiver.Settings
 	telemetryBuilder *metadata.TelemetryBuilder
@@ -65,6 +64,7 @@ type kafkaMetricsConsumer struct {
 	topics            []string
 	cancelConsumeLoop context.CancelFunc
 	unmarshaler       MetricsUnmarshaler
+	consumeLoopWG     *sync.WaitGroup
 
 	settings         receiver.Settings
 	telemetryBuilder *metadata.TelemetryBuilder
@@ -86,6 +86,7 @@ type kafkaLogsConsumer struct {
 	topics            []string
 	cancelConsumeLoop context.CancelFunc
 	unmarshaler       LogsUnmarshaler
+	consumeLoopWG     *sync.WaitGroup
 
 	settings         receiver.Settings
 	telemetryBuilder *metadata.TelemetryBuilder
@@ -113,6 +114,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consum
 		config:            config,
 		topics:            []string{config.Topic},
 		nextConsumer:      nextConsumer,
+		consumeLoopWG:     &sync.WaitGroup{},
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
@@ -207,16 +209,14 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 			headers: c.headers,
 		}
 	}
-	go func() {
-		if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
-			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
-		}
-	}()
+	c.consumeLoopWG.Add(1)
+	go c.consumeLoop(ctx, consumerGroup)
 	<-consumerGroup.ready
 	return nil
 }
 
-func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
+func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
+	defer c.consumeLoopWG.Done()
 	for {
 		// `Consume` should be called inside an infinite loop, when a
 		// server-side rebalance happens, the consumer session will need to be
@@ -227,7 +227,7 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co
 		// check if context was cancelled, signaling that the consumer should stop
 		if ctx.Err() != nil {
 			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
-			return ctx.Err()
+			return
 		}
 	}
 }
@@ -237,6 +237,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
 		return nil
 	}
 	c.cancelConsumeLoop()
+	c.consumeLoopWG.Wait()
 	if c.consumerGroup == nil {
 		return nil
 	}
@@ -253,6 +254,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consu
 		config:            config,
 		topics:            []string{config.Topic},
 		nextConsumer:      nextConsumer,
+		consumeLoopWG:     &sync.WaitGroup{},
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
@@ -315,16 +317,14 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
 			headers: c.headers,
 		}
 	}
-	go func() {
-		if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
-			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
-		}
-	}()
+	c.consumeLoopWG.Add(1)
+	go c.consumeLoop(ctx, metricsConsumerGroup)
 	<-metricsConsumerGroup.ready
 	return nil
 }
 
-func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
+func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
+	defer c.consumeLoopWG.Done()
 	for {
 		// `Consume` should be called inside an infinite loop, when a
 		// server-side rebalance happens, the consumer session will need to be
@@ -335,7 +335,7 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C
 		// check if context was cancelled, signaling that the consumer should stop
 		if ctx.Err() != nil {
 			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
-			return ctx.Err()
+			return
 		}
 	}
 }
@@ -345,6 +345,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
 		return nil
 	}
 	c.cancelConsumeLoop()
+	c.consumeLoopWG.Wait()
 	if c.consumerGroup == nil {
 		return nil
 	}
@@ -361,6 +362,7 @@ func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer
 		config:            config,
 		topics:            []string{config.Topic},
 		nextConsumer:      nextConsumer,
+		consumeLoopWG:     &sync.WaitGroup{},
 		settings:          set,
 		autocommitEnabled: config.AutoCommit.Enable,
 		messageMarking:    config.MessageMarking,
@@ -426,16 +428,14 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
 			headers: c.headers,
 		}
 	}
-	go func() {
-		if err := c.consumeLoop(ctx, logsConsumerGroup); err != nil {
-			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
-		}
-	}()
+	c.consumeLoopWG.Add(1)
+	go c.consumeLoop(ctx, logsConsumerGroup)
 	<-logsConsumerGroup.ready
 	return nil
 }
 
-func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
+func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
+	defer c.consumeLoopWG.Done()
 	for {
 		// `Consume` should be called inside an infinite loop, when a
 		// server-side rebalance happens, the consumer session will need to be
@@ -446,7 +446,7 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons
 		// check if context was cancelled, signaling that the consumer should stop
 		if ctx.Err() != nil {
 			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
-			return ctx.Err()
+			return
 		}
 	}
 }
@@ -456,6 +456,7 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
 		return nil
 	}
 	c.cancelConsumeLoop()
+	c.consumeLoopWG.Wait()
 	if c.consumerGroup == nil {
 		return nil
 	}
```
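
A note on ordering in the hunks above: `consumeLoopWG.Add(1)` runs in `Start()` before the `go` statement, and `Done()` is deferred inside `consumeLoop()`, so `Wait()` in `Shutdown()` can never observe a counter that has not yet been incremented, and `Done()` runs on every exit path of the loop. A tiny, hypothetical illustration of that invariant (not receiver code, names are invented):

```go
package main

import (
    "context"
    "sync"
)

// startLoop is a hypothetical helper showing the Add-before-go ordering used
// above: the counter is incremented before the goroutine exists, and Done is
// deferred so it runs however the loop exits.
func startLoop(ctx context.Context, wg *sync.WaitGroup, loop func(context.Context)) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        loop(ctx)
    }()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    // The loop blocks until the context is canceled, like consumeLoop blocking
    // in Consume and then checking ctx.Err().
    startLoop(ctx, &wg, func(ctx context.Context) { <-ctx.Done() })

    cancel()  // signal shutdown
    wg.Wait() // returns only after the loop goroutine has exited
}
```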

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 17 additions & 6 deletions

```diff
@@ -96,6 +96,7 @@ func TestTracesReceiverStart(t *testing.T) {
 	c := kafkaTracesConsumer{
 		config:           Config{Encoding: defaultEncoding},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         receivertest.NewNopSettings(),
 		consumerGroup:    &testConsumerGroup{},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -110,18 +111,19 @@ func TestTracesReceiverStartConsume(t *testing.T) {
 	require.NoError(t, err)
 	c := kafkaTracesConsumer{
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         receivertest.NewNopSettings(),
 		consumerGroup:    &testConsumerGroup{},
 		telemetryBuilder: telemetryBuilder,
 	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	c.cancelConsumeLoop = cancelFunc
 	require.NoError(t, c.Shutdown(context.Background()))
-	err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{
+	c.consumeLoopWG.Add(1)
+	c.consumeLoop(ctx, &tracesConsumerGroupHandler{
 		ready:            make(chan bool),
 		telemetryBuilder: telemetryBuilder,
 	})
-	assert.EqualError(t, err, context.Canceled.Error())
 }
 
 func TestTracesReceiver_error(t *testing.T) {
@@ -134,6 +136,7 @@ func TestTracesReceiver_error(t *testing.T) {
 	c := kafkaTracesConsumer{
 		config:           Config{Encoding: defaultEncoding},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -375,6 +378,7 @@ func TestTracesReceiver_encoding_extension(t *testing.T) {
 	c := kafkaTracesConsumer{
 		config:           Config{Encoding: "traces_encoding"},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -449,18 +453,19 @@ func TestMetricsReceiverStartConsume(t *testing.T) {
 	require.NoError(t, err)
 	c := kafkaMetricsConsumer{
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         receivertest.NewNopSettings(),
 		consumerGroup:    &testConsumerGroup{},
 		telemetryBuilder: telemetryBuilder,
 	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	c.cancelConsumeLoop = cancelFunc
 	require.NoError(t, c.Shutdown(context.Background()))
-	err = c.consumeLoop(ctx, &logsConsumerGroupHandler{
+	c.consumeLoopWG.Add(1)
+	c.consumeLoop(ctx, &logsConsumerGroupHandler{
 		ready:            make(chan bool),
 		telemetryBuilder: telemetryBuilder,
 	})
-	assert.EqualError(t, err, context.Canceled.Error())
 }
 
 func TestMetricsReceiver_error(t *testing.T) {
@@ -473,6 +478,7 @@ func TestMetricsReceiver_error(t *testing.T) {
 	c := kafkaMetricsConsumer{
 		config:           Config{Encoding: defaultEncoding},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -712,6 +718,7 @@ func TestMetricsReceiver_encoding_extension(t *testing.T) {
 	c := kafkaMetricsConsumer{
 		config:           Config{Encoding: "metrics_encoding"},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -787,6 +794,7 @@ func TestLogsReceiverStart(t *testing.T) {
 	c := kafkaLogsConsumer{
 		config:           *createDefaultConfig().(*Config),
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         receivertest.NewNopSettings(),
 		consumerGroup:    &testConsumerGroup{},
 		telemetryBuilder: nopTelemetryBuilder(t),
@@ -801,18 +809,19 @@ func TestLogsReceiverStartConsume(t *testing.T) {
 	require.NoError(t, err)
 	c := kafkaLogsConsumer{
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         receivertest.NewNopSettings(),
 		consumerGroup:    &testConsumerGroup{},
 		telemetryBuilder: telemetryBuilder,
 	}
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	c.cancelConsumeLoop = cancelFunc
 	require.NoError(t, c.Shutdown(context.Background()))
-	err = c.consumeLoop(ctx, &logsConsumerGroupHandler{
+	c.consumeLoopWG.Add(1)
+	c.consumeLoop(ctx, &logsConsumerGroupHandler{
 		ready:            make(chan bool),
 		telemetryBuilder: telemetryBuilder,
 	})
-	assert.EqualError(t, err, context.Canceled.Error())
 }
 
 func TestLogsReceiver_error(t *testing.T) {
@@ -824,6 +833,7 @@ func TestLogsReceiver_error(t *testing.T) {
 	expectedErr := errors.New("handler error")
 	c := kafkaLogsConsumer{
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		config:           *createDefaultConfig().(*Config),
@@ -1188,6 +1198,7 @@ func TestLogsReceiver_encoding_extension(t *testing.T) {
 	c := kafkaLogsConsumer{
 		config:           Config{Encoding: "logs_encoding"},
 		nextConsumer:     consumertest.NewNop(),
+		consumeLoopWG:    &sync.WaitGroup{},
 		settings:         settings,
 		consumerGroup:    &testConsumerGroup{err: expectedErr},
 		telemetryBuilder: nopTelemetryBuilder(t),
```
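
The tests above exercise `consumeLoop()` directly after `Shutdown()`. Separately, as a purely hypothetical sketch (not part of this PR), a timeout-guarded test could assert that `Shutdown()` returns promptly instead of hanging; `newTestLogsReceiver` below is an invented helper, not an existing function in this package.

```go
package kafkareceiver

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

// TestShutdownDoesNotBlock is a hypothetical sketch: it guards Shutdown() with
// a timeout so a regression that blocks shutdown fails the test quickly.
func TestShutdownDoesNotBlock(t *testing.T) {
    // newTestLogsReceiver is an invented helper assumed to build and start a
    // logs receiver against a test consumer group.
    r := newTestLogsReceiver(t)

    done := make(chan error, 1)
    go func() { done <- r.Shutdown(context.Background()) }()

    select {
    case err := <-done:
        require.NoError(t, err)
    case <-time.After(5 * time.Second):
        t.Fatal("Shutdown() blocked")
    }
}
```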

0 commit comments