diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index bb88ffd2..c5ec5226 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -319,6 +319,8 @@ public final class KafkaConsumer: Sendable, Service { ) } try client.subscribe(topicPartitionList: subscription) + case .consumerClosed: + throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup") } } @@ -339,6 +341,8 @@ public final class KafkaConsumer: Sendable, Service { let assignment = RDKafkaTopicPartitionList() assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue)) try client.assign(topicPartitionList: assignment) + case .consumerClosed: + throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup") } } @@ -723,6 +727,8 @@ extension KafkaConsumer { /// Set up the connection through ``subscribe()`` or ``assign()``. /// - Parameter client: Client used for handling the connection to the Kafka cluster. case setUpConnection(client: RDKafkaClient) + /// The ``KafkaConsumer`` is closed. + case consumerClosed } /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``. @@ -737,8 +743,10 @@ extension KafkaConsumer { return .setUpConnection(client: client) case .running: fatalError("\(#function) should not be invoked more than once") - case .finishing, .finished: + case .finishing: fatalError("\(#function) should only be invoked when KafkaConsumer is running") + case .finished: + return .consumerClosed } } @@ -890,7 +898,7 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") + self.state = .finished case .running(let client, _): self.state = .running(client: client, messagePollLoopState: .finished) case .finishing, .finished: diff --git a/Tests/KafkaTests/KafkaConsumerTests.swift b/Tests/KafkaTests/KafkaConsumerTests.swift index fc3da31a..1137b0d3 100644 --- a/Tests/KafkaTests/KafkaConsumerTests.swift +++ b/Tests/KafkaTests/KafkaConsumerTests.swift @@ -127,4 +127,52 @@ final class KafkaConsumerTests: XCTestCase { let value = try metrics.expectGauge("operations").lastValue XCTAssertNotNil(value) } + + func testConsumerConstructDeinit() async throws { + let uniqueGroupID = UUID().uuidString + let config = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]), + bootstrapBrokerAddresses: [] + ) + + _ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run + _ = try KafkaConsumer.makeConsumerWithEvents(configuration: config, logger: .kafkaTest) + } + + func testConsumerMessagesReadCancelledBeforeRun() async throws { + let uniqueGroupID = UUID().uuidString + let config = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]), + bootstrapBrokerAddresses: [] + ) + + let consumer = try KafkaConsumer(configuration: config, logger: .kafkaTest) + + let svcGroupConfig = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest) + let serviceGroup = ServiceGroup(configuration: svcGroupConfig) + + // explicitly run and cancel message consuming task before serviceGroup.run() + let consumingTask = Task { + for try await record in consumer.messages { + XCTFail("Unexpected record \(record))") + } + } + + try await Task.sleep(for: .seconds(1)) + + // explicitly cancel message consuming task before serviceGroup.run() + consumingTask.cancel() + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + try await Task.sleep(for: .seconds(1)) + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } } diff --git a/Tests/KafkaTests/KafkaProducerTests.swift b/Tests/KafkaTests/KafkaProducerTests.swift index de083b97..14b83303 100644 --- a/Tests/KafkaTests/KafkaProducerTests.swift +++ b/Tests/KafkaTests/KafkaProducerTests.swift @@ -377,4 +377,44 @@ final class KafkaProducerTests: XCTestCase { let value = try metrics.expectGauge("operations").lastValue XCTAssertNotNil(value) } + + func testProducerConstructDeinit() async throws { + let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: []) + + _ = try KafkaProducer(configuration: config, logger: .kafkaTest) // deinit called before run + _ = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) // deinit called before run + } + + func testProducerEventsReadCancelledBeforeRun() async throws { + let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: []) + + let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) + + let svcGroupConfig = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest) + let serviceGroup = ServiceGroup(configuration: svcGroupConfig) + + // explicitly run and cancel message consuming task before serviceGroup.run() + let producerEventsTask = Task { + for try await event in events { + XCTFail("Unexpected record \(event))") + } + } + + try await Task.sleep(for: .seconds(1)) + + // explicitly cancel message consuming task before serviceGroup.run() + producerEventsTask.cancel() + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await serviceGroup.run() + } + + try await Task.sleep(for: .seconds(1)) + + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } }