From 9dfa08379e9089cf5b2aa691d90938eb5007a232 Mon Sep 17 00:00:00 2001 From: Felix Schlegel Date: Wed, 20 Sep 2023 16:44:24 +0100 Subject: [PATCH] Consumer: `assign` / `subscribe` on `run()` Motivation: We should start assignments / subscriptions when `KafkaConsumer.run()` is invoked and not on initialization. Modifications: * `KafkaConsumer`: move `assign` / `subscribe` from `init` to `func run()` --- Sources/Kafka/KafkaConsumer.swift | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 12fe0dca..8e4529ce 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -181,13 +181,6 @@ public final class KafkaConsumer: Sendable, Service { // Forward main queue events to the consumer queue. try client.pollSetConsumer() - - switch configuration.consumptionStrategy._internal { - case .partition(topic: let topic, partition: let partition, offset: let offset): - try self.assign(topic: topic, partition: partition, offset: offset) - case .group(groupID: _, topics: let topics): - try self.subscribe(topics: topics) - } } /// Initialize a new ``KafkaConsumer``. @@ -331,6 +324,13 @@ public final class KafkaConsumer: Sendable, Service { } private func _run() async throws { + switch self.configuration.consumptionStrategy._internal { + case .partition(topic: let topic, partition: let partition, offset: let offset): + try self.assign(topic: topic, partition: partition, offset: offset) + case .group(groupID: _, topics: let topics): + try self.subscribe(topics: topics) + } + while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction {