diff --git a/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift b/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift
index 5919e985..e8457dd3 100644
--- a/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift
+++ b/Sources/Kafka/Configuration/KafkaConfiguration+Security.swift
@@ -173,7 +173,7 @@ extension KafkaConfiguration {
         /// Verify the identity of the broker.
         ///
-        /// Parameters:
+        /// - Parameters:
         ///     - trustRoots: File or directory path to CA certificate(s) for verifying the broker's key.
         ///     - certificateRevocationListPath: Path to CRL for verifying broker's certificate validity.
         public static func verify(
diff --git a/Sources/Kafka/KafkaConsumerMessage.swift b/Sources/Kafka/KafkaConsumerMessage.swift
index 434330f0..c4a6570a 100644
--- a/Sources/Kafka/KafkaConsumerMessage.swift
+++ b/Sources/Kafka/KafkaConsumerMessage.swift
@@ -21,6 +21,8 @@ public struct KafkaConsumerMessage {
     public var topic: String
     /// The partition that the message was received from.
     public var partition: KafkaPartition
+    /// The headers of the message.
+    public var headers: [KafkaHeader]
     /// The key of the message.
     public var key: ByteBuffer?
     /// The body of the message.
@@ -57,6 +59,8 @@ public struct KafkaConsumerMessage {
 
         self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))
 
+        self.headers = try Self.getHeaders(for: messagePointer)
+
         if let keyPointer = rdKafkaMessage.key {
             let keyBufferPointer = UnsafeRawBufferPointer(
                 start: keyPointer,
@@ -80,3 +84,82 @@ extension KafkaConsumerMessage: Hashable {}
 
 // MARK: - KafkaConsumerMessage + Sendable
 
 extension KafkaConsumerMessage: Sendable {}
+
+// MARK: - Helpers
+
+extension KafkaConsumerMessage {
+    /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
+    ///
+    /// - Parameters:
+    ///     - messagePointer: Pointer to the `rd_kafka_message_t` object to extract the headers from.
+    private static func getHeaders(
+        for messagePointer: UnsafePointer<rd_kafka_message_t>
+    ) throws -> [KafkaHeader] {
+        var result: [KafkaHeader] = []
+        var headers: OpaquePointer?
+
+        var readStatus = rd_kafka_message_headers(messagePointer, &headers)
+
+        if readStatus == RD_KAFKA_RESP_ERR__NOENT {
+            // No header entries
+            return result
+        }
+
+        guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
+            throw KafkaError.rdKafkaError(wrapping: readStatus)
+        }
+
+        guard let headers else {
+            return result
+        }
+
+        let headerCount = rd_kafka_header_cnt(headers)
+        result.reserveCapacity(headerCount)
+
+        var headerIndex = 0
+
+        while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
+            var headerKeyPointer: UnsafePointer<CChar>?
+            var headerValuePointer: UnsafeRawPointer?
+            var headerValueSize = 0
+
+            readStatus = rd_kafka_header_get_all(
+                headers,
+                headerIndex,
+                &headerKeyPointer,
+                &headerValuePointer,
+                &headerValueSize
+            )
+
+            if readStatus == RD_KAFKA_RESP_ERR__NOENT {
+                // No more header entries
+                return result
+            }
+
+            guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
+                throw KafkaError.rdKafkaError(wrapping: readStatus)
+            }
+
+            guard let headerKeyPointer else {
+                fatalError("Found null pointer when reading KafkaConsumerMessage header key")
+            }
+            let headerKey = String(cString: headerKeyPointer)
+
+            var headerValue: ByteBuffer?
+
+            if let headerValuePointer, headerValueSize > 0 {
+                let headerValueBufferPointer = UnsafeRawBufferPointer(
+                    start: headerValuePointer,
+                    count: headerValueSize
+                )
+                headerValue = ByteBuffer(bytes: headerValueBufferPointer)
+            }
+
+            let newHeader = KafkaHeader(key: headerKey, value: headerValue)
+            result.append(newHeader)
+
+            headerIndex += 1
+        }
+
+        return result
+    }
+}
diff --git a/Sources/Kafka/KafkaHeader.swift b/Sources/Kafka/KafkaHeader.swift
new file mode 100644
index 00000000..6ce20812
--- /dev/null
+++ b/Sources/Kafka/KafkaHeader.swift
@@ -0,0 +1,38 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import NIOCore
+
+/// A structure representing a header for a Kafka message.
+/// Headers are key-value pairs that can be attached to Kafka messages to provide additional metadata.
+public struct KafkaHeader: Sendable, Hashable {
+    /// The key associated with the header.
+    public var key: String
+
+    /// The value associated with the header.
+    public var value: ByteBuffer?
+
+    /// Initializes a new Kafka header with the provided key and optional value.
+    ///
+    /// - Parameters:
+    ///     - key: The key associated with the header.
+    ///     - value: The optional binary value associated with the header.
+    public init(
+        key: String,
+        value: ByteBuffer? = nil
+    ) {
+        self.key = key
+        self.value = value
+    }
+}
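`KafkaHeader` is deliberately minimal: a `String` key plus an optional `ByteBuffer` value, mirroring librdkafka's nullable header values. A rough usage sketch follows; the UTF-8 decoding helper is illustrative only and not part of this patch:

    import NIOCore

    let headers: [KafkaHeader] = [
        KafkaHeader(key: "trace-id", value: ByteBuffer(string: "abc-123")),
        KafkaHeader(key: "tombstone-marker"), // value defaults to nil
    ]

    // Hypothetical helper: decode a header value as UTF-8, if present.
    func headerString(_ header: KafkaHeader) -> String? {
        header.value.map { String(buffer: $0) }
    }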
diff --git a/Sources/Kafka/KafkaProducerMessage.swift b/Sources/Kafka/KafkaProducerMessage.swift
index 838398ff..f9bc1ec2 100644
--- a/Sources/Kafka/KafkaProducerMessage.swift
+++ b/Sources/Kafka/KafkaProducerMessage.swift
@@ -25,6 +25,9 @@ public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContiguousBytes> {
     /// The partition that the message will be sent to.
     public var partition: KafkaPartition
 
+    /// The headers of the message.
+    public var headers: [KafkaHeader]
+
     /// The key of the message.
     public var key: Key?
 
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
+    /// Wrapper for `rd_kafka_produceva`, producing a message from pre-materialized
+    /// key, value, opaque pointer and C header representations.
+    private func produceva(
+        topicHandle: OpaquePointer,
+        partition: Int32,
+        messageFlags: Int32,
+        key: UnsafeRawBufferPointer?,
+        value: UnsafeRawBufferPointer,
+        opaque: UnsafeMutableRawPointer?,
+        cHeaders: [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]
+    ) throws -> OpaquePointer? {
+        let sizeWithoutHeaders = (key != nil) ? 6 : 5
+        let size = sizeWithoutHeaders + cHeaders.count
+        var arguments = Array(repeating: rd_kafka_vu_t(), count: size)
+        var index = 0
+
+        arguments[index].vtype = RD_KAFKA_VTYPE_RKT
+        arguments[index].u.rkt = topicHandle
+        index += 1
+
+        arguments[index].vtype = RD_KAFKA_VTYPE_PARTITION
+        arguments[index].u.i32 = partition
+        index += 1
+
+        arguments[index].vtype = RD_KAFKA_VTYPE_MSGFLAGS
+        arguments[index].u.i = messageFlags
+        index += 1
+
+        if let key {
+            arguments[index].vtype = RD_KAFKA_VTYPE_KEY
+            arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: key.baseAddress)
+            arguments[index].u.mem.size = key.count
+            index += 1
+        }
+
+        arguments[index].vtype = RD_KAFKA_VTYPE_VALUE
+        arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: value.baseAddress)
+        arguments[index].u.mem.size = value.count
+        index += 1
+
+        arguments[index].vtype = RD_KAFKA_VTYPE_OPAQUE
+        arguments[index].u.ptr = opaque
+        index += 1
+
+        for cHeader in cHeaders {
+            arguments[index].vtype = RD_KAFKA_VTYPE_HEADER
+
+            arguments[index].u.header.name = cHeader.key
+            arguments[index].u.header.val = cHeader.value?.baseAddress
+            arguments[index].u.header.size = cHeader.value?.count ?? 0
+
+            index += 1
+        }
+
+        assert(arguments.count == size)
+
+        return rd_kafka_produceva(
+            self.kafkaHandle,
+            arguments,
+            arguments.count
+        )
+    }
+
+    /// Scoped accessor that enables safe access to a ``KafkaProducerMessage``'s key and value raw buffers.
+    /// - Warning: Do not escape the pointers from the closure for later use.
+    /// - Parameter body: The closure that will use the pointers.
+    @discardableResult
+    private static func withMessageKeyAndValueBuffer<Key: KafkaContiguousBytes, Value: KafkaContiguousBytes, T>(
+        for message: KafkaProducerMessage<Key, Value>,
+        _ body: (UnsafeRawBufferPointer?, UnsafeRawBufferPointer) throws -> T // (keyBuffer, valueBuffer)
+    ) rethrows -> T {
+        return try message.value.withUnsafeBytes { valueBuffer in
+            if let key = message.key {
+                return try key.withUnsafeBytes { keyBuffer in
+                    return try body(keyBuffer, valueBuffer)
+                }
+            } else {
+                return try body(nil, valueBuffer)
+            }
+        }
+    }
+
+    /// Scoped accessor that enables safe access to the underlying memory of an array of ``KafkaHeader``s.
+    /// - Warning: Do not escape the pointers from the closure for later use.
+    /// - Parameter body: The closure that will use the pointers.
+    @discardableResult
+    private static func withKafkaCHeaders<T>(
+        for headers: [KafkaHeader],
+        _ body: ([(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]) throws -> T
+    ) rethrows -> T {
+        var headersMemory: [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)] = []
+        var headers: [KafkaHeader] = headers.reversed()
+        return try self._withKafkaCHeadersRecursive(kafkaHeaders: &headers, cHeaders: &headersMemory, body)
+    }
+
+    /// Recursive helper function that enables safe access to the underlying memory of an array of ``KafkaHeader``s.
+    /// Reads through all `kafkaHeaders` and stores their corresponding pointers in `cHeaders`.
+    private static func _withKafkaCHeadersRecursive<T>(
+        kafkaHeaders: inout [KafkaHeader],
+        cHeaders: inout [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)],
+        _ body: ([(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]) throws -> T
+    ) rethrows -> T {
+        guard let kafkaHeader = kafkaHeaders.popLast() else {
+            // Base case: we have read all kafkaHeaders and now invoke the accessor closure
+            // that can safely access the pointers in cHeaders.
+            return try body(cHeaders)
+        }
+
+        // Access the underlying memory of key and value with a scoped accessor and make
+        // a recursive call to _withKafkaCHeadersRecursive inside that scoped accessor.
+        // This builds a chain of scoped accessors so that the body closure can
+        // ultimately access all kafkaHeader key/value bytes safely.
+        return try kafkaHeader.key.withCString { keyCString in
+            if let headerValue = kafkaHeader.value {
+                return try headerValue.withUnsafeReadableBytes { valueBuffer in
+                    let cHeader: (UnsafePointer<CChar>, UnsafeRawBufferPointer?) = (keyCString, valueBuffer)
+                    cHeaders.append(cHeader)
+                    return try self._withKafkaCHeadersRecursive(
+                        kafkaHeaders: &kafkaHeaders,
+                        cHeaders: &cHeaders,
+                        body
+                    )
+                }
+            } else {
+                let cHeader: (UnsafePointer<CChar>, UnsafeRawBufferPointer?) = (keyCString, nil)
+                cHeaders.append(cHeader)
+                return try self._withKafkaCHeadersRecursive(
+                    kafkaHeaders: &kafkaHeaders,
+                    cHeaders: &cHeaders,
+                    body
+                )
+            }
+        }
+    }
+
     /// Swift wrapper for events from `librdkafka`'s event queue.
     enum KafkaEvent {
         case deliveryReport(results: [KafkaDeliveryReport])
@@ -425,7 +567,7 @@ final class RDKafkaClient: Sendable {
     /// Flush any outstanding produce requests.
     ///
-    /// Parameters:
+    /// - Parameters:
     ///     - timeoutMilliseconds: Maximum time to wait for outstanding messages to be flushed.
     func flush(timeoutMilliseconds: Int32) async throws {
         // rd_kafka_flush is blocking and there is no convenient way to make it non-blocking.
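The recursion in `_withKafkaCHeadersRecursive` above exists because Swift's scoped pointer accessors (`withCString`, `withUnsafeReadableBytes`) only keep their pointers valid inside the closure, so one scope per header must remain open until `rd_kafka_produceva` has copied the data. A standalone sketch of the same pattern, reduced to plain C strings (names here are illustrative, not library API):

    // Keeps every C string alive until `body` runs by nesting one
    // `withCString` scope per element via recursion.
    func withCStrings<T>(
        _ strings: [String],
        _ body: ([UnsafePointer<CChar>]) throws -> T
    ) rethrows -> T {
        var remaining = Array(strings.reversed())
        var collected: [UnsafePointer<CChar>] = []
        return try withCStringsRecursive(&remaining, &collected, body)
    }

    private func withCStringsRecursive<T>(
        _ remaining: inout [String],
        _ collected: inout [UnsafePointer<CChar>],
        _ body: ([UnsafePointer<CChar>]) throws -> T
    ) rethrows -> T {
        guard let next = remaining.popLast() else {
            return try body(collected) // all scopes are still open here
        }
        return try next.withCString { pointer in
            collected.append(pointer)
            return try withCStringsRecursive(&remaining, &collected, body)
        }
    }

Note that the recursion depth equals the number of headers, which is fine for typical header counts but worth keeping in mind for pathological inputs.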
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index 591e339c..5bc233bd 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -272,9 +272,82 @@ final class KafkaTests: XCTestCase {
         }
     }
 
+    func testProduceAndConsumeWithMessageHeaders() async throws {
+        let testMessages = Self.createTestMessages(
+            topic: self.uniqueTestTopic,
+            headers: [
+                KafkaHeader(key: "some.header", value: ByteBuffer(string: "some-header-value")),
+                KafkaHeader(key: "some.null.header", value: nil),
+            ],
+            count: 10
+        )
+
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+
+        var consumerConfig = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
+            bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
+        )
+        consumerConfig.isAutoCommitEnabled = false
+        consumerConfig.autoOffsetReset = .beginning // Always read topics from beginning
+        consumerConfig.broker.addressFamily = .v4
+
+        let consumer = try KafkaConsumer(
+            configuration: consumerConfig,
+            logger: .kafkaTest
+        )
+
+        let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest)
+        let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run Task
+            group.addTask {
+                try await serviceGroup.run()
+            }
+
+            // Producer Task
+            group.addTask {
+                try await Self.sendAndAcknowledgeMessages(
+                    producer: producer,
+                    events: events,
+                    messages: testMessages
+                )
+            }
+
+            // Consumer Task
+            group.addTask {
+                var consumedMessages = [KafkaConsumerMessage]()
+                for try await message in consumer.messages {
+                    consumedMessages.append(message)
+                    try await consumer.commitSync(message)
+
+                    if consumedMessages.count >= testMessages.count {
+                        break
+                    }
+                }
+
+                XCTAssertEqual(testMessages.count, consumedMessages.count)
+
+                for (index, consumedMessage) in consumedMessages.enumerated() {
+                    XCTAssertEqual(testMessages[index].headers, consumedMessage.headers)
+                }
+            }
+
+            // Wait for Producer Task and Consumer Task to complete
+            try await group.next()
+            try await group.next()
+            // Shutdown the serviceGroup
+            await serviceGroup.triggerGracefulShutdown()
+        }
+    }
+
     func testNoNewConsumerMessagesAfterGracefulShutdown() async throws {
         let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 2)
-        let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
 
         let uniqueGroupID = UUID().uuidString
 
@@ -286,7 +359,6 @@ final class KafkaTests: XCTestCase {
             bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
         )
         consumerConfig.autoOffsetReset = .beginning // Read topic from beginning
-        consumerConfig.broker.addressFamily = .v4
 
         let consumer = try KafkaConsumer(
             configuration: consumerConfig,
             logger: .kafkaTest
@@ -306,7 +378,7 @@ final class KafkaTests: XCTestCase {
             group.addTask {
                 try await Self.sendAndAcknowledgeMessages(
                     producer: producer,
-                    events: acks,
+                    events: events,
                     messages: testMessages
                 )
             }
@@ -471,10 +543,15 @@ final class KafkaTests: XCTestCase {
 
     // MARK: - Helpers
 
-    private static func createTestMessages(topic: String, count: UInt) -> [KafkaProducerMessage<String, String>] {
+    private static func createTestMessages(
+        topic: String,
+        headers: [KafkaHeader] = [],
+        count: UInt
+    ) -> [KafkaProducerMessage<String, String>] {
         return Array(0..<count).map {
             KafkaProducerMessage(
                 topic: topic,
+                headers: headers,
                 key: "key \($0)",
                 value: "Hello, World! \($0) - \(Date().description)"
             )
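Taken together, the patch lets headers round-trip through produce and consume. A hypothetical end-to-end sketch; the topic name, payloads, and the `producer`/`consumer` setup are assumed rather than taken from this diff:

    var message = KafkaProducerMessage(
        topic: "orders",
        headers: [
            KafkaHeader(key: "content-type", value: ByteBuffer(string: "application/json"))
        ],
        key: "order-42",
        value: #"{"id": 42}"#
    )
    // `send` stands in for whichever KafkaProducer send API variant is in use.
    try producer.send(message)

    for try await consumed in consumer.messages {
        for header in consumed.headers where header.key == "content-type" {
            print(header.key, header.value.map { String(buffer: $0) } ?? "<nil>")
        }
    }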