Skip to content

Commit 48515b3

Browse files
committed
Review Franz
Modifications: * avoid copying `KafkaProducerMessage` headers and values — add a scoped-accessor helper that recursively accesses all underlying pointers of the `KafkaProducerMessage`'s `headers: [KafkaHeader]` * use `rd_kafka_produceva` only when `message.headers` is non-empty
1 parent 2ad5481 commit 48515b3

File tree

2 files changed

+170
-148
lines changed

2 files changed

+170
-148
lines changed

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 170 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -124,59 +124,193 @@ final class RDKafkaClient: Sendable {
124124
"Partition ID outside of valid range \(0...Int32.max)"
125125
)
126126

127-
let errorPointer = try message.value.withUnsafeBytes { valueBuffer in
128-
try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfiguration: topicConfiguration) { topicHandle in
129-
if let key = message.key {
130-
// Key available, we can use scoped accessor to safely access its rawBufferPointer.
131-
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
132-
// Returns 0 on success, error code otherwise.
133-
return key.withUnsafeBytes { keyBuffer in
134-
135-
let unsafeMessage = RDKafkaUnsafeProducerMessage(
127+
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
128+
// Returns 0 on success, error code otherwise.
129+
let error = try topicHandles.withTopicHandlePointer(
130+
topic: message.topic,
131+
topicConfiguration: topicConfiguration
132+
) { topicHandle in
133+
return try Self.withMessageKeyAndValueBuffer(for: message) { keyBuffer, valueBuffer in
134+
if message.headers.isEmpty {
135+
// No message headers set, normal produce method can be used.
136+
rd_kafka_produce(
137+
topicHandle,
138+
Int32(message.partition.rawValue),
139+
RD_KAFKA_MSG_F_COPY,
140+
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
141+
valueBuffer.count,
142+
keyBuffer?.baseAddress,
143+
keyBuffer?.count ?? 0,
144+
UnsafeMutableRawPointer(bitPattern: newMessageID)
145+
)
146+
return rd_kafka_last_error()
147+
} else {
148+
let errorPointer = try Self.withKafkaCHeaders(for: message.headers) { cHeaders in
149+
// Setting message headers only works with `rd_kafka_produceva` (variadic arguments).
150+
try self._produceVariadic(
136151
topicHandle: topicHandle,
137152
partition: Int32(message.partition.rawValue),
138153
messageFlags: RD_KAFKA_MSG_F_COPY,
139154
key: keyBuffer,
140155
value: valueBuffer,
141156
opaque: UnsafeMutableRawPointer(bitPattern: newMessageID),
142-
headers: message.headers
143-
)
144-
145-
return rd_kafka_produceva(
146-
self.kafkaHandle,
147-
unsafeMessage._internal,
148-
unsafeMessage.size
157+
cHeaders: cHeaders
149158
)
150159
}
151-
} else {
152-
// No key set.
153-
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
154-
// Returns 0 on success, error code otherwise.
155-
let unsafeMessage = RDKafkaUnsafeProducerMessage(
156-
topicHandle: topicHandle,
157-
partition: Int32(message.partition.rawValue),
158-
messageFlags: RD_KAFKA_MSG_F_COPY,
159-
key: nil,
160-
value: valueBuffer,
161-
opaque: UnsafeMutableRawPointer(bitPattern: newMessageID),
162-
headers: message.headers
163-
)
164-
165-
return rd_kafka_produceva(
166-
self.kafkaHandle,
167-
unsafeMessage._internal,
168-
unsafeMessage.size
169-
)
160+
return rd_kafka_error_code(errorPointer)
170161
}
171162
}
172163
}
173164

174-
let error = rd_kafka_error_code(errorPointer)
175165
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
176166
throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
177167
}
178168
}
179169

170+
/// Wrapper for `rd_kafka_produceva`
/// (message production with variadic arguments, required for sending message headers).
///
/// This function should only be called from within a scoped pointer accessor
/// to ensure the referenced memory (key, value and header buffers) is valid
/// for the function's lifetime.
///
/// - Parameters:
///   - topicHandle: Opaque librdkafka topic handle for the target topic.
///   - partition: Target partition ID.
///   - messageFlags: librdkafka message flags (e.g. `RD_KAFKA_MSG_F_COPY`).
///   - key: Optional message key bytes; `nil` means the message has no key.
///   - value: Message payload bytes.
///   - opaque: Per-message opaque pointer handed back by librdkafka (used here as message ID).
///   - cHeaders: Header key/value pointer pairs; a `nil` value denotes a header without a value.
/// - Returns: `nil` on success. On failure, an opaque error object
///   (`rd_kafka_error_t *`) whose code the caller extracts via `rd_kafka_error_code`.
///   NOTE(review): original comment said `*rd_kafka_resp_err_t`, but
///   `rd_kafka_produceva` returns an `rd_kafka_error_t` pointer.
private func _produceVariadic(
    topicHandle: OpaquePointer,
    partition: Int32,
    messageFlags: Int32,
    key: UnsafeRawBufferPointer?,
    value: UnsafeRawBufferPointer,
    opaque: UnsafeMutableRawPointer?,
    cHeaders: [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]
) throws -> OpaquePointer? {
    // `throws` is kept for call-chain convenience; nothing in this body throws directly.
    // 5 fixed arguments (topic, partition, flags, value, opaque), plus 1 when a key is set.
    let sizeWithoutHeaders = (key != nil) ? 6 : 5
    let size = sizeWithoutHeaders + cHeaders.count
    var arguments = Array(repeating: rd_kafka_vu_t(), count: size)
    var index = 0

    // Topic handle.
    arguments[index].vtype = RD_KAFKA_VTYPE_RKT
    arguments[index].u.rkt = topicHandle
    index += 1

    // Target partition.
    arguments[index].vtype = RD_KAFKA_VTYPE_PARTITION
    arguments[index].u.i32 = partition
    index += 1

    // Message flags.
    arguments[index].vtype = RD_KAFKA_VTYPE_MSGFLAGS
    arguments[index].u.i = messageFlags
    index += 1

    // Optional key buffer — skipped entirely when no key is set (hence size 5 vs 6 above).
    if let key {
        arguments[index].vtype = RD_KAFKA_VTYPE_KEY
        arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: key.baseAddress)
        arguments[index].u.mem.size = key.count
        index += 1
    }

    // Payload buffer.
    arguments[index].vtype = RD_KAFKA_VTYPE_VALUE
    arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: value.baseAddress)
    arguments[index].u.mem.size = value.count
    index += 1

    // Opaque message identifier, echoed back in the delivery report callback.
    arguments[index].vtype = RD_KAFKA_VTYPE_OPAQUE
    arguments[index].u.ptr = opaque
    index += 1

    // One variadic argument per header.
    for cHeader in cHeaders {
        arguments[index].vtype = RD_KAFKA_VTYPE_HEADER

        arguments[index].u.header.name = cHeader.key
        arguments[index].u.header.val = cHeader.value?.baseAddress
        arguments[index].u.header.size = cHeader.value?.count ?? 0

        index += 1
    }

    // Sanity check: every pre-sized slot was filled.
    assert(arguments.count == size)

    return rd_kafka_produceva(
        self.kafkaHandle,
        arguments,
        arguments.count
    )
}
237+
238+
/// Scoped accessor that enables safe access to a ``KafkaProducerMessage``'s key and value raw buffers.
///
/// The key buffer is `nil` when the message has no key.
/// - Warning: Do not escape the pointers from the closure for later use.
/// - Parameter message: The message whose key/value bytes should be exposed.
/// - Parameter body: Closure receiving `(keyBuffer, valueBuffer)`; its result is returned.
@discardableResult
private static func withMessageKeyAndValueBuffer<T, Key, Value>(
    for message: KafkaProducerMessage<Key, Value>,
    _ body: (UnsafeRawBufferPointer?, UnsafeRawBufferPointer) throws -> T // (keyBuffer, valueBuffer)
) rethrows -> T {
    try message.value.withUnsafeBytes { valueBuffer -> T in
        guard let messageKey = message.key else {
            // Keyless message: only the value buffer is exposed.
            return try body(nil, valueBuffer)
        }
        // Nest the key's scoped accessor inside the value's so both buffers
        // remain valid for the duration of `body`.
        return try messageKey.withUnsafeBytes { keyBuffer in
            try body(keyBuffer, valueBuffer)
        }
    }
}
256+
257+
/// Scoped accessor that enables safe access to the underlying memory of an array of ``KafkaHeader``s.
///
/// - Warning: Do not escape the pointers from the closure for later use.
/// - Parameter headers: The headers whose key/value memory should be exposed.
/// - Parameter body: Closure receiving one `(key, value)` pointer pair per header.
@discardableResult
private static func withKafkaCHeaders<T>(
    for headers: [KafkaHeader],
    _ body: ([(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]) throws -> T
) rethrows -> T {
    var collectedPointers: [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)] = []
    // Reverse once so the recursive helper's popLast() consumes headers in
    // their original order.
    var pendingHeaders: [KafkaHeader] = headers.reversed()
    return try self._withKafkaCHeadersRecursive(
        kafkaHeaders: &pendingHeaders,
        cHeaders: &collectedPointers,
        body
    )
}
269+
270+
/// Recursive helper function that enables safe access to the underlying memory of an array of ``KafkaHeader``s.
/// Reads through all `kafkaHeaders` and stores their corresponding pointers in `cHeaders`.
///
/// Each step pops one header, enters the scoped accessors for its key (and
/// value, if present), appends the resulting pointers to `cHeaders`, and
/// recurses *inside* those scopes. The resulting chain of nested scopes keeps
/// every collected pointer alive until the base case hands `cHeaders` to `body`.
/// Recursion depth equals the number of remaining headers.
private static func _withKafkaCHeadersRecursive<T>(
    kafkaHeaders: inout [KafkaHeader],
    cHeaders: inout [(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)],
    _ body: ([(key: UnsafePointer<CChar>, value: UnsafeRawBufferPointer?)]) throws -> T
) rethrows -> T {
    if kafkaHeaders.isEmpty {
        // Base case: we have read all kafkaHeaders and now invoke the accessor closure
        // that can safely access the pointers in cHeaders.
        return try body(cHeaders)
    } else {
        // Unreachable in practice: the isEmpty check above guarantees popLast() succeeds.
        guard let kafkaHeader = kafkaHeaders.popLast() else {
            fatalError("kafkaHeaders should not be nil")
        }

        // Access the underlying memory of key and value with a scoped accessor, and make
        // the recursive call to _withKafkaCHeadersRecursive inside that scope.
        // This builds a chain of nested scoped accessors so that the body closure
        // can ultimately access all kafkaHeader underlying key/value bytes safely.
        return try kafkaHeader.key.withCString { keyCString in
            if let headerValue = kafkaHeader.value {
                // Header has a value: nest its byte accessor as well.
                return try headerValue.withUnsafeReadableBytes { valueBuffer in
                    let cHeader: (UnsafePointer<CChar>, UnsafeRawBufferPointer?) = (keyCString, valueBuffer)
                    cHeaders.append(cHeader)
                    return try self._withKafkaCHeadersRecursive(
                        kafkaHeaders: &kafkaHeaders,
                        cHeaders: &cHeaders,
                        body
                    )
                }
            } else {
                // Header without a value: record a nil value pointer.
                let cHeader: (UnsafePointer<CChar>, UnsafeRawBufferPointer?) = (keyCString, nil)
                cHeaders.append(cHeader)
                return try self._withKafkaCHeadersRecursive(
                    kafkaHeaders: &kafkaHeaders,
                    cHeaders: &cHeaders,
                    body
                )
            }
        }
    }
}
313+
180314
/// Swift wrapper for events from `librdkafka`'s event queue.
181315
enum KafkaEvent {
182316
case deliveryReport(results: [KafkaDeliveryReport])

Sources/Kafka/RDKafka/RDKafkaUnsafeProducerMessage.swift

Lines changed: 0 additions & 112 deletions
This file was deleted.

0 commit comments

Comments
 (0)