diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index f5344076..a22d09ff 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -256,6 +256,11 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + // Note: + // It's crucial to initialize the `sourceAndSequence` variable AFTER `client`. + // This order is important to prevent the accidental triggering of `KafkaConsumerCloseOnTerminate.didTerminate()`. + // If this order is not met and `RDKafkaClient.makeClient()` fails, + // it leads to a call to `stateMachine.messageSequenceTerminated()` while it's still in the `.uninitialized` state. let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerEvent.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index ca8f18ac..b3cfcbdc 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -95,7 +95,7 @@ public final class KafkaProducer: Service, Sendable { stateMachine: NIOLockedValueBox, configuration: KafkaProducerConfiguration, topicConfiguration: KafkaTopicConfiguration - ) throws { + ) { self.stateMachine = stateMachine self.configuration = configuration self.topicConfiguration = topicConfiguration @@ -130,7 +130,7 @@ public final class KafkaProducer: Service, Sendable { ) } - try self.init( + self.init( stateMachine: stateMachine, configuration: configuration, topicConfiguration: configuration.topicConfiguration @@ -156,13 +156,6 @@ public final class KafkaProducer: Service, Sendable { ) throws -> (KafkaProducer, KafkaProducerEvents) { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: KafkaProducerEvent.self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine) - ) - let source = sourceAndSequence.source - let client = try RDKafkaClient.makeClient( type: .producer, configDictionary: configuration.dictionary, @@ -170,16 +163,27 @@ public final class KafkaProducer: Service, Sendable { logger: logger ) - let producer = try KafkaProducer( + let producer = KafkaProducer( stateMachine: stateMachine, configuration: configuration, topicConfiguration: configuration.topicConfiguration ) + // Note: + // It's crucial to initialize the `sourceAndSequence` variable AFTER `client`. + // This order is important to prevent the accidental triggering of `KafkaProducerCloseOnTerminate.didTerminate()`. + // If this order is not met and `RDKafkaClient.makeClient()` fails, + // it leads to a call to `stateMachine.stopConsuming()` while it's still in the `.uninitialized` state. + let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( + elementType: KafkaProducerEvent.self, + backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), + delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine) + ) + stateMachine.withLockedValue { $0.initialize( client: client, - source: source + source: sourceAndSequence.source ) }