Skip to content

Poll for messages using TaskExecutor #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
private let stateMachineHolder: MachineHolder
let pollInterval: Duration
#if swift(>=6.0)
private let queue: DispatchQueueTaskExecutor
#endif

private final class MachineHolder: Sendable { // only for deinit
let stateMachine: LockedMachine
Expand All @@ -88,21 +91,35 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
init(stateMachine: LockedMachine, pollInterval: Duration) {
self.stateMachineHolder = .init(stateMachine: stateMachine)
self.pollInterval = pollInterval
#if swift(>=6.0)
self.queue = DispatchQueueTaskExecutor(
DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")
)
#endif
}

public func next() async throws -> Element? {
// swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165
// Currently use Task.sleep() if no new messages, should use task executor preference when implemented:
// https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md
while !Task.isCancelled {
let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() }

switch action {
case .poll(let client):
if let message = try client.consumerPoll() { // non-blocking call
// Attempt to fetch a message synchronously. Bail
// immediately if no message is waiting for us.
if let message = try client.consumerPoll() {
return message
}

#if swift(>=6.0)
// Wait on a separate thread for the next message.
// The call below will block for `pollInterval`.
return try await withTaskExecutorPreference(queue) {
try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens after the time out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We attempt to retrieve a message for self.pollInterval. If there's still no message, we return nil—the same behavior as in the above if let. I'd expect we get caught up in the while-loop on line 100 until we do receive a message eventually.

}
#else
// No messages. Sleep a little.
try await Task.sleep(for: self.pollInterval)
#endif
case .suspendPollLoop:
try await Task.sleep(for: self.pollInterval) // not started yet
case .terminatePollLoop:
Expand Down
4 changes: 2 additions & 2 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ public final class RDKafkaClient: Sendable {
///
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
func consumerPoll() throws -> KafkaConsumerMessage? {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else {
// No error, there might be no more messages
return nil
}
Expand Down
39 changes: 39 additions & 0 deletions Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if swift(>=6.0)
import Dispatch

final class DispatchQueueTaskExecutor: TaskExecutor {
let queue: DispatchQueue

init(_ queue: DispatchQueue) {
self.queue = queue
}

public func enqueue(_ _job: consuming ExecutorJob) {
let job = UnownedJob(_job)
queue.async {
job.runSynchronously(
on: self.asUnownedTaskExecutor()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you're able to use the

public func runSynchronously(isolatedTo serialExecutor: UnownedSerialExecutor,
                             taskExecutor: UnownedTaskExecutor) {

here and pass the queue's UnownedSerialExecutor that would be preferable as it would AFAIR be more correct in tracking where this is isolated to.

If this is a pain because the queue does not conform to SerialExecutor on some platforms still... then perhaps conditionalize it to platforms where it is? Or leave as is and let me know and we'll chase fixing the conformance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you. Since we own the underlying queue here is owned by us nothing should be isolated to it.

)
}
}

@inlinable
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}
}
#endif