Skip to content

Kafka Message Header API #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ extension KafkaConfiguration {

/// Verify the identity of the broker.
///
/// Parameters:
/// - Parameters:
/// - trustRoots: File or directory path to CA certificate(s) for verifying the broker's key.
/// - certificateRevocationListPath: Path to CRL for verifying broker's certificate validity.
public static func verify(
Expand Down
83 changes: 83 additions & 0 deletions Sources/Kafka/KafkaConsumerMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public struct KafkaConsumerMessage {
public var topic: String
/// The partition that the message was received from.
public var partition: KafkaPartition
/// The headers of the message.
public var headers: [KafkaHeader]
/// The key of the message.
public var key: ByteBuffer?
/// The body of the message.
Expand Down Expand Up @@ -57,6 +59,8 @@ public struct KafkaConsumerMessage {

self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))

self.headers = try Self.getHeaders(for: messagePointer)

if let keyPointer = rdKafkaMessage.key {
let keyBufferPointer = UnsafeRawBufferPointer(
start: keyPointer,
Expand All @@ -80,3 +84,82 @@ extension KafkaConsumerMessage: Hashable {}
// MARK: - KafkaConsumerMessage + Sendable

extension KafkaConsumerMessage: Sendable {}

// MARK: - Helpers

extension KafkaConsumerMessage {
/// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
///
/// - Parameters:
/// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
private static func getHeaders(
for messagePointer: UnsafePointer<rd_kafka_message_t>
) throws -> [KafkaHeader] {
var result: [KafkaHeader] = []
var headers: OpaquePointer?

var readStatus = rd_kafka_message_headers(messagePointer, &headers)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headers else {
return result
}

let headerCount = rd_kafka_header_cnt(headers)
result.reserveCapacity(headerCount)

var headerIndex = 0

while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
var headerKeyPointer: UnsafePointer<CChar>?
var headerValuePointer: UnsafeRawPointer?
var headerValueSize = 0

readStatus = rd_kafka_header_get_all(
headers,
headerIndex,
&headerKeyPointer,
&headerValuePointer,
&headerValueSize
)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headerKeyPointer else {
fatalError("Found null pointer when reading KafkaConsumerMessage header key")
}
let headerKey = String(cString: headerKeyPointer)

var headerValue: ByteBuffer?
if let headerValuePointer, headerValueSize > 0 {
let headerValueBufferPointer = UnsafeRawBufferPointer(
start: headerValuePointer,
count: headerValueSize
)
headerValue = ByteBuffer(bytes: headerValueBufferPointer)
}

let newHeader = KafkaHeader(key: headerKey, value: headerValue)
result.append(newHeader)

headerIndex += 1
}

return result
}
}
38 changes: 38 additions & 0 deletions Sources/Kafka/KafkaHeader.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

/// A structure representing a header for a Kafka message.
/// Headers are key-value pairs that can be attached to Kafka messages to provide additional metadata.
public struct KafkaHeader: Sendable, Hashable {
/// The key associated with the header.
public var key: String

/// The value associated with the header.
public var value: ByteBuffer?

/// Initializes a new Kafka header with the provided key and optional value.
///
/// - Parameters:
/// - key: The key associated with the header.
/// - value: The optional binary value associated with the header.
public init(
key: String,
value: ByteBuffer? = nil
) {
self.key = key
self.value = value
}
}
15 changes: 12 additions & 3 deletions Sources/Kafka/KafkaProducerMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContig
/// This means the message will be automatically assigned a partition using the topic's partitioner function.
public var partition: KafkaPartition

/// The headers of the message.
public var headers: [KafkaHeader]

/// The optional key associated with the message.
/// If the ``KafkaPartition`` is ``KafkaPartition/unassigned``, the ``KafkaProducerMessage/key`` is used to ensure
/// that two ``KafkaProducerMessage``s with the same key still get sent to the same ``KafkaPartition``.
Expand All @@ -38,18 +41,21 @@ public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContig
/// - Parameters:
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partition will be assigned automatically.
/// - headers: The headers of the message.
/// - key: Used to guarantee that messages with the same key will be sent to the same partition so that their order is preserved.
/// - value: The message's value.
public init(
topic: String,
partition: KafkaPartition = .unassigned,
headers: [KafkaHeader] = [],
key: Key,
value: Value
) {
self.topic = topic
self.partition = partition
self.headers = headers
self.key = key
self.value = value
self.partition = partition
}
}

Expand All @@ -59,16 +65,19 @@ extension KafkaProducerMessage where Key == Never {
/// - Parameters:
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partition will be assigned automatically.
/// - headers: The headers of the message.
/// - value: The message body.
public init(
topic: String,
partition: KafkaPartition = .unassigned,
headers: [KafkaHeader] = [],
value: Value
) {
self.topic = topic
self.value = value
self.key = nil
self.partition = partition
self.headers = headers
self.key = nil
self.value = value
}
}

Expand Down
Loading