Skip to content

Commit 47a699c

Browse files
committed
retry error again
1 parent 4ade9fa commit 47a699c

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

receiver/kafkareceiver/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type Config struct {
8686
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
8787
// The maximum bytes per fetch from Kafka (default "0", no limit)
8888
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
89-
// In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
89+
// In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
9090
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
9191
}
9292

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -552,9 +552,6 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
552552
if !c.autocommitEnabled {
553553
defer session.Commit()
554554
}
555-
if c.backOff != nil {
556-
c.backOff.Reset()
557-
}
558555
for {
559556
select {
560557
case message, ok := <-claim.Messages():
@@ -600,6 +597,11 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
600597
case <-session.Context().Done():
601598
return nil
602599
case <-time.After(backOffDelay):
600+
if !c.messageMarking.After {
601+
// Unmark the message so it can be retried
602+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
603+
}
604+
return err
603605
}
604606
}
605607
}
@@ -627,10 +629,6 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
627629
}
628630
}
629631

630-
func errorRequiresBackoff(err error) bool {
631-
return errors.Is(err, errMemoryLimiterDataRefused)
632-
}
633-
634632
func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
635633
c.readyCloser.Do(func() {
636634
close(c.ready)
@@ -649,9 +647,6 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
649647
if !c.autocommitEnabled {
650648
defer session.Commit()
651649
}
652-
if c.backOff != nil {
653-
c.backOff.Reset()
654-
}
655650
for {
656651
select {
657652
case message, ok := <-claim.Messages():
@@ -697,6 +692,11 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
697692
case <-session.Context().Done():
698693
return nil
699694
case <-time.After(backOffDelay):
695+
if !c.messageMarking.After {
696+
// Unmark the message so it can be retried
697+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
698+
}
699+
return err
700700
}
701701
}
702702
}
@@ -786,6 +786,11 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
786786
case <-session.Context().Done():
787787
return nil
788788
case <-time.After(backOffDelay):
789+
if !c.messageMarking.After {
790+
// Unmark the message so it can be retried
791+
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
792+
}
793+
return err
789794
}
790795
}
791796
}
@@ -866,3 +871,7 @@ func encodingToComponentID(encoding string) (*component.ID, error) {
866871
id := component.NewID(componentType)
867872
return &id, nil
868873
}
874+
875+
func errorRequiresBackoff(err error) bool {
876+
return err.Error() == errMemoryLimiterDataRefused.Error()
877+
}

0 commit comments

Comments
 (0)