From 5ff1fc6154e6170f666db449b3dc4c2e66eb504a Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Fri, 1 Sep 2023 10:48:32 +0100 Subject: [PATCH] `storeMessageOffset`: ignore state error Motivation: Previously, we failed the entire `KafkaConsumer` if storing a message offset through `RDKafkaClient.storeMessageOffset` failed because the partition the offset should be committed to was unassigned (which can happen during rebalance). We should not fail the consumer when committing during rebalance. The worst thing that could happen here is that storing the offset fails and we re-read a message, which is fine since KafkaConsumers with automatic commits are designed for at-least-once processing: https://docs.confluent.io/platform/current/clients/consumer.html#offset-management Modifications: * `RDKafkaClient.storeMessageOffset`: don't throw when receiving error `RD_KAFKA_RESP_ERR__STATE` --- Sources/Kafka/RDKafka/RDKafkaClient.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 9d69e08d..42686937 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -520,6 +520,14 @@ final class RDKafkaClient: Sendable { } if error != RD_KAFKA_RESP_ERR_NO_ERROR { + // Ignore RD_KAFKA_RESP_ERR__STATE error. + // RD_KAFKA_RESP_ERR__STATE indicates an attempt to commit to an unassigned partition, + // which can occur during rebalancing or when the consumer is shutting down. + // See "Upgrade considerations" for more details: https://github.com/confluentinc/librdkafka/releases/tag/v1.9.0 + // Since Kafka Consumers are designed for at-least-once processing, failing to commit here is acceptable. + if error != RD_KAFKA_RESP_ERR__STATE { + return + } throw KafkaError.rdKafkaError(wrapping: error) } }