Skip to content

Lower requirements for consumer state machine #154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ public final class KafkaConsumer: Sendable, Service {
)
}
try client.subscribe(topicPartitionList: subscription)
case .consumerClosed:
throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup")
}
}

Expand All @@ -339,6 +341,8 @@ public final class KafkaConsumer: Sendable, Service {
let assignment = RDKafkaTopicPartitionList()
assignment.setOffset(topic: topic, partition: partition, offset: Int64(offset.rawValue))
try client.assign(topicPartitionList: assignment)
case .consumerClosed:
throw KafkaError.connectionClosed(reason: "Consumer deinitialized before setup")
}
}

Expand Down Expand Up @@ -723,6 +727,8 @@ extension KafkaConsumer {
/// Set up the connection through ``subscribe()`` or ``assign()``.
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case setUpConnection(client: RDKafkaClient)
/// The ``KafkaConsumer`` is closed.
case consumerClosed
}

/// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
Expand All @@ -737,8 +743,10 @@ extension KafkaConsumer {
return .setUpConnection(client: client)
case .running:
fatalError("\(#function) should not be invoked more than once")
case .finishing, .finished:
case .finishing:
fatalError("\(#function) should only be invoked when KafkaConsumer is running")
case .finished:
return .consumerClosed
}
}

Expand Down Expand Up @@ -890,7 +898,7 @@ extension KafkaConsumer {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
self.state = .finished
case .running(let client, _):
self.state = .running(client: client, messagePollLoopState: .finished)
case .finishing, .finished:
Expand Down
48 changes: 48 additions & 0 deletions Tests/KafkaTests/KafkaConsumerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,52 @@ final class KafkaConsumerTests: XCTestCase {
let value = try metrics.expectGauge("operations").lastValue
XCTAssertNotNil(value)
}

func testConsumerConstructDeinit() async throws {
let uniqueGroupID = UUID().uuidString
let config = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
bootstrapBrokerAddresses: []
)

_ = try KafkaConsumer(configuration: config, logger: .kafkaTest) // deinit called before run
_ = try KafkaConsumer.makeConsumerWithEvents(configuration: config, logger: .kafkaTest)
}

func testConsumerMessagesReadCancelledBeforeRun() async throws {
let uniqueGroupID = UUID().uuidString
let config = KafkaConsumerConfiguration(
consumptionStrategy: .group(id: uniqueGroupID, topics: ["this-topic-does-not-exist"]),
bootstrapBrokerAddresses: []
)

let consumer = try KafkaConsumer(configuration: config, logger: .kafkaTest)

let svcGroupConfig = ServiceGroupConfiguration(services: [consumer], logger: .kafkaTest)
let serviceGroup = ServiceGroup(configuration: svcGroupConfig)

// explicitly run and cancel message consuming task before serviceGroup.run()
let consumingTask = Task {
for try await record in consumer.messages {
XCTFail("Unexpected record \(record))")
}
}

try await Task.sleep(for: .seconds(1))

// explicitly cancel message consuming task before serviceGroup.run()
consumingTask.cancel()

try await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
try await serviceGroup.run()
}

try await Task.sleep(for: .seconds(1))

// Shutdown the serviceGroup
await serviceGroup.triggerGracefulShutdown()
}
}
}
40 changes: 40 additions & 0 deletions Tests/KafkaTests/KafkaProducerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -377,4 +377,44 @@ final class KafkaProducerTests: XCTestCase {
let value = try metrics.expectGauge("operations").lastValue
XCTAssertNotNil(value)
}

func testProducerConstructDeinit() async throws {
let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])

_ = try KafkaProducer(configuration: config, logger: .kafkaTest) // deinit called before run
_ = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest) // deinit called before run
}

func testProducerEventsReadCancelledBeforeRun() async throws {
let config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: config, logger: .kafkaTest)

let svcGroupConfig = ServiceGroupConfiguration(services: [producer], logger: .kafkaTest)
let serviceGroup = ServiceGroup(configuration: svcGroupConfig)

// explicitly run and cancel message consuming task before serviceGroup.run()
let producerEventsTask = Task {
for try await event in events {
XCTFail("Unexpected record \(event))")
}
}

try await Task.sleep(for: .seconds(1))

// explicitly cancel message consuming task before serviceGroup.run()
producerEventsTask.cancel()

try await withThrowingTaskGroup(of: Void.self) { group in
// Run Task
group.addTask {
try await serviceGroup.run()
}

try await Task.sleep(for: .seconds(1))

// Shutdown the serviceGroup
await serviceGroup.triggerGracefulShutdown()
}
}
}