From 50f3d1526914da1d669c9f305bfaa09ee1577e13 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Fri, 18 Aug 2023 17:12:46 +0100 Subject: [PATCH 1/2] `KafkaProducer.makeeProducerWithEvents()` avoid `fatalError` Motivation: `KafkaProducer.makeeProducerWithEvents()`: initializing the `NIOAsyncSequenceProducer Modifications: * make designated initializer of `KafkaProducer` non-throwing * `KafkaProducer.makeProducerWithEvents`: * initialize `client` before calling `NIOAsyncSequenceProducer.makeSequence` -> if initializing `client` fails the appropiate error gets thrown instead of triggering a fatalError because `KafkaProducerCloseOnTerminate` gets triggered in an invalid state Result: When the initialization `KafkaProducer.makeProducerWithEvents` fails an error is thrown instead of triggering the `fatalError` in `KafkaProducerCloseOnTerminate`. --- Sources/Kafka/KafkaProducer.swift | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index ca8f18ac..b3cfcbdc 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -95,7 +95,7 @@ public final class KafkaProducer: Service, Sendable { stateMachine: NIOLockedValueBox, configuration: KafkaProducerConfiguration, topicConfiguration: KafkaTopicConfiguration - ) throws { + ) { self.stateMachine = stateMachine self.configuration = configuration self.topicConfiguration = topicConfiguration @@ -130,7 +130,7 @@ public final class KafkaProducer: Service, Sendable { ) } - try self.init( + self.init( stateMachine: stateMachine, configuration: configuration, topicConfiguration: configuration.topicConfiguration @@ -156,13 +156,6 @@ public final class KafkaProducer: Service, Sendable { ) throws -> (KafkaProducer, KafkaProducerEvents) { let stateMachine = NIOLockedValueBox(StateMachine(logger: logger)) - let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( - elementType: KafkaProducerEvent.self, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), - delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine) - ) - let source = sourceAndSequence.source - let client = try RDKafkaClient.makeClient( type: .producer, configDictionary: configuration.dictionary, @@ -170,16 +163,27 @@ public final class KafkaProducer: Service, Sendable { logger: logger ) - let producer = try KafkaProducer( + let producer = KafkaProducer( stateMachine: stateMachine, configuration: configuration, topicConfiguration: configuration.topicConfiguration ) + // Note: + // It's crucial to initialize the `sourceAndSequence` variable AFTER `client`. + // This order is important to prevent the accidental triggering of `KafkaProducerCloseOnTerminate.didTerminate()`. + // If this order is not met and `RDKafkaClient.makeClient()` fails, + // it leads to a call to `stateMachine.stopConsuming()` while it's still in the `.uninitialized` state. + let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( + elementType: KafkaProducerEvent.self, + backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(), + delegate: KafkaProducerCloseOnTerminate(stateMachine: stateMachine) + ) + stateMachine.withLockedValue { $0.initialize( client: client, - source: source + source: sourceAndSequence.source ) } From 35a77e4f427580336a214c31502b1703a21dbaef Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Mon, 21 Aug 2023 11:03:51 +0100 Subject: [PATCH 2/2] Add order comment to KafkaConsumer.makeConsumerWithEvents --- Sources/Kafka/KafkaConsumer.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index f5344076..a22d09ff 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -256,6 +256,11 @@ public final class KafkaConsumer: Sendable, Service { logger: logger ) + // Note: + // It's crucial to initialize the `sourceAndSequence` variable AFTER `client`. + // This order is important to prevent the accidental triggering of `KafkaConsumerCloseOnTerminate.didTerminate()`. + // If this order is not met and `RDKafkaClient.makeClient()` fails, + // it leads to a call to `stateMachine.messageSequenceTerminated()` while it's still in the `.uninitialized` state. let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence( elementType: KafkaConsumerEvent.self, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),