From 39949eb39eeb528398028230f3c4b21f4136869c Mon Sep 17 00:00:00 2001 From: Guillaume Mallet Date: Tue, 5 Nov 2024 13:24:28 +0000 Subject: [PATCH 1/2] Add headers to KafkaAcknowledgedMessage When a message is being sent to Kafka the acknowledgement should also contains the headers. --- Sources/Kafka/KafkaAcknowledgedMessage.swift | 4 +- Sources/Kafka/KafkaConsumerMessage.swift | 81 +------------------- Sources/Kafka/RDKafka/RDKafkaClient.swift | 76 ++++++++++++++++++ Tests/KafkaTests/KafkaProducerTests.swift | 3 + 4 files changed, 83 insertions(+), 81 deletions(-) diff --git a/Sources/Kafka/KafkaAcknowledgedMessage.swift b/Sources/Kafka/KafkaAcknowledgedMessage.swift index 7c7d958a..7fb56b46 100644 --- a/Sources/Kafka/KafkaAcknowledgedMessage.swift +++ b/Sources/Kafka/KafkaAcknowledgedMessage.swift @@ -27,6 +27,8 @@ public struct KafkaAcknowledgedMessage { public var value: ByteBuffer /// The offset of the message in its partition. public var offset: KafkaOffset + /// The headers of the message. + public var headers: [KafkaHeader] /// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer. /// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages. @@ -53,7 +55,7 @@ public struct KafkaAcknowledgedMessage { self.topic = topic self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition)) - + self.headers = try RDKafkaClient.getHeaders(for: messagePointer) if let keyPointer = rdKafkaMessage.key { let keyBufferPointer = UnsafeRawBufferPointer( start: keyPointer, diff --git a/Sources/Kafka/KafkaConsumerMessage.swift b/Sources/Kafka/KafkaConsumerMessage.swift index a33b1be9..924c7113 100644 --- a/Sources/Kafka/KafkaConsumerMessage.swift +++ b/Sources/Kafka/KafkaConsumerMessage.swift @@ -66,7 +66,7 @@ public struct KafkaConsumerMessage { self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition)) - self.headers = try Self.getHeaders(for: messagePointer) + self.headers = try RDKafkaClient.getHeaders(for: messagePointer) if let keyPointer = rdKafkaMessage.key { let keyBufferPointer = UnsafeRawBufferPointer( @@ -91,82 +91,3 @@ extension KafkaConsumerMessage: Hashable {} // MARK: - KafkaConsumerMessage + Sendable extension KafkaConsumerMessage: Sendable {} - -// MARK: - Helpers - -extension KafkaConsumerMessage { - /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer. - /// - /// - Parameters: - /// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from. - private static func getHeaders( - for messagePointer: UnsafePointer - ) throws -> [KafkaHeader] { - var result: [KafkaHeader] = [] - var headers: OpaquePointer? - - var readStatus = rd_kafka_message_headers(messagePointer, &headers) - - if readStatus == RD_KAFKA_RESP_ERR__NOENT { - // No Header Entries - return result - } - - guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: readStatus) - } - - guard let headers else { - return result - } - - let headerCount = rd_kafka_header_cnt(headers) - result.reserveCapacity(headerCount) - - var headerIndex = 0 - - while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount { - var headerKeyPointer: UnsafePointer? - var headerValuePointer: UnsafeRawPointer? - var headerValueSize = 0 - - readStatus = rd_kafka_header_get_all( - headers, - headerIndex, - &headerKeyPointer, - &headerValuePointer, - &headerValueSize - ) - - if readStatus == RD_KAFKA_RESP_ERR__NOENT { - // No Header Entries - return result - } - - guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: readStatus) - } - - guard let headerKeyPointer else { - fatalError("Found null pointer when reading KafkaConsumerMessage header key") - } - let headerKey = String(cString: headerKeyPointer) - - var headerValue: ByteBuffer? - if let headerValuePointer, headerValueSize > 0 { - let headerValueBufferPointer = UnsafeRawBufferPointer( - start: headerValuePointer, - count: headerValueSize - ) - headerValue = ByteBuffer(bytes: headerValueBufferPointer) - } - - let newHeader = KafkaHeader(key: headerKey, value: headerValue) - result.append(newHeader) - - headerIndex += 1 - } - - return result - } -} diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 0350576e..465b123b 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -15,6 +15,7 @@ import Crdkafka import Dispatch import Logging +import NIOCore import class Foundation.JSONDecoder @@ -616,4 +617,79 @@ public final class RDKafkaClient: Sendable { func withKafkaHandlePointer(_ body: (OpaquePointer) throws -> T) rethrows -> T { try body(self.kafkaHandle.pointer) } + + /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer. + /// + /// - Parameters: + /// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from. + internal static func getHeaders( + for messagePointer: UnsafePointer + ) throws -> [KafkaHeader] { + var result: [KafkaHeader] = [] + var headers: OpaquePointer? + + var readStatus = rd_kafka_message_headers(messagePointer, &headers) + + if readStatus == RD_KAFKA_RESP_ERR__NOENT { + // No Header Entries + return result + } + + guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: readStatus) + } + + guard let headers else { + return result + } + + let headerCount = rd_kafka_header_cnt(headers) + result.reserveCapacity(headerCount) + + var headerIndex = 0 + + while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount { + var headerKeyPointer: UnsafePointer? + var headerValuePointer: UnsafeRawPointer? + var headerValueSize = 0 + + readStatus = rd_kafka_header_get_all( + headers, + headerIndex, + &headerKeyPointer, + &headerValuePointer, + &headerValueSize + ) + + if readStatus == RD_KAFKA_RESP_ERR__NOENT { + // No Header Entries + return result + } + + guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: readStatus) + } + + guard let headerKeyPointer else { + fatalError("Found null pointer when reading KafkaConsumerMessage header key") + } + let headerKey = String(cString: headerKeyPointer) + + var headerValue: ByteBuffer? + if let headerValuePointer, headerValueSize > 0 { + let headerValueBufferPointer = UnsafeRawBufferPointer( + start: headerValuePointer, + count: headerValueSize + ) + headerValue = ByteBuffer(bytes: headerValueBufferPointer) + } + + let newHeader = KafkaHeader(key: headerKey, value: headerValue) + result.append(newHeader) + + headerIndex += 1 + } + + return result + } } diff --git a/Tests/KafkaTests/KafkaProducerTests.swift b/Tests/KafkaTests/KafkaProducerTests.swift index 16d67152..01c38c40 100644 --- a/Tests/KafkaTests/KafkaProducerTests.swift +++ b/Tests/KafkaTests/KafkaProducerTests.swift @@ -82,8 +82,10 @@ final class KafkaProducerTests: XCTestCase { } let expectedTopic = "test-topic" + let headers = [KafkaHeader(key: "some", value: ByteBuffer.init(string: "test"))] let message = KafkaProducerMessage( topic: expectedTopic, + headers: headers, key: "key", value: "Hello, World!" ) @@ -118,6 +120,7 @@ final class KafkaProducerTests: XCTestCase { XCTAssertEqual(expectedTopic, receivedMessage.topic) XCTAssertEqual(ByteBuffer(string: message.key!), receivedMessage.key) XCTAssertEqual(ByteBuffer(string: message.value), receivedMessage.value) + XCTAssertEqual(headers, receivedMessage.headers) // Shutdown the serviceGroup await serviceGroup.triggerGracefulShutdown() From 7f6ed8c5de7c7cb65688571acb947aa75ae5a74a Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Fri, 15 Nov 2024 10:41:26 +0000 Subject: [PATCH 2/2] Run `swift format` --- Sources/Kafka/RDKafka/RDKafkaClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 465b123b..40f914fe 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -617,7 +617,7 @@ public final class RDKafkaClient: Sendable { func withKafkaHandlePointer(_ body: (OpaquePointer) throws -> T) rethrows -> T { try body(self.kafkaHandle.pointer) } - + /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer. /// /// - Parameters: