From 7d196829435d8c8a99b2d6c636b683b4d16db0a0 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Mon, 2 Oct 2023 20:41:10 +0300 Subject: [PATCH 01/12] Add performance test for swift-kafka vs librdkafka --- .../KafkaConsumerBenchmark.swift | 206 ++++++++++++ .../SwiftKafkaBenchmark/Utilities.swift | 304 ++++++++++++++++++ Package.swift | 12 + .../docker-compose-consumer-performance.yaml | 66 ++++ 4 files changed, 588 insertions(+) create mode 100644 Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift create mode 100644 Benchmarks/SwiftKafkaBenchmark/Utilities.swift create mode 100644 docker/docker-compose-consumer-performance.yaml diff --git a/Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift b/Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift new file mode 100644 index 00000000..d7518980 --- /dev/null +++ b/Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift @@ -0,0 +1,206 @@ +import Crdkafka +import Kafka +import Foundation +import NIOCore +import ServiceLifecycle +import Logging +import Benchmark + +private let numOfMessages: UInt = .init(getFromEnv("MESSAGES_NUMBER") ?? "3000000")! + +private var uniqueTestTopic: String! +private var client: RDKafkaClientHolder! +private var testMessages: [KafkaProducerMessage]! + +let benchmarks = { + Benchmark.defaultConfiguration = .init( + metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory] + BenchmarkMetric.arc, + warmupIterations: 0, + scalingFactor: .one, + maxDuration: .seconds(5), + maxIterations: 1 + ) + + Benchmark.setup = { + var basicConfigDict: [String: String] = [ + "bootstrap.servers": "\(kafkaHost):\(kafkaPort)", + "broker.address.family" : "v4" + ] + + client = RDKafkaClientHolder(configDictionary: basicConfigDict, type: .consumer) + + uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000) + benchLog("Created topic \(uniqueTestTopic!)") + + benchLog("Generating \(numOfMessages) messages") + testMessages = createTestMessages(topic: uniqueTestTopic, count: numOfMessages) + benchLog("Finish generating \(numOfMessages) messages") + + var producerConfig: KafkaProducerConfiguration! + + producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapBrokerAddress()]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + + let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(numOfMessages) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup1.run() + } + + // Producer Task + group.addTask { + try await sendAndAcknowledgeMessages( + producer: producer, + events: acks, + messages: testMessages + ) + } + + // Wait for Producer Task to complete + try await group.next() + await serviceGroup1.triggerGracefulShutdown() + } + } + + Benchmark.teardown = { + try? 
client._deleteTopic(uniqueTestTopic, timeout: -1)
+    }
+
+    Benchmark("SwiftKafkaConsumer") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        var consumerConfig = KafkaConsumerConfiguration(
+            consumptionStrategy: .group(
+                id: uniqueGroupID,
+                topics: [uniqueTestTopic]
+            ),
+            bootstrapBrokerAddresses: [bootstrapBrokerAddress()]
+        )
+        consumerConfig.pollInterval = .milliseconds(1)
+        consumerConfig.autoOffsetReset = .beginning
+        consumerConfig.broker.addressFamily = .v4
+
+        let consumer = try KafkaConsumer(
+            configuration: consumerConfig,
+            logger: logger
+        )
+
+        let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger)
+        let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2)
+
+        benchmark.startMeasurement()
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            benchLog("Start consuming")
+            defer {
+                benchLog("Finish consuming")
+            }
+            // Run Task
+            group.addTask {
+                try await serviceGroup2.run()
+            }
+
+            // Consumer Task
+            group.addTask {
+                var ctr: UInt64 = 0
+                var tmpCtr: UInt64 = 0
+                let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1)
+                let totalStartDate = Date.timeIntervalSinceReferenceDate
+                var totalBytes: UInt64 = 0
+
+                for try await record in consumer.messages {
+                    ctr += 1
+                    totalBytes += UInt64(record.value.readableBytes)
+
+                    tmpCtr += 1
+                    if tmpCtr >= interval {
+                        benchLog("read \(ctr * 100 / UInt64(numOfMessages))%")
+                        tmpCtr = 0
+                    }
+                    if ctr >= numOfMessages {
+                        break
+                    }
+                }
+                let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+                let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024
+                benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+            }
+
+            // Wait for Consumer Task to complete
+            try await group.next()
+            // Shutdown the serviceGroup
+            await serviceGroup2.triggerGracefulShutdown()
+        }
+
+        benchmark.stopMeasurement()
+    }
+
+    Benchmark("librdkafka") { benchmark in
+        let uniqueGroupID = UUID().uuidString
+        let rdKafkaConsumerConfig: [String: String] = [
+            "group.id": uniqueGroupID,
+            "bootstrap.servers": "\(kafkaHost):\(kafkaPort)",
+            "broker.address.family": "v4",
+            "auto.offset.reset": "beginning"
+        ]
+
+        let consumer = RDKafkaClientHolder(configDictionary: rdKafkaConsumerConfig, type: .consumer)
+        rd_kafka_poll_set_consumer(consumer.kafkaHandle)
+        let subscriptionList = rd_kafka_topic_partition_list_new(1)
+        defer {
+            rd_kafka_topic_partition_list_destroy(subscriptionList)
+        }
+        rd_kafka_topic_partition_list_add(
+            subscriptionList,
+            uniqueTestTopic,
+            RD_KAFKA_PARTITION_UA
+        )
+        rd_kafka_subscribe(consumer.kafkaHandle, subscriptionList)
+        rd_kafka_poll(consumer.kafkaHandle, 0)
+
+        var ctr: UInt64 = 0
+        var tmpCtr: UInt64 = 0
+
+        let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1)
+        let totalStartDate = Date.timeIntervalSinceReferenceDate
+        var totalBytes: UInt64 = 0
+
+        benchmark.startMeasurement()
+
+        while ctr < numOfMessages {
+            guard let record = rd_kafka_consumer_poll(consumer.kafkaHandle, 0) else {
+                try await Task.sleep(for: .milliseconds(1)) // matches the default pollInterval of swift-kafka
+                continue
+            }
+            defer {
+                rd_kafka_message_destroy(record)
+            }
+            ctr += 1
+            totalBytes += UInt64(record.pointee.len)
+
+            tmpCtr += 1
+            if tmpCtr >= interval {
+                benchLog("read \(ctr * 100 / UInt64(numOfMessages))%")
+                tmpCtr = 0
+            }
+        }
+
+        benchmark.stopMeasurement()
+
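+        // Descriptive note (added): closing the consumer handle after
+        // stopMeasurement() keeps the cost of rd_kafka_consumer_close()
+        // (leaving the group, serving outstanding callbacks) out of the
+        // measured window.
+        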
rd_kafka_consumer_close(consumer.kafkaHandle) + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } +} diff --git a/Benchmarks/SwiftKafkaBenchmark/Utilities.swift b/Benchmarks/SwiftKafkaBenchmark/Utilities.swift new file mode 100644 index 00000000..05fa92e0 --- /dev/null +++ b/Benchmarks/SwiftKafkaBenchmark/Utilities.swift @@ -0,0 +1,304 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import Foundation +import Kafka +import Logging + +extension Logger { + static var testLogger: Logger { + var logger = Logger(label: "bench_log") + #if DEBUG + logger.logLevel = .debug + #else + logger.logLevel = .info + #endif + return logger + } +} + +let logger: Logger = .testLogger +let stringSize = 1024 +let kafkaHost: String = kafkaHostFromEnv() +let kafkaPort: Int = kafkaPortFromEnv() + +func benchLog(_ msg: @autoclosure () -> Logger.Message) { +// Just in case for debug + #if DEBUG + logger.debug(msg()) + #endif +} + +enum RDKafkaClientHolderError: Error { + case generic(String) +} + +class RDKafkaClientHolder { + let kafkaHandle: OpaquePointer + + enum HandleType { + case producer + case consumer + } + + init(configDictionary: [String: String], type: HandleType) { + let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) + defer { errorChars.deallocate() } + + let config: OpaquePointer = rd_kafka_conf_new() + configDictionary.forEach { key, value in + let res = rd_kafka_conf_set( + config, + key, + value, + errorChars, + stringSize + ) + if res != RD_KAFKA_CONF_OK { + fatalError("Could not set \(key) with \(value)") + } + } + + guard let handle = rd_kafka_new( + type == .consumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER, + config, + errorChars, + stringSize + ) else { + fatalError("Could not create client") + } + self.kafkaHandle = handle + } + + deinit { + rd_kafka_poll(self.kafkaHandle, 0) + rd_kafka_destroy(self.kafkaHandle) + } + + /// Create a topic with a unique name (`UUID`). + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter timeout: Timeout in milliseconds. + /// - Returns: Name of newly created topic. + /// - Throws: A ``KafkaError`` if the topic creation failed. + func _createUniqueTopic(timeout: Int32) throws -> String { + let uniqueTopicName = UUID().uuidString + + let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) + defer { errorChars.deallocate() } + + guard let newTopic = rd_kafka_NewTopic_new( + uniqueTopicName, + 6, // use default num_partitions + -1, // use default replication_factor + errorChars, + stringSize + ) else { + let errorString = String(cString: errorChars) + throw RDKafkaClientHolderError.generic(errorString) + } + defer { rd_kafka_NewTopic_destroy(newTopic) } + + + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var newTopicsArray: [OpaquePointer?] 
= [newTopic] + rd_kafka_CreateTopics( + kafkaHandle, + &newTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw RDKafkaClientHolderError.generic("No CreateTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw RDKafkaClientHolderError.generic("code: \(resultCode)") + } + + guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { + throw RDKafkaClientHolderError.generic("Received event that is not of type rd_kafka_CreateTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_CreateTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw RDKafkaClientHolderError.generic("Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw RDKafkaClientHolderError.generic("code: \(topicResultError)") + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == uniqueTopicName else { + throw RDKafkaClientHolderError.generic("Received topic result for topic with different name") + } + + return uniqueTopicName + } + +// func deleteTopic(_ topic: String, timeout: Int32 = 10000) async throws { +// try await withCheckedThrowingContinuation { continuation in +// do { +// try self._deleteTopic(topic, timeout: timeout) +// continuation.resume() +// } catch { +// continuation.resume(throwing: error) +// } +// } +// } + + /// Delete a topic. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter topic: Topic to delete. + /// - Parameter timeout: Timeout in milliseconds. + /// - Throws: A ``KafkaError`` if the topic deletion failed. + func _deleteTopic(_ topic: String, timeout: Int32) throws { + let deleteTopic = rd_kafka_DeleteTopic_new(topic) + defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } + + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var deleteTopicsArray: [OpaquePointer?] 
= [deleteTopic] + rd_kafka_DeleteTopics( + kafkaHandle, + &deleteTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw RDKafkaClientHolderError.generic("No DeleteTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw RDKafkaClientHolderError.generic("code: \(resultCode)") + } + + guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { + throw RDKafkaClientHolderError.generic("Received event that is not of type rd_kafka_DeleteTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_DeleteTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw RDKafkaClientHolderError.generic("Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw RDKafkaClientHolderError.generic("code: \(topicResultError)") + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == topic else { + throw RDKafkaClientHolderError.generic("Received topic result for topic with different name") + } + } +} + +func sendAndAcknowledgeMessages( + producer: KafkaProducer, + events: KafkaProducerEvents, + messages: [KafkaProducerMessage] +) async throws { + for message in messages { + while true { // Note: this is an example of queue full + do { + try producer.send(message) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + continue + } catch { + benchLog("Caught some error: \(error)") + throw error + } + } + } + + var receivedDeliveryReportsCtr = 0 + var prevPercent = 0 + + for await event in events { + switch event { + case .deliveryReports(let deliveryReports): + receivedDeliveryReportsCtr += deliveryReports.count + default: + break // Ignore any other events + } + let curPercent = receivedDeliveryReportsCtr * 100 / messages.count + if curPercent >= prevPercent + 10 { + benchLog("Delivered \(curPercent)% of messages") + prevPercent = curPercent + } + + if receivedDeliveryReportsCtr >= messages.count { + break + } + } +} + +func createTestMessages( + topic: String, + headers: [KafkaHeader] = [], + count: UInt +) -> [KafkaProducerMessage] { + return Array(0.. String? { + ProcessInfo.processInfo.environment[key] +} + +func kafkaHostFromEnv() -> String { + getFromEnv("KAFKA_HOST") ?? "localhost" +} + +func kafkaPortFromEnv() -> Int { + .init(getFromEnv("KAFKA_PORT") ?? "9092")! 
+} + +func bootstrapBrokerAddress() -> KafkaConfiguration.BrokerAddress { + .init( + host: kafkaHost, + port: kafkaPort + ) +} diff --git a/Package.swift b/Package.swift index 8d463327..2e7bfcf9 100644 --- a/Package.swift +++ b/Package.swift @@ -52,6 +52,7 @@ let package = Package( // The zstd Swift package produces warnings that we cannot resolve: // https://github.com/facebook/zstd/issues/3328 .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"), + .package(url: "https://github.com/ordo-one/package-benchmark", from: "1.11.2"), ], targets: [ .target( @@ -99,5 +100,16 @@ let package = Package( name: "IntegrationTests", dependencies: ["Kafka"] ), + .executableTarget( + name: "SwiftKafkaBenchmark", + dependencies: [ + "Kafka", + .product(name: "Benchmark", package: "package-benchmark"), + ], + path: "Benchmarks/SwiftKafkaBenchmark", + plugins: [ + .plugin(name: "BenchmarkPlugin", package: "package-benchmark"), + ] + ), ] ) diff --git a/docker/docker-compose-consumer-performance.yaml b/docker/docker-compose-consumer-performance.yaml new file mode 100644 index 00000000..3b9feedb --- /dev/null +++ b/docker/docker-compose-consumer-performance.yaml @@ -0,0 +1,66 @@ +# this file is not designed to be run directly +# instead, use the docker-compose.. files +# eg docker-compose -f docker/docker-compose.yaml -f docker/docker-compose.2204.57.yaml run test +version: "3.9" +services: + + zookeeper: + image: ubuntu/zookeeper + + kafka: + image: ubuntu/kafka + depends_on: + - zookeeper + environment: + ZOOKEEPER_HOST: zookeeper + + swift-kafka-client: + depends_on: + - kafka + build: + context: .. + dockerfile: docker/Dockerfile + environment: + KAFKA_HOST: kafka + + runtime-setup: + image: swift-kafka-client:default + build: + context: . 
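+      # Note (added): relative paths in a compose file resolve against the
+      # directory of the file passed to -f, i.e. docker/ here, so this
+      # context is docker/ and the Dockerfile below is docker/Dockerfile.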
+ dockerfile: Dockerfile + + common: &common + image: swift-kafka-client:default + depends_on: [runtime-setup] + volumes: + - ~/.ssh:/root/.ssh + - ..:/code:z + working_dir: /code + + soundness: + <<: *common + command: /bin/bash -xcl "swift -version && uname -a && ./scripts/soundness.sh" + + build: + <<: *common + environment: [] + command: /bin/bash -cl "swift build" + + perf-test: + <<: *common + depends_on: [kafka, runtime-setup] + environment: + KAFKA_HOST: kafka + BENCHMARK_DISABLE_JEMALLOC: true + MESSAGES_NUMBER: 3000000 + command: > + /bin/bash -xcl " + swift build -c release && \ + swift package --disable-sandbox benchmark + " + + # util + + shell: + <<: *common + entrypoint: /bin/bash From 1a1eea2186a3fa2cb4faab30876bb6f7ad2689aa Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 12 Oct 2023 16:23:33 +0300 Subject: [PATCH 02/12] mimic docker files --- .../KafkaConsumerBenchmark.swift | 0 .../SwiftKafkaBenchmarks}/Utilities.swift | 0 Benchmarks/Package.swift | 27 +++++++++++++++++++ Package.swift | 12 --------- docker/Dockerfile | 3 +++ docker/docker-compose.yaml | 13 ++++++--- 6 files changed, 39 insertions(+), 16 deletions(-) rename Benchmarks/{SwiftKafkaBenchmark => Benchmarks/SwiftKafkaBenchmarks}/KafkaConsumerBenchmark.swift (100%) rename Benchmarks/{SwiftKafkaBenchmark => Benchmarks/SwiftKafkaBenchmarks}/Utilities.swift (100%) create mode 100644 Benchmarks/Package.swift diff --git a/Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift similarity index 100% rename from Benchmarks/SwiftKafkaBenchmark/KafkaConsumerBenchmark.swift rename to Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift diff --git a/Benchmarks/SwiftKafkaBenchmark/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift similarity index 100% rename from Benchmarks/SwiftKafkaBenchmark/Utilities.swift rename to Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift new file mode 100644 index 00000000..ab1d5c14 --- /dev/null +++ b/Benchmarks/Package.swift @@ -0,0 +1,27 @@ +// swift-tools-version: 5.7 + +import PackageDescription + +let package = Package( + name: "benchmarks", + platforms: [ + .macOS(.v13), + ], + dependencies: [ + .package(path: "../"), + .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.11.1"), + ], + targets: [ + .executableTarget( + name: "SwiftKafkaBenchmarks", + dependencies: [ + .product(name: "Benchmark", package: "package-benchmark"), + .product(name: "Kafka", package: "swift-kafka-client"), + ], + path: "Benchmarks/SwiftKafkaBenchmarks", + plugins: [ + .plugin(name: "BenchmarkPlugin", package: "package-benchmark") + ] + ), + ] +) diff --git a/Package.swift b/Package.swift index 2e7bfcf9..8d463327 100644 --- a/Package.swift +++ b/Package.swift @@ -52,7 +52,6 @@ let package = Package( // The zstd Swift package produces warnings that we cannot resolve: // https://github.com/facebook/zstd/issues/3328 .package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"), - .package(url: "https://github.com/ordo-one/package-benchmark", from: "1.11.2"), ], targets: [ .target( @@ -100,16 +99,5 @@ let package = Package( name: "IntegrationTests", dependencies: ["Kafka"] ), - .executableTarget( - name: "SwiftKafkaBenchmark", - dependencies: [ - "Kafka", - 
.product(name: "Benchmark", package: "package-benchmark"), - ], - path: "Benchmarks/SwiftKafkaBenchmark", - plugins: [ - .plugin(name: "BenchmarkPlugin", package: "package-benchmark"), - ] - ), ] ) diff --git a/docker/Dockerfile b/docker/Dockerfile index 31ae9a21..5daee617 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,6 +15,9 @@ ENV LANGUAGE en_US.UTF-8 # Dependencies RUN apt-get update RUN apt-get install libsasl2-dev -y +RUN apt-get install libjemalloc-dev -y +RUN apt-get update & apt-get install -y libjemalloc-dev + # tools RUN mkdir -p $HOME/.tools diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index ff95cc5c..3b85e4fa 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -34,8 +34,8 @@ services: depends_on: [runtime-setup] volumes: - ~/.ssh:/root/.ssh - - ..:/code:z - working_dir: /code + - ..:/swift-kafka-client:z + working_dir: /swift-kafka-client soundness: <<: *common @@ -53,9 +53,14 @@ services: KAFKA_HOST: kafka command: > /bin/bash -xcl " - swift build --build-tests $${SANITIZER_ARG-} && \ - swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} + cd Benchmarks && swift package --disable-sandbox benchmark baseline check --check-absolute-path Thresholds/$${SWIFT_VERSION-}/ " +# TODO: tmp disable tests +# swift build --build-tests $${SANITIZER_ARG-} && \ +# swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} && \ + update-benchmark-baseline: + <<: *common + command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark --format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/" # util From 5bf4d0281d9480b4eba858160263db7eae2f927b Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:49:54 +0300 Subject: [PATCH 03/12] use simmilar to nio benchmark structure --- .../KafkaConsumerBenchmark.swift | 2 +- ...afkaBenchmarks.SwiftKafkaConsumer.p90.json | 9 +++ .../SwiftKafkaBenchmarks.librdkafka.p90.json | 9 +++ ...afkaBenchmarks.SwiftKafkaConsumer.p90.json | 9 +++ .../SwiftKafkaBenchmarks.librdkafka.p90.json | 9 +++ ...afkaBenchmarks.SwiftKafkaConsumer.p90.json | 9 +++ .../SwiftKafkaBenchmarks.librdkafka.p90.json | 9 +++ ...afkaBenchmarks.SwiftKafkaConsumer.p90.json | 9 +++ .../SwiftKafkaBenchmarks.librdkafka.p90.json | 9 +++ ...afkaBenchmarks.SwiftKafkaConsumer.p90.json | 9 +++ .../SwiftKafkaBenchmarks.librdkafka.p90.json | 9 +++ dev/update-benchmark-thresholds.sh | 15 +++++ .../docker-compose-consumer-performance.yaml | 66 ------------------- docker/docker-compose.2204.510.yaml | 5 ++ docker/docker-compose.2204.57.yaml | 5 ++ docker/docker-compose.2204.58.yaml | 5 ++ docker/docker-compose.2204.59.yaml | 5 ++ docker/docker-compose.2204.main.yaml | 5 ++ docker/docker-compose.yaml | 9 ++- 19 files changed, 137 insertions(+), 70 deletions(-) create mode 100644 Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json create mode 100644 Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json create mode 100644 Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json create mode 100644 Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json create mode 100644 Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json create mode 100644 
Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json create mode 100644 Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json create mode 100644 Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json create mode 100644 Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json create mode 100644 Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json create mode 100755 dev/update-benchmark-thresholds.sh delete mode 100644 docker/docker-compose-consumer-performance.yaml diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift index d7518980..16966d82 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift @@ -6,7 +6,7 @@ import ServiceLifecycle import Logging import Benchmark -private let numOfMessages: UInt = .init(getFromEnv("MESSAGES_NUMBER") ?? "3000000")! +private let numOfMessages: UInt = .init(getFromEnv("MESSAGES_NUMBER") ?? "500000")! private var uniqueTestTopic: String! private var client: RDKafkaClientHolder! diff --git a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json new file mode 100644 index 00000000..2aee64d1 --- /dev/null +++ b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 241631232, + "cpuTotal" : 8350000000, + "objectAllocCount" : 2021543, + "releaseCount" : 14078309, + "retainCount" : 11040340, + "retainReleaseDelta" : 1016426, + "wallClock" : 10306245297 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json new file mode 100644 index 00000000..286fec8f --- /dev/null +++ b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 363003904, + "cpuTotal" : 160000000, + "objectAllocCount" : 210, + "releaseCount" : 315, + "retainCount" : 0, + "retainReleaseDelta" : 105, + "wallClock" : 195243208 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json new file mode 100644 index 00000000..4e9fd75c --- /dev/null +++ b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 162463744, + "cpuTotal" : 8580000000, + "objectAllocCount" : 2023297, + "releaseCount" : 16641710, + "retainCount" : 13600233, + "retainReleaseDelta" : 1018180, + "wallClock" : 10797182506 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json new file mode 100644 index 00000000..e0428dfe --- /dev/null +++ b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 366739456, + "cpuTotal" : 150000000, + "objectAllocCount" : 210, + "releaseCount" : 315, + "retainCount" : 0, + "retainReleaseDelta" : 105, + "wallClock" : 185274416 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json 
b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json new file mode 100644 index 00000000..625f41f0 --- /dev/null +++ b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 159383552, + "cpuTotal" : 6170000000, + "objectAllocCount" : 2020626, + "releaseCount" : 13604331, + "retainCount" : 10568194, + "retainReleaseDelta" : 1015511, + "wallClock" : 9365130088 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json new file mode 100644 index 00000000..feaafba3 --- /dev/null +++ b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 365428736, + "cpuTotal" : 120000000, + "objectAllocCount" : 208, + "releaseCount" : 312, + "retainCount" : 0, + "retainReleaseDelta" : 104, + "wallClock" : 170432000 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json new file mode 100644 index 00000000..817fec4b --- /dev/null +++ b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 178651136, + "cpuTotal" : 6750000000, + "objectAllocCount" : 2020774, + "releaseCount" : 13589597, + "retainCount" : 10553164, + "retainReleaseDelta" : 1015659, + "wallClock" : 9534342338 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json new file mode 100644 index 00000000..d60d2d8d --- /dev/null +++ b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 372375552, + "cpuTotal" : 120000000, + "objectAllocCount" : 216, + "releaseCount" : 324, + "retainCount" : 0, + "retainReleaseDelta" : 108, + "wallClock" : 176588708 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json new file mode 100644 index 00000000..1c9e7f02 --- /dev/null +++ b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 161611776, + "cpuTotal" : 7800000000, + "objectAllocCount" : 2021025, + "releaseCount" : 14075173, + "retainCount" : 11038244, + "retainReleaseDelta" : 1015904, + "wallClock" : 10082730324 +} \ No newline at end of file diff --git a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json new file mode 100644 index 00000000..5f089ba6 --- /dev/null +++ b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -0,0 +1,9 @@ +{ + "allocatedResidentMemory" : 361431040, + "cpuTotal" : 120000000, + "objectAllocCount" : 218, + "releaseCount" : 327, + "retainCount" : 0, + "retainReleaseDelta" : 109, + "wallClock" : 177079834 +} \ No newline at end of file diff --git a/dev/update-benchmark-thresholds.sh b/dev/update-benchmark-thresholds.sh new file mode 100755 index 00000000..2f999036 --- /dev/null +++ b/dev/update-benchmark-thresholds.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -eu +set -o pipefail + +here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" 
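+# The optional second argument selects the target repository root; it
+# defaults to the parent of this script's directory (the checkout that
+# contains docker/).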
+target_repo=${2-"$here/.."} + +for f in 57 58 59 510 -nightly; do + echo "swift$f" + + docker_file=$(if [[ "$f" == "-nightly" ]]; then f=main; fi && ls "$target_repo/docker/docker-compose."*"$f"*".yaml") + + docker-compose -f docker/docker-compose.yaml -f $docker_file run update-benchmark-baseline +done diff --git a/docker/docker-compose-consumer-performance.yaml b/docker/docker-compose-consumer-performance.yaml deleted file mode 100644 index 3b9feedb..00000000 --- a/docker/docker-compose-consumer-performance.yaml +++ /dev/null @@ -1,66 +0,0 @@ -# this file is not designed to be run directly -# instead, use the docker-compose.. files -# eg docker-compose -f docker/docker-compose.yaml -f docker/docker-compose.2204.57.yaml run test -version: "3.9" -services: - - zookeeper: - image: ubuntu/zookeeper - - kafka: - image: ubuntu/kafka - depends_on: - - zookeeper - environment: - ZOOKEEPER_HOST: zookeeper - - swift-kafka-client: - depends_on: - - kafka - build: - context: .. - dockerfile: docker/Dockerfile - environment: - KAFKA_HOST: kafka - - runtime-setup: - image: swift-kafka-client:default - build: - context: . - dockerfile: Dockerfile - - common: &common - image: swift-kafka-client:default - depends_on: [runtime-setup] - volumes: - - ~/.ssh:/root/.ssh - - ..:/code:z - working_dir: /code - - soundness: - <<: *common - command: /bin/bash -xcl "swift -version && uname -a && ./scripts/soundness.sh" - - build: - <<: *common - environment: [] - command: /bin/bash -cl "swift build" - - perf-test: - <<: *common - depends_on: [kafka, runtime-setup] - environment: - KAFKA_HOST: kafka - BENCHMARK_DISABLE_JEMALLOC: true - MESSAGES_NUMBER: 3000000 - command: > - /bin/bash -xcl " - swift build -c release && \ - swift package --disable-sandbox benchmark - " - - # util - - shell: - <<: *common - entrypoint: /bin/bash diff --git a/docker/docker-compose.2204.510.yaml b/docker/docker-compose.2204.510.yaml index be270f0b..17acb143 100644 --- a/docker/docker-compose.2204.510.yaml +++ b/docker/docker-compose.2204.510.yaml @@ -19,5 +19,10 @@ services: - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still + update-benchmark-baseline: + image: swift-kafka-client:22.04-5.10 + environment: + - SWIFT_VERSION=5.10 + shell: image: swift-kafka-client:22.04-5.10 diff --git a/docker/docker-compose.2204.57.yaml b/docker/docker-compose.2204.57.yaml index 04060a2c..af7cda0c 100644 --- a/docker/docker-compose.2204.57.yaml +++ b/docker/docker-compose.2204.57.yaml @@ -19,5 +19,10 @@ services: - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still + update-benchmark-baseline: + image: swift-kafka-client:22.04-5.7 + environment: + - SWIFT_VERSION=5.7 + shell: image: swift-kafka-client:22.04-5.7 diff --git a/docker/docker-compose.2204.58.yaml b/docker/docker-compose.2204.58.yaml index afa74b8d..521c6ac9 100644 --- a/docker/docker-compose.2204.58.yaml +++ b/docker/docker-compose.2204.58.yaml @@ -20,5 +20,10 @@ services: - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still + update-benchmark-baseline: + image: swift-kafka-client:22.04-5.8 + environment: + - SWIFT_VERSION=5.8 + shell: image: swift-kafka-client:22.04-5.8 diff --git a/docker/docker-compose.2204.59.yaml b/docker/docker-compose.2204.59.yaml index f238c9d3..e0a562d7 100644 --- a/docker/docker-compose.2204.59.yaml +++ b/docker/docker-compose.2204.59.yaml @@ -20,5 +20,10 @@ 
services: - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still + update-benchmark-baseline: + image: swift-kafka-client:22.04-5.9 + environment: + - SWIFT_VERSION=5.9 + shell: image: swift-kafka-client:22.04-5.9 diff --git a/docker/docker-compose.2204.main.yaml b/docker/docker-compose.2204.main.yaml index 7ae20fd8..b4e098cf 100644 --- a/docker/docker-compose.2204.main.yaml +++ b/docker/docker-compose.2204.main.yaml @@ -16,5 +16,10 @@ services: - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still + update-benchmark-baseline: + image: swift-kafka-client:22.04-main + environment: + - SWIFT_VERSION=main + shell: image: swift-kafka-client:22.04-main diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 3b85e4fa..9a22d1f6 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -53,13 +53,16 @@ services: KAFKA_HOST: kafka command: > /bin/bash -xcl " + swift build --build-tests $${SANITIZER_ARG-} && \ + swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} && \ cd Benchmarks && swift package --disable-sandbox benchmark baseline check --check-absolute-path Thresholds/$${SWIFT_VERSION-}/ " -# TODO: tmp disable tests -# swift build --build-tests $${SANITIZER_ARG-} && \ -# swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} && \ + update-benchmark-baseline: <<: *common + depends_on: [kafka, runtime-setup] + environment: + KAFKA_HOST: kafka command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark --format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/" # util From e0219d012cb01941375f6d73015d02f4f3144ce8 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:53:04 +0300 Subject: [PATCH 04/12] add headers --- .../KafkaConsumerBenchmark.swift | 14 ++++++++++++++ .../SwiftKafkaBenchmarks/Utilities.swift | 13 +------------ Benchmarks/Package.swift | 13 +++++++++++++ dev/update-benchmark-thresholds.sh | 13 +++++++++++++ docker/Dockerfile | 1 - 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift index 16966d82..ae55c98f 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. 
and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + import Crdkafka import Kafka import Foundation diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift index 05fa92e0..9272e532 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift @@ -2,7 +2,7 @@ // // This source file is part of the swift-kafka-client open source project // -// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -160,17 +160,6 @@ class RDKafkaClientHolder { return uniqueTopicName } -// func deleteTopic(_ topic: String, timeout: Int32 = 10000) async throws { -// try await withCheckedThrowingContinuation { continuation in -// do { -// try self._deleteTopic(topic, timeout: timeout) -// continuation.resume() -// } catch { -// continuation.resume(throwing: error) -// } -// } -// } - /// Delete a topic. /// Blocks for a maximum of `timeout` milliseconds. /// - Parameter topic: Topic to delete. diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index ab1d5c14..da9a8abe 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -1,4 +1,17 @@ // swift-tools-version: 5.7 +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// import PackageDescription diff --git a/dev/update-benchmark-thresholds.sh b/dev/update-benchmark-thresholds.sh index 2f999036..240a7434 100755 --- a/dev/update-benchmark-thresholds.sh +++ b/dev/update-benchmark-thresholds.sh @@ -1,4 +1,17 @@ #!/bin/bash +##===----------------------------------------------------------------------===// +## +## This source file is part of the swift-kafka-client open source project +## +## Copyright (c) 2023 Apple Inc. 
and the swift-kafka-client project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===// set -eu set -o pipefail diff --git a/docker/Dockerfile b/docker/Dockerfile index 5daee617..1f4de780 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,6 @@ ENV LANGUAGE en_US.UTF-8 RUN apt-get update RUN apt-get install libsasl2-dev -y RUN apt-get install libjemalloc-dev -y -RUN apt-get update & apt-get install -y libjemalloc-dev # tools From 907f0e18927f1c13ef05a06d8734d60425c88b41 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 12 Oct 2023 19:04:03 +0300 Subject: [PATCH 05/12] add 'benchmark' docker target to show results --- docker/docker-compose.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 9a22d1f6..d8789f7c 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -58,6 +58,16 @@ services: cd Benchmarks && swift package --disable-sandbox benchmark baseline check --check-absolute-path Thresholds/$${SWIFT_VERSION-}/ " + benchmark: + <<: *common + depends_on: [kafka, runtime-setup] + environment: + KAFKA_HOST: kafka + command: > + /bin/bash -xcl " + cd Benchmarks && swift package --disable-sandbox benchmark + " + update-benchmark-baseline: <<: *common depends_on: [kafka, runtime-setup] From acb54abfd3d6d6bdf37f9d76c1d56165cedec457 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:46:49 +0300 Subject: [PATCH 06/12] approx structure for kafka test utils --- .../KafkaConsumerBenchmark.swift | 106 +++---- .../SwiftKafkaBenchmarks/Utilities.swift | 278 +++--------------- Benchmarks/Package.swift | 1 + Package.swift | 14 +- .../KafkaConsumerConfiguration.swift | 2 +- Sources/Kafka/KafkaError.swift | 9 +- Sources/Kafka/RDKafka/RDKafkaClient.swift | 21 +- Sources/Kafka/RDKafka/RDKafkaEvent.swift | 3 +- Sources/KafkaTestUtils/KafkaTestLogger.swift | 23 ++ .../KafkaTestUtils/KafkaTestMessages.swift | 62 ++++ .../TestRDKafkaClient+TopicCreation.swift | 152 ++++++++++ .../KafkaTestUtils/TestRDKafkaClient.swift | 47 +++ Tests/IntegrationTests/KafkaTests.swift | 119 ++------ Tests/IntegrationTests/Utilities.swift | 181 ------------ Tests/KafkaTests/KafkaConsumerTests.swift | 1 + Tests/KafkaTests/KafkaProducerTests.swift | 1 + Tests/KafkaTests/Utilities.swift | 8 - 17 files changed, 446 insertions(+), 582 deletions(-) create mode 100644 Sources/KafkaTestUtils/KafkaTestLogger.swift create mode 100644 Sources/KafkaTestUtils/KafkaTestMessages.swift create mode 100644 Sources/KafkaTestUtils/TestRDKafkaClient+TopicCreation.swift create mode 100644 Sources/KafkaTestUtils/TestRDKafkaClient.swift diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift index ae55c98f..16a88cac 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift @@ -14,6 +14,7 @@ import Crdkafka import Kafka +import KafkaTestUtils import Foundation import NIOCore import ServiceLifecycle @@ -23,12 +24,12 @@ import Benchmark private let numOfMessages: UInt = 
.init(getFromEnv("MESSAGES_NUMBER") ?? "500000")! private var uniqueTestTopic: String! -private var client: RDKafkaClientHolder! +private var client: TestRDKafkaClient! private var testMessages: [KafkaProducerMessage]! let benchmarks = { Benchmark.defaultConfiguration = .init( - metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory] + BenchmarkMetric.arc, + metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory], warmupIterations: 0, scalingFactor: .one, maxDuration: .seconds(5), @@ -36,18 +37,14 @@ let benchmarks = { ) Benchmark.setup = { - var basicConfigDict: [String: String] = [ - "bootstrap.servers": "\(kafkaHost):\(kafkaPort)", - "broker.address.family" : "v4" - ] - - client = RDKafkaClientHolder(configDictionary: basicConfigDict, type: .consumer) + let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress()) + client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig) uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000) benchLog("Created topic \(uniqueTestTopic!)") benchLog("Generating \(numOfMessages) messages") - testMessages = createTestMessages(topic: uniqueTestTopic, count: numOfMessages) + testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, count: numOfMessages) benchLog("Finish generating \(numOfMessages) messages") var producerConfig: KafkaProducerConfiguration! @@ -73,7 +70,7 @@ let benchmarks = { // Producer Task group.addTask { - try await sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: acks, messages: testMessages @@ -88,6 +85,7 @@ let benchmarks = { Benchmark.teardown = { try? client._deleteTopic(uniqueTestTopic, timeout: -1) + client = nil } Benchmark("SwiftKafkaConsumer") { benchmark in @@ -168,53 +166,55 @@ let benchmarks = { "auto.offset.reset": "beginning" ] - let consumer = RDKafkaClientHolder(configDictionary: rdKafkaConsumerConfig, type: .consumer) - rd_kafka_poll_set_consumer(consumer.kafkaHandle) - let subscriptionList = rd_kafka_topic_partition_list_new(1) - defer { - rd_kafka_topic_partition_list_destroy(subscriptionList) - } - rd_kafka_topic_partition_list_add( - subscriptionList, - uniqueTestTopic, - RD_KAFKA_PARTITION_UA - ) - rd_kafka_subscribe(consumer.kafkaHandle, subscriptionList) - rd_kafka_poll(consumer.kafkaHandle, 0) - - var ctr: UInt64 = 0 - var tmpCtr: UInt64 = 0 - - let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1) - let totalStartDate = Date.timeIntervalSinceReferenceDate - var totalBytes: UInt64 = 0 - - benchmark.startMeasurement() - - while ctr < numOfMessages { - guard let record = rd_kafka_consumer_poll(consumer.kafkaHandle, 0) else { - try await Task.sleep(for: .milliseconds(1)) // set as defaulat pollInterval for swift-kafka - continue - } + let consumer = try TestRDKafkaClient._makeRDKafkaClient(config: rdKafkaConsumerConfig) + try await consumer.withKafkaHandlePointer { kafkaHandle in + rd_kafka_poll_set_consumer(kafkaHandle) + let subscriptionList = rd_kafka_topic_partition_list_new(1) defer { - rd_kafka_message_destroy(record) + rd_kafka_topic_partition_list_destroy(subscriptionList) } - ctr += 1 - totalBytes += UInt64(record.pointee.len) + rd_kafka_topic_partition_list_add( + subscriptionList, + uniqueTestTopic, + RD_KAFKA_PARTITION_UA + ) + rd_kafka_subscribe(kafkaHandle, subscriptionList) + rd_kafka_poll(kafkaHandle, 0) + + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + + let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1) + let totalStartDate = 
Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 - tmpCtr += 1 - if tmpCtr >= interval { - benchLog("read \(ctr * 100 / UInt64(numOfMessages))%") - tmpCtr = 0 + benchmark.startMeasurement() + + while ctr < numOfMessages { + guard let record = rd_kafka_consumer_poll(kafkaHandle, 0) else { + try await Task.sleep(for: .milliseconds(1)) // set as defaulat pollInterval for swift-kafka + continue + } + defer { + rd_kafka_message_destroy(record) + } + ctr += 1 + totalBytes += UInt64(record.pointee.len) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(numOfMessages))%") + tmpCtr = 0 + } } + + benchmark.stopMeasurement() + + rd_kafka_consumer_close(kafkaHandle) + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") } - - benchmark.stopMeasurement() - - rd_kafka_consumer_close(consumer.kafkaHandle) - - let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate - let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 - benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") } } diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift index 9272e532..92367276 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift @@ -15,6 +15,7 @@ import Crdkafka import Foundation import Kafka +import KafkaTestUtils import Logging extension Logger { @@ -41,237 +42,54 @@ func benchLog(_ msg: @autoclosure () -> Logger.Message) { #endif } -enum RDKafkaClientHolderError: Error { - case generic(String) -} - -class RDKafkaClientHolder { - let kafkaHandle: OpaquePointer - - enum HandleType { - case producer - case consumer - } - - init(configDictionary: [String: String], type: HandleType) { - let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) - defer { errorChars.deallocate() } - - let config: OpaquePointer = rd_kafka_conf_new() - configDictionary.forEach { key, value in - let res = rd_kafka_conf_set( - config, - key, - value, - errorChars, - stringSize - ) - if res != RD_KAFKA_CONF_OK { - fatalError("Could not set \(key) with \(value)") - } - } - - guard let handle = rd_kafka_new( - type == .consumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER, - config, - errorChars, - stringSize - ) else { - fatalError("Could not create client") - } - self.kafkaHandle = handle - } - - deinit { - rd_kafka_poll(self.kafkaHandle, 0) - rd_kafka_destroy(self.kafkaHandle) - } - - /// Create a topic with a unique name (`UUID`). - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter timeout: Timeout in milliseconds. - /// - Returns: Name of newly created topic. - /// - Throws: A ``KafkaError`` if the topic creation failed. 
- func _createUniqueTopic(timeout: Int32) throws -> String { - let uniqueTopicName = UUID().uuidString - - let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) - defer { errorChars.deallocate() } - - guard let newTopic = rd_kafka_NewTopic_new( - uniqueTopicName, - 6, // use default num_partitions - -1, // use default replication_factor - errorChars, - stringSize - ) else { - let errorString = String(cString: errorChars) - throw RDKafkaClientHolderError.generic(errorString) - } - defer { rd_kafka_NewTopic_destroy(newTopic) } - - - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var newTopicsArray: [OpaquePointer?] = [newTopic] - rd_kafka_CreateTopics( - kafkaHandle, - &newTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw RDKafkaClientHolderError.generic("No CreateTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw RDKafkaClientHolderError.generic("code: \(resultCode)") - } - - guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { - throw RDKafkaClientHolderError.generic("Received event that is not of type rd_kafka_CreateTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_CreateTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw RDKafkaClientHolderError.generic("Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw RDKafkaClientHolderError.generic("code: \(topicResultError)") - } - - let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == uniqueTopicName else { - throw RDKafkaClientHolderError.generic("Received topic result for topic with different name") - } - - return uniqueTopicName - } - - /// Delete a topic. - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter topic: Topic to delete. - /// - Parameter timeout: Timeout in milliseconds. - /// - Throws: A ``KafkaError`` if the topic deletion failed. - func _deleteTopic(_ topic: String, timeout: Int32) throws { - let deleteTopic = rd_kafka_DeleteTopic_new(topic) - defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } - - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var deleteTopicsArray: [OpaquePointer?] 
= [deleteTopic] - rd_kafka_DeleteTopics( - kafkaHandle, - &deleteTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw RDKafkaClientHolderError.generic("No DeleteTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw RDKafkaClientHolderError.generic("code: \(resultCode)") - } - - guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { - throw RDKafkaClientHolderError.generic("Received event that is not of type rd_kafka_DeleteTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_DeleteTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw RDKafkaClientHolderError.generic("Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw RDKafkaClientHolderError.generic("code: \(topicResultError)") - } - - let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == topic else { - throw RDKafkaClientHolderError.generic("Received topic result for topic with different name") - } - } -} - -func sendAndAcknowledgeMessages( - producer: KafkaProducer, - events: KafkaProducerEvents, - messages: [KafkaProducerMessage] -) async throws { - for message in messages { - while true { // Note: this is an example of queue full - do { - try producer.send(message) - break - } catch let error as KafkaError where error.description.contains("Queue full") { - continue - } catch { - benchLog("Caught some error: \(error)") - throw error - } - } - } - - var receivedDeliveryReportsCtr = 0 - var prevPercent = 0 - - for await event in events { - switch event { - case .deliveryReports(let deliveryReports): - receivedDeliveryReportsCtr += deliveryReports.count - default: - break // Ignore any other events - } - let curPercent = receivedDeliveryReportsCtr * 100 / messages.count - if curPercent >= prevPercent + 10 { - benchLog("Delivered \(curPercent)% of messages") - prevPercent = curPercent - } - - if receivedDeliveryReportsCtr >= messages.count { - break - } - } -} +//enum RDKafkaClientHolderError: Error { +// case generic(String) +//} +// +///// ``RDKafkaClientHolder`` is a basic wrapper that automatically destroys kafka handle +//class RDKafkaClientHolder { +// let kafkaHandle: OpaquePointer +// +// enum HandleType { +// case producer +// case consumer +// } +// +// init(configDictionary: [String: String], type: HandleType) { +// let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) +// defer { errorChars.deallocate() } +// +// let config: OpaquePointer = rd_kafka_conf_new() +// configDictionary.forEach { key, value in +// let res = rd_kafka_conf_set( +// config, +// key, +// value, +// errorChars, +// stringSize +// ) +// if res != RD_KAFKA_CONF_OK { +// fatalError("Could not set \(key) with \(value)") +// } +// } +// +// guard let handle = rd_kafka_new( +// type == .consumer ? 
RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER, +// config, +// errorChars, +// stringSize +// ) else { +// fatalError("Could not create client") +// } +// self.kafkaHandle = handle +// } +// +// deinit { +// rd_kafka_poll(self.kafkaHandle, 0) +// rd_kafka_destroy(self.kafkaHandle) +// } +//} -func createTestMessages( - topic: String, - headers: [KafkaHeader] = [], - count: UInt -) -> [KafkaProducerMessage] { - return Array(0.. String? { ProcessInfo.processInfo.environment[key] diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index da9a8abe..9f4eeaa0 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -30,6 +30,7 @@ let package = Package( dependencies: [ .product(name: "Benchmark", package: "package-benchmark"), .product(name: "Kafka", package: "swift-kafka-client"), + .product(name: "KafkaTestUtils", package: "swift-kafka-client") ], path: "Benchmarks/SwiftKafkaBenchmarks", plugins: [ diff --git a/Package.swift b/Package.swift index 8d463327..4172d2c8 100644 --- a/Package.swift +++ b/Package.swift @@ -43,6 +43,10 @@ let package = Package( name: "KafkaFoundationCompat", targets: ["KafkaFoundationCompat"] ), + .library( + name: "KafkaTestUtils", + targets: ["KafkaTestUtils"] + ), ], dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"), @@ -85,6 +89,12 @@ let package = Package( .product(name: "Logging", package: "swift-log"), ] ), + .target( + name: "KafkaTestUtils", + dependencies: [ + "Kafka" + ] + ), .target( name: "KafkaFoundationCompat", dependencies: [ @@ -93,11 +103,11 @@ let package = Package( ), .testTarget( name: "KafkaTests", - dependencies: ["Kafka"] + dependencies: ["Kafka", "KafkaTestUtils"] ), .testTarget( name: "IntegrationTests", - dependencies: ["Kafka"] + dependencies: ["Kafka", "KafkaTestUtils"] ), ] ) diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index a8d7d26b..985cf61b 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -223,7 +223,7 @@ public struct KafkaConsumerConfiguration { // MARK: - KafkaConsumerConfiguration + Dictionary extension KafkaConsumerConfiguration { - internal var dictionary: [String: String] { + @_spi(Internal) public var dictionary: [String: String] { var resultDict: [String: String] = [:] switch self.consumptionStrategy._internal { diff --git a/Sources/Kafka/KafkaError.swift b/Sources/Kafka/KafkaError.swift index ddaf119a..e093b7c8 100644 --- a/Sources/Kafka/KafkaError.swift +++ b/Sources/Kafka/KafkaError.swift @@ -55,7 +55,8 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { } } - static func rdKafkaError( + @_spi(Internal) + public static func rdKafkaError( wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line ) -> KafkaError { let errorMessage = String(cString: rd_kafka_err2str(error)) @@ -116,7 +117,8 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { ) } - static func topicCreation( + @_spi(Internal) + public static func topicCreation( reason: String, file: String = #fileID, line: UInt = #line ) -> KafkaError { return KafkaError( @@ -126,7 +128,8 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable { ) } - static func topicDeletion( + @_spi(Internal) + public static func topicDeletion( reason: String, file: String = #fileID, line: UInt = #line ) -> KafkaError { return 
KafkaError(
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index da415f22..08c49929 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -18,12 +18,14 @@ import Logging
 
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
 /// which is used to handle the connection to the Kafka ecosystem.
-final class RDKafkaClient: Sendable {
+@_spi(Internal)
+final public class RDKafkaClient: Sendable {
     // Default size for Strings returned from C API
     static let stringSize = 1024
 
     /// Determines if client is a producer or a consumer.
-    enum ClientType {
+    @_spi(Internal)
+    public enum ClientType {
         case producer
         case consumer
     }
@@ -71,7 +73,8 @@ final class RDKafkaClient: Sendable {
     }
 
     /// Factory method creating a new instance of a ``RDKafkaClient``.
-    static func makeClient(
+    @_spi(Internal)
+    public static func makeClient(
         type: ClientType,
         configDictionary: [String: String],
         events: [RDKafkaEvent],
@@ -646,7 +649,17 @@ final class RDKafkaClient: Sendable {
     /// - Warning: Do not escape the pointer from the closure for later use.
     /// - Parameter body: The closure will use the Kafka handle pointer.
     @discardableResult
-    func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
+    @_spi(Internal)
+    public func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         return try body(self.kafkaHandle)
     }
+
+    /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
+    /// - Warning: Do not escape the pointer from the closure for later use.
+    /// - Parameter body: The closure will use the Kafka handle pointer.
+    @discardableResult
+    @_spi(Internal)
+    public func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
+        return try await body(self.kafkaHandle)
+    }
 }
diff --git a/Sources/Kafka/RDKafka/RDKafkaEvent.swift b/Sources/Kafka/RDKafka/RDKafkaEvent.swift
index ec40d85b..1696094b 100644
--- a/Sources/Kafka/RDKafka/RDKafkaEvent.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaEvent.swift
@@ -16,7 +16,8 @@ import Crdkafka
 
 /// Swift `enum` wrapping `librdkafka`'s `RD_KAFKA_EVENT_*` types.
 /// See `RD_KAFKA_EVENT_*` in rdkafka.h for reference.
-internal enum RDKafkaEvent: Int32 {
+@_spi(Internal)
+public enum RDKafkaEvent: Int32 {
     case none = 0x0
     case deliveryReport = 0x1
     case fetch = 0x2
diff --git a/Sources/KafkaTestUtils/KafkaTestLogger.swift b/Sources/KafkaTestUtils/KafkaTestLogger.swift
new file mode 100644
index 00000000..32f2a4e2
--- /dev/null
+++ b/Sources/KafkaTestUtils/KafkaTestLogger.swift
@@ -0,0 +1,23 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the swift-kafka-client open source project
+//
+// Copyright (c) 2023 Apple Inc.
and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging + +extension Logger { + public static var kafkaTest: Logger { + var logger = Logger(label: "kafka.test") + logger.logLevel = .info + return logger + } +} diff --git a/Sources/KafkaTestUtils/KafkaTestMessages.swift b/Sources/KafkaTestUtils/KafkaTestMessages.swift new file mode 100644 index 00000000..4dbdfa06 --- /dev/null +++ b/Sources/KafkaTestUtils/KafkaTestMessages.swift @@ -0,0 +1,62 @@ +import Kafka +import struct Foundation.Date +import Logging + +public struct KafkaTestMessages { + public static func sendAndAcknowledge( + producer: KafkaProducer, + events: KafkaProducerEvents, + messages: [KafkaProducerMessage], + logger: Logger = .kafkaTest + ) async throws { + for message in messages { + while true { // Note: this is an example of queue full + do { + try producer.send(message) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + continue + } catch { + logger.error("Caught some error: \(error)") + throw error + } + } + } + + var receivedDeliveryReportsCtr = 0 + var prevPercent = 0 + + for await event in events { + switch event { + case .deliveryReports(let deliveryReports): + receivedDeliveryReportsCtr += deliveryReports.count + default: + break // Ignore any other events + } + let curPercent = receivedDeliveryReportsCtr * 100 / messages.count + if curPercent >= prevPercent + 10 { + logger.debug("Delivered \(curPercent)% of messages") + prevPercent = curPercent + } + + if receivedDeliveryReportsCtr >= messages.count { + break + } + } + } + + public static func create( + topic: String, + headers: [KafkaHeader] = [], + count: UInt + ) -> [KafkaProducerMessage] { + return Array(0.. String { + let uniqueTopicName = UUID().uuidString + + let errorChars = UnsafeMutablePointer.allocate(capacity: TestRDKafkaClient.stringSize) + defer { errorChars.deallocate() } + + guard let newTopic = rd_kafka_NewTopic_new( + uniqueTopicName, + partitions ?? -1, // use default num_partitions + -1, // use default replication_factor + errorChars, + TestRDKafkaClient.stringSize + ) else { + let errorString = String(cString: errorChars) + throw KafkaError.topicCreation(reason: errorString) + } + defer { rd_kafka_NewTopic_destroy(newTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var newTopicsArray: [OpaquePointer?] 
= [newTopic] + rd_kafka_CreateTopics( + kafkaHandle, + &newTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { + throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_CreateTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicCreation(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == uniqueTopicName else { + throw KafkaError.topicCreation(reason: "Received topic result for topic with different name") + } + } + + return uniqueTopicName + } + + /// Delete a topic. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter topic: Topic to delete. + /// - Parameter timeout: Timeout in milliseconds. + /// - Throws: A ``KafkaError`` if the topic deletion failed. + func _deleteTopic(_ topic: String, timeout: Int32) throws { + let deleteTopic = rd_kafka_DeleteTopic_new(topic) + defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var deleteTopicsArray: [OpaquePointer?] 
= [deleteTopic] + rd_kafka_DeleteTopics( + kafkaHandle, + &deleteTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { + throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_DeleteTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicDeletion(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == topic else { + throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name") + } + } + } +} diff --git a/Sources/KafkaTestUtils/TestRDKafkaClient.swift b/Sources/KafkaTestUtils/TestRDKafkaClient.swift new file mode 100644 index 00000000..2e5f17f5 --- /dev/null +++ b/Sources/KafkaTestUtils/TestRDKafkaClient.swift @@ -0,0 +1,47 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +@_spi(Internal) import Kafka +import struct Foundation.UUID + +public struct TestRDKafkaClient { + public static let stringSize = 1024 + + let client: Kafka.RDKafkaClient + + /// creates librdkafka dictionary config + public static func _createDummyConfig(bootstrapAddresses: KafkaConfiguration.BrokerAddress, addressFamily: KafkaConfiguration.IPAddressFamily = .any) -> [String: String] { + var config = KafkaConsumerConfiguration(consumptionStrategy: .group(id: "[no id]", topics: []), bootstrapBrokerAddresses: [bootstrapAddresses]) + config.broker.addressFamily = addressFamily + return config.dictionary + } + + /// creates RDKafkaClient with dictionary config + public static func _makeRDKafkaClient(config: [String: String], logger: Logger? = nil) throws -> TestRDKafkaClient { + let rdKafkaClient = try Kafka.RDKafkaClient.makeClient(type: .consumer, configDictionary: config, events: [], logger: logger ?? 
.kafkaTest)
+        return TestRDKafkaClient(client: rdKafkaClient)
+    }
+
+
+    @discardableResult
+    public func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
+        try self.client.withKafkaHandlePointer(body)
+    }
+
+    @discardableResult
+    public func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
+        try await self.client.withKafkaHandlePointer(body)
+    }
+}
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index e6cf82e5..94e01191 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -13,7 +13,8 @@
 //===----------------------------------------------------------------------===//
 
 import struct Foundation.UUID
-@testable import Kafka
+import Kafka
+import KafkaTestUtils
 import NIOCore
 import ServiceLifecycle
 import XCTest
@@ -49,36 +50,18 @@ final class KafkaTests: XCTestCase {
         self.producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [self.bootstrapBrokerAddress])
         self.producerConfig.broker.addressFamily = .v4
 
-        var basicConfig = KafkaConsumerConfiguration(
-            consumptionStrategy: .group(id: "no-group", topics: []),
-            bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
-        )
-        basicConfig.broker.addressFamily = .v4
+        let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress, addressFamily: .v4)
 
         // TODO: ok to block here? How to make setup async?
-        let client = try RDKafkaClient.makeClient(
-            type: .consumer,
-            configDictionary: basicConfig.dictionary,
-            events: [],
-            logger: .kafkaTest
-        )
+        let client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig)
         self.uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000)
     }
 
     override func tearDownWithError() throws {
-        var basicConfig = KafkaConsumerConfiguration(
-            consumptionStrategy: .group(id: "no-group", topics: []),
-            bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
-        )
-        basicConfig.broker.addressFamily = .v4
+        let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress, addressFamily: .v4)
 
         // TODO: ok to block here?
Problem: Tests may finish before topic is deleted - let client = try RDKafkaClient.makeClient( - type: .consumer, - configDictionary: basicConfig.dictionary, - events: [], - logger: .kafkaTest - ) + let client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig) try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000) self.bootstrapBrokerAddress = nil @@ -87,7 +70,7 @@ final class KafkaTests: XCTestCase { } func testProduceAndConsumeWithConsumerGroup() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) var consumerConfig = KafkaConsumerConfiguration( @@ -113,7 +96,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -149,7 +132,7 @@ final class KafkaTests: XCTestCase { } func testProduceAndConsumeWithAssignedTopicPartition() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) var consumerConfig = KafkaConsumerConfiguration( @@ -179,7 +162,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -215,7 +198,7 @@ final class KafkaTests: XCTestCase { } func testProduceAndConsumeWithScheduleCommit() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) var consumerConfig = KafkaConsumerConfiguration( @@ -242,7 +225,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -273,7 +256,7 @@ final class KafkaTests: XCTestCase { } func testProduceAndConsumeWithCommit() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 10) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) var consumerConfig = KafkaConsumerConfiguration( @@ -300,7 +283,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -331,7 +314,7 @@ final class KafkaTests: XCTestCase { } func testProduceAndConsumeWithMessageHeaders() async throws { - let testMessages = Self.createTestMessages( + let testMessages = KafkaTestMessages.create( topic: self.uniqueTestTopic, headers: [ KafkaHeader(key: "some.header", value: ByteBuffer(string: "some-header-value")), @@ -366,7 +349,7 @@ 
final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -404,7 +387,7 @@ final class KafkaTests: XCTestCase { } func testNoNewConsumerMessagesAfterGracefulShutdown() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 2) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 2) let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) let uniqueGroupID = UUID().uuidString @@ -434,7 +417,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: events, messages: testMessages @@ -467,7 +450,7 @@ final class KafkaTests: XCTestCase { } func testCommittedOffsetsAreCorrect() async throws { - let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10) + let testMessages = KafkaTestMessages.create(topic: self.uniqueTestTopic, count: 10) let firstConsumerOffset = testMessages.count / 2 let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest) @@ -502,7 +485,7 @@ final class KafkaTests: XCTestCase { // Producer Task group.addTask { - try await Self.sendAndAcknowledgeMessages( + try await KafkaTestMessages.sendAndAcknowledge( producer: producer, events: acks, messages: testMessages @@ -598,66 +581,4 @@ final class KafkaTests: XCTestCase { await serviceGroup2.triggerGracefulShutdown() } } - - // MARK: - Helpers - - private static func createTestMessages( - topic: String, - headers: [KafkaHeader] = [], - count: UInt - ) -> [KafkaProducerMessage] { - return Array(0..] - ) async throws { - var messageIDs = Set() - - for message in messages { - messageIDs.insert(try producer.send(message)) - } - - var receivedDeliveryReports = Set() - - for await event in events { - switch event { - case .deliveryReports(let deliveryReports): - for deliveryReport in deliveryReports { - receivedDeliveryReports.insert(deliveryReport) - } - default: - break // Ignore any other events - } - - if receivedDeliveryReports.count >= messages.count { - break - } - } - - XCTAssertEqual(Set(receivedDeliveryReports.map(\.id)), messageIDs) - - let acknowledgedMessages: [KafkaAcknowledgedMessage] = receivedDeliveryReports.compactMap { - guard case .acknowledged(let receivedMessage) = $0.status else { - return nil - } - return receivedMessage - } - - XCTAssertEqual(messages.count, acknowledgedMessages.count) - for message in messages { - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message.topic })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) })) - } - } } diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift index db86c0a0..8b137891 100644 --- a/Tests/IntegrationTests/Utilities.swift +++ b/Tests/IntegrationTests/Utilities.swift @@ -1,182 +1 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-client open source project -// -// Copyright (c) 2022 Apple Inc. 
and the swift-kafka-client project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// -import Crdkafka -import struct Foundation.UUID -@testable import Kafka -import Logging - -extension Logger { - static var kafkaTest: Logger { - var logger = Logger(label: "kafka.test") - logger.logLevel = .info - return logger - } -} - -extension RDKafkaClient { -// func createUniqueTopic(timeout: Int32 = 10000) async throws -> String { -// try await withCheckedThrowingContinuation { continuation in -// do { -// let uniqueTopic = try self._createUniqueTopic(timeout: timeout) -// continuation.resume(returning: uniqueTopic) -// } catch { -// continuation.resume(throwing: error) -// } -// } -// } - - /// Create a topic with a unique name (`UUID`). - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter timeout: Timeout in milliseconds. - /// - Returns: Name of newly created topic. - /// - Throws: A ``KafkaError`` if the topic creation failed. - func _createUniqueTopic(timeout: Int32) throws -> String { - let uniqueTopicName = UUID().uuidString - - let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) - defer { errorChars.deallocate() } - - guard let newTopic = rd_kafka_NewTopic_new( - uniqueTopicName, - -1, // use default num_partitions - -1, // use default replication_factor - errorChars, - RDKafkaClient.stringSize - ) else { - let errorString = String(cString: errorChars) - throw KafkaError.topicCreation(reason: errorString) - } - defer { rd_kafka_NewTopic_destroy(newTopic) } - - try self.withKafkaHandlePointer { kafkaHandle in - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var newTopicsArray: [OpaquePointer?] 
= [newTopic] - rd_kafka_CreateTopics( - kafkaHandle, - &newTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: resultCode) - } - - guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { - throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_CreateTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw KafkaError.topicCreation(reason: "Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: topicResultError) - } - - let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == uniqueTopicName else { - throw KafkaError.topicCreation(reason: "Received topic result for topic with different name") - } - } - - return uniqueTopicName - } - -// func deleteTopic(_ topic: String, timeout: Int32 = 10000) async throws { -// try await withCheckedThrowingContinuation { continuation in -// do { -// try self._deleteTopic(topic, timeout: timeout) -// continuation.resume() -// } catch { -// continuation.resume(throwing: error) -// } -// } -// } - - /// Delete a topic. - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter topic: Topic to delete. - /// - Parameter timeout: Timeout in milliseconds. - /// - Throws: A ``KafkaError`` if the topic deletion failed. - func _deleteTopic(_ topic: String, timeout: Int32) throws { - let deleteTopic = rd_kafka_DeleteTopic_new(topic) - defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } - - try self.withKafkaHandlePointer { kafkaHandle in - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var deleteTopicsArray: [OpaquePointer?] 
= [deleteTopic] - rd_kafka_DeleteTopics( - kafkaHandle, - &deleteTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: resultCode) - } - - guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { - throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_DeleteTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw KafkaError.topicDeletion(reason: "Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: topicResultError) - } - - let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == topic else { - throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name") - } - } - } -} diff --git a/Tests/KafkaTests/KafkaConsumerTests.swift b/Tests/KafkaTests/KafkaConsumerTests.swift index 212853c6..480a8387 100644 --- a/Tests/KafkaTests/KafkaConsumerTests.swift +++ b/Tests/KafkaTests/KafkaConsumerTests.swift @@ -14,6 +14,7 @@ import struct Foundation.UUID @testable import Kafka +import KafkaTestUtils import Logging import ServiceLifecycle import XCTest diff --git a/Tests/KafkaTests/KafkaProducerTests.swift b/Tests/KafkaTests/KafkaProducerTests.swift index f4124898..27b2f52f 100644 --- a/Tests/KafkaTests/KafkaProducerTests.swift +++ b/Tests/KafkaTests/KafkaProducerTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// @testable import Kafka +import KafkaTestUtils import Logging import NIOCore import ServiceLifecycle diff --git a/Tests/KafkaTests/Utilities.swift b/Tests/KafkaTests/Utilities.swift index f7fbfbf8..bdba4fd8 100644 --- a/Tests/KafkaTests/Utilities.swift +++ b/Tests/KafkaTests/Utilities.swift @@ -15,14 +15,6 @@ import Logging import NIOConcurrencyHelpers -extension Logger { - static var kafkaTest: Logger { - var logger = Logger(label: "kafka.test") - logger.logLevel = .info - return logger - } -} - // MARK: - Mocks internal struct LogEvent { From 684f1040c1ee19e2c8fa0f002b966b69485b2610 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Fri, 20 Oct 2023 11:34:15 +0300 Subject: [PATCH 07/12] revert testable --- Tests/IntegrationTests/KafkaTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index 94e01191..35ecceed 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import struct Foundation.UUID -import Kafka +@testable import Kafka import KafkaTestUtils import NIOCore import ServiceLifecycle From 97f48a724ec3685829a11c40f3a48c17793d6876 Mon Sep 17 00:00:00 2001 From: BlindSpot 
<127803250+blindspotbounty@users.noreply.github.com> Date: Fri, 20 Oct 2023 12:50:17 +0300 Subject: [PATCH 08/12] reduce metrics --- .../SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json | 10 +++------- .../5.10/SwiftKafkaBenchmarks.librdkafka.p90.json | 10 +++------- .../SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json | 10 +++------- .../5.7/SwiftKafkaBenchmarks.librdkafka.p90.json | 10 +++------- .../SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json | 10 +++------- .../5.8/SwiftKafkaBenchmarks.librdkafka.p90.json | 10 +++------- .../SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json | 10 +++------- .../5.9/SwiftKafkaBenchmarks.librdkafka.p90.json | 10 +++------- .../SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json | 10 +++------- .../main/SwiftKafkaBenchmarks.librdkafka.p90.json | 10 +++------- 10 files changed, 30 insertions(+), 70 deletions(-) diff --git a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json index 2aee64d1..da6eb181 100644 --- a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json +++ b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 241631232, - "cpuTotal" : 8350000000, - "objectAllocCount" : 2021543, - "releaseCount" : 14078309, - "retainCount" : 11040340, - "retainReleaseDelta" : 1016426, - "wallClock" : 10306245297 + "allocatedResidentMemory" : 156172288, + "cpuTotal" : 8130000000, + "wallClock" : 9970203463 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json index 286fec8f..f71e86f1 100644 --- a/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json +++ b/Benchmarks/Thresholds/5.10/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 363003904, - "cpuTotal" : 160000000, - "objectAllocCount" : 210, - "releaseCount" : 315, - "retainCount" : 0, - "retainReleaseDelta" : 105, - "wallClock" : 195243208 + "allocatedResidentMemory" : 343998464, + "cpuTotal" : 140000000, + "wallClock" : 181559208 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json index 4e9fd75c..733f1f1f 100644 --- a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json +++ b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 162463744, - "cpuTotal" : 8580000000, - "objectAllocCount" : 2023297, - "releaseCount" : 16641710, - "retainCount" : 13600233, - "retainReleaseDelta" : 1018180, - "wallClock" : 10797182506 + "allocatedResidentMemory" : 132448256, + "cpuTotal" : 10420000000, + "wallClock" : 11489118547 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json index e0428dfe..59cb6b55 100644 --- a/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json +++ b/Benchmarks/Thresholds/5.7/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 366739456, - "cpuTotal" : 150000000, - "objectAllocCount" : 210, - "releaseCount" : 315, - "retainCount" : 0, - "retainReleaseDelta" : 105, - "wallClock" : 185274416 + "allocatedResidentMemory" : 346685440, 
+ "cpuTotal" : 130000000, + "wallClock" : 188535375 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json index 625f41f0..c146dfc4 100644 --- a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json +++ b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 159383552, - "cpuTotal" : 6170000000, - "objectAllocCount" : 2020626, - "releaseCount" : 13604331, - "retainCount" : 10568194, - "retainReleaseDelta" : 1015511, - "wallClock" : 9365130088 + "allocatedResidentMemory" : 193331200, + "cpuTotal" : 7840000000, + "wallClock" : 10121511254 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json index feaafba3..04d86e84 100644 --- a/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json +++ b/Benchmarks/Thresholds/5.8/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 365428736, - "cpuTotal" : 120000000, - "objectAllocCount" : 208, - "releaseCount" : 312, - "retainCount" : 0, - "retainReleaseDelta" : 104, - "wallClock" : 170432000 + "allocatedResidentMemory" : 362676224, + "cpuTotal" : 130000000, + "wallClock" : 191337209 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json index 817fec4b..a28c8677 100644 --- a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json +++ b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 178651136, - "cpuTotal" : 6750000000, - "objectAllocCount" : 2020774, - "releaseCount" : 13589597, - "retainCount" : 10553164, - "retainReleaseDelta" : 1015659, - "wallClock" : 9534342338 + "allocatedResidentMemory" : 139853824, + "cpuTotal" : 9620000000, + "wallClock" : 10959377547 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json index d60d2d8d..ab00e2fe 100644 --- a/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json +++ b/Benchmarks/Thresholds/5.9/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 372375552, - "cpuTotal" : 120000000, - "objectAllocCount" : 216, - "releaseCount" : 324, - "retainCount" : 0, - "retainReleaseDelta" : 108, - "wallClock" : 176588708 + "allocatedResidentMemory" : 346619904, + "cpuTotal" : 130000000, + "wallClock" : 179947833 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json index 1c9e7f02..334bf9ee 100644 --- a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json +++ b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.SwiftKafkaConsumer.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 161611776, - "cpuTotal" : 7800000000, - "objectAllocCount" : 2021025, - "releaseCount" : 14075173, - "retainCount" : 11038244, - "retainReleaseDelta" : 1015904, - "wallClock" : 10082730324 + "allocatedResidentMemory" : 189071360, + "cpuTotal" : 8050000000, + "wallClock" : 10166945241 } \ No 
newline at end of file diff --git a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json index 5f089ba6..d8545e54 100644 --- a/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json +++ b/Benchmarks/Thresholds/main/SwiftKafkaBenchmarks.librdkafka.p90.json @@ -1,9 +1,5 @@ { - "allocatedResidentMemory" : 361431040, - "cpuTotal" : 120000000, - "objectAllocCount" : 218, - "releaseCount" : 327, - "retainCount" : 0, - "retainReleaseDelta" : 109, - "wallClock" : 177079834 + "allocatedResidentMemory" : 346226688, + "cpuTotal" : 160000000, + "wallClock" : 193623875 } \ No newline at end of file From 0dbc3b898188d43402884bda8f629c800f25cc0e Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Thu, 26 Oct 2023 20:17:40 +0300 Subject: [PATCH 09/12] add benchmark utils --- .../SwiftKafkaBenchmarkUtils/Utilities.swift | 61 ++++++++++ .../SwiftKafkaBenchmarks/Utilities.swift | 111 ------------------ .../KafkaConsumerBenchmark.swift | 1 + Benchmarks/Package.swift | 14 ++- 4 files changed, 74 insertions(+), 113 deletions(-) create mode 100644 Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift delete mode 100644 Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift rename Benchmarks/Benchmarks/{SwiftKafkaBenchmarks => SwiftKafkaConsumerBenchmarks}/KafkaConsumerBenchmark.swift (99%) diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift new file mode 100644 index 00000000..39f482f9 --- /dev/null +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift @@ -0,0 +1,61 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import Foundation +import Kafka +import KafkaTestUtils +import Logging + +public extension Logger { + static var testLogger: Logger { + var logger = Logger(label: "bench_log") + #if DEBUG + logger.logLevel = .debug + #else + logger.logLevel = .info + #endif + return logger + } +} + +public let logger: Logger = .testLogger +public let kafkaHost: String = kafkaHostFromEnv() +public let kafkaPort: Int = kafkaPortFromEnv() + +public func benchLog(_ msg: @autoclosure () -> Logger.Message) { +// Just in case for debug + #if DEBUG + logger.debug(msg()) + #endif +} + +public func getFromEnv(_ key: String) -> String? { + ProcessInfo.processInfo.environment[key] +} + +public func kafkaHostFromEnv() -> String { + getFromEnv("KAFKA_HOST") ?? "localhost" +} + +public func kafkaPortFromEnv() -> Int { + .init(getFromEnv("KAFKA_PORT") ?? "9092")! 
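+    // Assumes KAFKA_PORT, when set, holds a valid integer; the force-unwrap fails fast on anything else.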
+} + +public func bootstrapBrokerAddress() -> KafkaConfiguration.BrokerAddress { + .init( + host: kafkaHost, + port: kafkaPort + ) +} diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift deleted file mode 100644 index 92367276..00000000 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/Utilities.swift +++ /dev/null @@ -1,111 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the swift-kafka-client open source project -// -// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Crdkafka -import Foundation -import Kafka -import KafkaTestUtils -import Logging - -extension Logger { - static var testLogger: Logger { - var logger = Logger(label: "bench_log") - #if DEBUG - logger.logLevel = .debug - #else - logger.logLevel = .info - #endif - return logger - } -} - -let logger: Logger = .testLogger -let stringSize = 1024 -let kafkaHost: String = kafkaHostFromEnv() -let kafkaPort: Int = kafkaPortFromEnv() - -func benchLog(_ msg: @autoclosure () -> Logger.Message) { -// Just in case for debug - #if DEBUG - logger.debug(msg()) - #endif -} - -//enum RDKafkaClientHolderError: Error { -// case generic(String) -//} -// -///// ``RDKafkaClientHolder`` is a basic wrapper that automatically destroys kafka handle -//class RDKafkaClientHolder { -// let kafkaHandle: OpaquePointer -// -// enum HandleType { -// case producer -// case consumer -// } -// -// init(configDictionary: [String: String], type: HandleType) { -// let errorChars = UnsafeMutablePointer.allocate(capacity: stringSize) -// defer { errorChars.deallocate() } -// -// let config: OpaquePointer = rd_kafka_conf_new() -// configDictionary.forEach { key, value in -// let res = rd_kafka_conf_set( -// config, -// key, -// value, -// errorChars, -// stringSize -// ) -// if res != RD_KAFKA_CONF_OK { -// fatalError("Could not set \(key) with \(value)") -// } -// } -// -// guard let handle = rd_kafka_new( -// type == .consumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER, -// config, -// errorChars, -// stringSize -// ) else { -// fatalError("Could not create client") -// } -// self.kafkaHandle = handle -// } -// -// deinit { -// rd_kafka_poll(self.kafkaHandle, 0) -// rd_kafka_destroy(self.kafkaHandle) -// } -//} - - -func getFromEnv(_ key: String) -> String? { - ProcessInfo.processInfo.environment[key] -} - -func kafkaHostFromEnv() -> String { - getFromEnv("KAFKA_HOST") ?? "localhost" -} - -func kafkaPortFromEnv() -> Int { - .init(getFromEnv("KAFKA_PORT") ?? "9092")! 
-} - -func bootstrapBrokerAddress() -> KafkaConfiguration.BrokerAddress { - .init( - host: kafkaHost, - port: kafkaPort - ) -} diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift similarity index 99% rename from Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift rename to Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift index 16a88cac..8877f888 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift @@ -20,6 +20,7 @@ import NIOCore import ServiceLifecycle import Logging import Benchmark +import SwiftKafkaBenchmarkUtils private let numOfMessages: UInt = .init(getFromEnv("MESSAGES_NUMBER") ?? "500000")! diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index 9f4eeaa0..6200e3b4 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -25,14 +25,24 @@ let package = Package( .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.11.1"), ], targets: [ + .target( + name: "SwiftKafkaBenchmarkUtils", + dependencies: [ + .product(name: "Benchmark", package: "package-benchmark"), + .product(name: "Kafka", package: "swift-kafka-client"), + .product(name: "KafkaTestUtils", package: "swift-kafka-client") + ], + path: "Benchmarks/SwiftKafkaBenchmarkUtils" + ), .executableTarget( - name: "SwiftKafkaBenchmarks", + name: "SwiftKafkaConsumerBenchmarks", dependencies: [ + "SwiftKafkaBenchmarkUtils", .product(name: "Benchmark", package: "package-benchmark"), .product(name: "Kafka", package: "swift-kafka-client"), .product(name: "KafkaTestUtils", package: "swift-kafka-client") ], - path: "Benchmarks/SwiftKafkaBenchmarks", + path: "Benchmarks/SwiftKafkaConsumerBenchmarks", plugins: [ .plugin(name: "BenchmarkPlugin", package: "package-benchmark") ] From d0f077deb02fa5eb6fc0d138c964656dae6439de Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Fri, 27 Oct 2023 13:04:37 +0300 Subject: [PATCH 10/12] add producer and headers benchmarks, fix returned error from produce --- .../SwiftKafkaBenchmarkUtils/Utilities.swift | 2 + .../KafkaConsumerBenchmark.swift | 152 +++++++--- .../KafkaProducerBenchmark.swift | 272 ++++++++++++++++++ Benchmarks/Package.swift | 13 + .../KafkaProducerConfiguration.swift | 3 +- Sources/Kafka/RDKafka/RDKafkaClient.swift | 2 +- .../KafkaTestUtils/KafkaTestMessages.swift | 68 +++-- .../KafkaTestUtils/TestRDKafkaClient.swift | 13 +- 8 files changed, 457 insertions(+), 68 deletions(-) create mode 100644 Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift diff --git a/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift index 39f482f9..70b87910 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaBenchmarkUtils/Utilities.swift @@ -33,6 +33,8 @@ public extension Logger { public let logger: Logger = .testLogger public let kafkaHost: String = kafkaHostFromEnv() public let kafkaPort: Int = kafkaPortFromEnv() +public let numOfMessages: UInt = .init(getFromEnv("MESSAGES_NUMBER") ?? "500000")! 
+ public func benchLog(_ msg: @autoclosure () -> Logger.Message) { // Just in case for debug diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift index 8877f888..ac1f9269 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift @@ -28,6 +28,61 @@ private var uniqueTestTopic: String! private var client: TestRDKafkaClient! private var testMessages: [KafkaProducerMessage]! +private func prepareTopic(withHeaders: Bool = false) async throws { + let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress()) + client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig) + + uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000) + benchLog("Created topic \(uniqueTestTopic!)") + + benchLog("Generating \(numOfMessages) messages") + var headers: [KafkaHeader]? + if withHeaders { + headers = Array(0..<10).map { idx in + "\(idx.hashValue)".withUnsafeBytes { value in + .init(key: "\(idx)", value: ByteBuffer(bytes: value)) + } + } + } + testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: headers ?? [], count: numOfMessages) + benchLog("Finish generating \(numOfMessages) messages") + + var producerConfig: KafkaProducerConfiguration! + + producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapBrokerAddress()]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + + let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(numOfMessages) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup1.run() + } + + // Producer Task + group.addTask { + try await KafkaTestMessages.sendAndAcknowledge( + producer: producer, + events: acks, + messages: testMessages + ) + } + + // Wait for Producer Task to complete + try await group.next() + await serviceGroup1.triggerGracefulShutdown() + } +} + let benchmarks = { Benchmark.defaultConfiguration = .init( metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory], @@ -38,58 +93,89 @@ let benchmarks = { ) Benchmark.setup = { - let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress()) - client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig) - uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000) - benchLog("Created topic \(uniqueTestTopic!)") - - benchLog("Generating \(numOfMessages) messages") - testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, count: numOfMessages) - benchLog("Finish generating \(numOfMessages) messages") + } + + Benchmark.teardown = { + try? client._deleteTopic(uniqueTestTopic, timeout: -1) + client = nil + } + + Benchmark("SwiftKafkaConsumer") { benchmark in + try await prepareTopic() - var producerConfig: KafkaProducerConfiguration! 
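+        // A fresh UUID group ID plus autoOffsetReset = .beginning below makes each run re-read the topic from its first offset, so iterations never share consumer state.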
+ let uniqueGroupID = UUID().uuidString + var consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [bootstrapBrokerAddress()] + ) + consumerConfig.pollInterval = .milliseconds(1) + consumerConfig.autoOffsetReset = .beginning + consumerConfig.broker.addressFamily = .v4 + consumerConfig.pollInterval = .milliseconds(1) - producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapBrokerAddress()]) - producerConfig.broker.addressFamily = .v4 - - let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: logger + ) + let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2) - let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) - let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + benchmark.startMeasurement() try await withThrowingTaskGroup(of: Void.self) { group in - benchLog("Start producing \(numOfMessages) messages") + benchLog("Start consuming") defer { - benchLog("Finish producing") + benchLog("Finish consuming") } // Run Task group.addTask { - try await serviceGroup1.run() + try await serviceGroup2.run() } - // Producer Task + // Second Consumer Task group.addTask { - try await KafkaTestMessages.sendAndAcknowledge( - producer: producer, - events: acks, - messages: testMessages - ) + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + for try await record in consumer.messages { + ctr += 1 + totalBytes += UInt64(record.value.readableBytes) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(numOfMessages))%") + tmpCtr = 0 + } + if ctr >= numOfMessages { + break + } + } + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") } - // Wait for Producer Task to complete + // Wait for second Consumer Task to complete try await group.next() - await serviceGroup1.triggerGracefulShutdown() + // Shutdown the serviceGroup + await serviceGroup2.triggerGracefulShutdown() } + + benchmark.stopMeasurement() } - Benchmark.teardown = { - try? 
client._deleteTopic(uniqueTestTopic, timeout: -1) - client = nil - } - Benchmark("SwiftKafkaConsumer") { benchmark in + Benchmark("SwiftKafkaConsumer with headers") { benchmark in + try await prepareTopic(withHeaders: true) + let uniqueGroupID = UUID().uuidString var consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group( @@ -158,7 +244,9 @@ let benchmarks = { benchmark.stopMeasurement() } - Benchmark("librdkafka") { benchmark in + Benchmark("librdkafka consumer") { benchmark in + try await prepareTopic() + let uniqueGroupID = UUID().uuidString let rdKafkaConsumerConfig: [String: String] = [ "group.id": uniqueGroupID, diff --git a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift new file mode 100644 index 00000000..40cfdc85 --- /dev/null +++ b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift @@ -0,0 +1,272 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import Kafka +import KafkaTestUtils +import Foundation +import NIOCore +import ServiceLifecycle +import Logging +import Benchmark +import SwiftKafkaBenchmarkUtils + +private var uniqueTestTopic: String! +private var client: TestRDKafkaClient! + +let benchmarks = { + Benchmark.defaultConfiguration = .init( + metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory], + warmupIterations: 0, + scalingFactor: .one, + maxDuration: .seconds(5), + maxIterations: 1 + ) + + Benchmark.setup = { + let basicConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress()) + client = try TestRDKafkaClient._makeRDKafkaClient(config: basicConfig) + + uniqueTestTopic = try client._createUniqueTopic(timeout: 10 * 1000) + benchLog("Created topic \(uniqueTestTopic!)") + } + + Benchmark.teardown = { + try? client._deleteTopic(uniqueTestTopic, timeout: -1) + client = nil + } + + Benchmark("SwiftKafkaProducer") { benchmark in + let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, count: numOfMessages) + + var producerConfig: KafkaProducerConfiguration! 
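+        // Three concurrent tasks below: the producer service, a delivery-report counter, and the send loop with queue-full backoff; draining acknowledgements while sending keeps the producer's event stream from backing up.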
+ + producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapBrokerAddress()]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + + let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + + benchmark.startMeasurement() + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(numOfMessages) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup1.run() + } + + group.addTask { + var receivedDeliveryReportsCtr = 0 + var prevPercent = 0 + + for await event in acks { + switch event { + case .deliveryReports(let deliveryReports): + receivedDeliveryReportsCtr += deliveryReports.count + default: + break // Ignore any other events + } + let curPercent = receivedDeliveryReportsCtr * 100 / testMessages.count + if curPercent >= prevPercent + 10 { + benchLog("Delivered \(curPercent)% of messages") + prevPercent = curPercent + } + + if receivedDeliveryReportsCtr >= testMessages.count { + break + } + } + } + + // Producer Task + group.addTask { + for message in testMessages { + while true { // Note: this is an example of queue full + do { + try producer.send(message) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + try await Task.sleep(for: .milliseconds(10)) + continue + } catch { + benchLog("Caught some error: \(error)") + throw error + } + } + } + } + + // Wait for Producer Task to complete + try await group.next() + try await group.next() + await serviceGroup1.triggerGracefulShutdown() + } + benchmark.stopMeasurement() + } + + Benchmark("SwiftKafkaProducer with headers") { benchmark in + let headers: [KafkaHeader] = Array(0..<10).map { idx in + "\(idx.hashValue)".withUnsafeBytes { value in + .init(key: "\(idx)", value: ByteBuffer(bytes: value)) + } + } + let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: headers, count: numOfMessages) + var producerConfig: KafkaProducerConfiguration! 
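+        // Same send/acknowledge pipeline as SwiftKafkaProducer above; the ten headers per message are the only difference, so the metric delta isolates header-handling cost.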
+ + producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapBrokerAddress()]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + + let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1) + + benchmark.startMeasurement() + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(numOfMessages) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup1.run() + } + + group.addTask { + var receivedDeliveryReportsCtr = 0 + var prevPercent = 0 + + for await event in acks { + switch event { + case .deliveryReports(let deliveryReports): + receivedDeliveryReportsCtr += deliveryReports.count + default: + break // Ignore any other events + } + let curPercent = receivedDeliveryReportsCtr * 100 / testMessages.count + if curPercent >= prevPercent + 10 { + benchLog("Delivered \(curPercent)% of messages") + prevPercent = curPercent + } + + if receivedDeliveryReportsCtr >= testMessages.count { + break + } + } + } + + // Producer Task + group.addTask { + for message in testMessages { + while true { // Note: this is an example of queue full + do { + try producer.send(message) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + try await Task.sleep(for: .milliseconds(10)) + continue + } catch { + benchLog("Caught some error: \(error)") + throw error + } + } + } + } + + // Wait for Producer Task to complete + try await group.next() + try await group.next() + await serviceGroup1.triggerGracefulShutdown() + } + benchmark.stopMeasurement() + } + + + Benchmark("librdkafka producer") { benchmark in + let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, count: numOfMessages) + var producerConfig: KafkaProducerConfiguration! 
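+        // Raw librdkafka baseline: rd_kafka_produce() sends each payload with RD_KAFKA_MSG_F_COPY, a second task polls for RD_KAFKA_EVENT_DR delivery reports, and rd_kafka_flush() is called to drain the queue whenever produce reports an error.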
+
+ let uniqueGroupID = UUID().uuidString
+ let rdKafkaProducerConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress(), consumer: false)
+
+ let producer = try TestRDKafkaClient._makeRDKafkaClient(config: rdKafkaProducerConfig, consumer: false)
+ try await producer.withKafkaHandlePointer { kafkaHandle in
+ let queue = rd_kafka_queue_get_main(kafkaHandle)
+ defer { rd_kafka_queue_destroy(queue) }
+
+ let topicHandle = rd_kafka_topic_new(
+ kafkaHandle,
+ uniqueTestTopic,
+ nil
+ )
+ defer { rd_kafka_topic_destroy(topicHandle) }
+ benchmark.startMeasurement()
+ try await withThrowingTaskGroup(of: Void.self) { group in
+ group.addTask {
+ var messagesSent = 0
+ while messagesSent < numOfMessages {
+ let event = rd_kafka_queue_poll(kafkaHandle, 0)
+ defer { rd_kafka_event_destroy(event) }
+
+ guard let event else {
+ try await Task.sleep(for: .milliseconds(10))
+ continue
+ }
+
+ let rdEventType = rd_kafka_event_type(event)
+ if rdEventType == RD_KAFKA_EVENT_DR {
+ messagesSent += rd_kafka_event_message_count(event)
+ }
+ }
+ }
+ group.addTask {
+ var newMessageID = 0
+ for message in testMessages {
+ while true {
+ let result = message.value.withUnsafeBytes { valueBuffer in
+ message.key!.withUnsafeBytes { keyBuffer in
+ rd_kafka_produce(
+ topicHandle,
+ Int32(message.partition.rawValue),
+ RD_KAFKA_MSG_F_COPY,
+ UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
+ valueBuffer.count,
+ keyBuffer.baseAddress,
+ keyBuffer.count,
+ nil
+ )
+ }
+ }
+ if rd_kafka_resp_err_t(result) != RD_KAFKA_RESP_ERR_NO_ERROR {
+ rd_kafka_flush(kafkaHandle, 10)
+ continue
+ }
+ break
+ }
+ newMessageID += 1
+ }
+ }
+ try await group.waitForAll()
+ }
+ benchmark.stopMeasurement()
+ }
+ }
+}
diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift
index 6200e3b4..c71d9a3f 100644
--- a/Benchmarks/Package.swift
+++ b/Benchmarks/Package.swift
@@ -47,5 +47,18 @@ let package = Package(
 .plugin(name: "BenchmarkPlugin", package: "package-benchmark")
 ]
 ),
+ .executableTarget(
+ name: "SwiftKafkaProducerBenchmarks",
+ dependencies: [
+ "SwiftKafkaBenchmarkUtils",
+ .product(name: "Benchmark", package: "package-benchmark"),
+ .product(name: "Kafka", package: "swift-kafka-client"),
+ .product(name: "KafkaTestUtils", package: "swift-kafka-client")
+ ],
+ path: "Benchmarks/SwiftKafkaProducerBenchmarks",
+ plugins: [
+ .plugin(name: "BenchmarkPlugin", package: "package-benchmark")
+ ]
+ ),
 ]
 )
diff --git a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
index a774905b..2c491da5 100644
--- a/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
+++ b/Sources/Kafka/Configuration/KafkaProducerConfiguration.swift
@@ -175,7 +175,8 @@ public struct KafkaProducerConfiguration {
 // MARK: - KafkaProducerConfiguration + Dictionary
 
 extension KafkaProducerConfiguration {
- internal var dictionary: [String: String] {
+ @_spi(Internal)
+ public var dictionary: [String: String] {
 var resultDict: [String: String] = [:]
 
 resultDict["enable.idempotence"] = String(self.isIdempotenceEnabled)
diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index 08c49929..f1b5772f 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -166,7 +166,7 @@ final public class RDKafkaClient: Sendable {
 }
 
 if error != RD_KAFKA_RESP_ERR_NO_ERROR {
- throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
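+ // wrap the error code returned by this call; rd_kafka_last_error() is
+ // thread-local state that another librdkafka call may already have overwritten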
+ throw KafkaError.rdKafkaError(wrapping: error)
 }
 }
diff --git a/Sources/KafkaTestUtils/KafkaTestMessages.swift b/Sources/KafkaTestUtils/KafkaTestMessages.swift
index 4dbdfa06..31879efb 100644
--- a/Sources/KafkaTestUtils/KafkaTestMessages.swift
+++ b/Sources/KafkaTestUtils/KafkaTestMessages.swift
@@ -9,39 +9,47 @@ public struct KafkaTestMessages {
 messages: [KafkaProducerMessage],
 logger: Logger = .kafkaTest
 ) async throws {
- for message in messages {
- while true { // Note: this is an example of queue full
- do {
- try producer.send(message)
- break
- } catch let error as KafkaError where error.description.contains("Queue full") {
- continue
- } catch {
- logger.error("Caught some error: \(error)")
- throw error
+ try await withThrowingTaskGroup(of: Void.self) { group in
+ group.addTask {
+ for message in messages {
+ while true { // retry send while the local producer queue is full
+ do {
+ try producer.send(message)
+ break
+ } catch let error as KafkaError where error.description.contains("Queue full") {
+ try await Task.sleep(for: .milliseconds(10))
+ continue
+ } catch {
+ logger.error("Caught error: \(error)")
+ throw error
+ }
+ }
 }
 }
- }
-
- var receivedDeliveryReportsCtr = 0
- var prevPercent = 0
-
- for await event in events {
- switch event {
- case .deliveryReports(let deliveryReports):
- receivedDeliveryReportsCtr += deliveryReports.count
- default:
- break // Ignore any other events
- }
- let curPercent = receivedDeliveryReportsCtr * 100 / messages.count
- if curPercent >= prevPercent + 10 {
- logger.debug("Delivered \(curPercent)% of messages")
- prevPercent = curPercent
- }
-
- if receivedDeliveryReportsCtr >= messages.count {
- break
+
+ group.addTask {
+ var receivedDeliveryReportsCtr = 0
+ var prevPercent = 0
+
+ for await event in events {
+ switch event {
+ case .deliveryReports(let deliveryReports):
+ receivedDeliveryReportsCtr += deliveryReports.count
+ default:
+ break // Ignore any other events
+ }
+ let curPercent = receivedDeliveryReportsCtr * 100 / messages.count
+ if curPercent >= prevPercent + 10 {
+ logger.debug("Delivered \(curPercent)% of messages")
+ prevPercent = curPercent
+ }
+
+ if receivedDeliveryReportsCtr >= messages.count {
+ break
+ }
+ }
 }
+ try await group.waitForAll()
 }
 }
diff --git a/Sources/KafkaTestUtils/TestRDKafkaClient.swift b/Sources/KafkaTestUtils/TestRDKafkaClient.swift
index 2e5f17f5..556a81db 100644
--- a/Sources/KafkaTestUtils/TestRDKafkaClient.swift
+++ b/Sources/KafkaTestUtils/TestRDKafkaClient.swift
@@ -22,15 +22,20 @@ public struct TestRDKafkaClient {
 let client: Kafka.RDKafkaClient
 
 /// creates librdkafka dictionary config
- public static func _createDummyConfig(bootstrapAddresses: KafkaConfiguration.BrokerAddress, addressFamily: KafkaConfiguration.IPAddressFamily = .any) -> [String: String] {
- var config = KafkaConsumerConfiguration(consumptionStrategy: .group(id: "[no id]", topics: []), bootstrapBrokerAddresses: [bootstrapAddresses])
+ public static func _createDummyConfig(bootstrapAddresses: KafkaConfiguration.BrokerAddress, addressFamily: KafkaConfiguration.IPAddressFamily = .any, consumer: Bool = true) -> [String: String] {
+ if consumer {
+ var config = KafkaConsumerConfiguration(consumptionStrategy: .group(id: "[no id]", topics: []), bootstrapBrokerAddresses: [bootstrapAddresses])
+ config.broker.addressFamily = addressFamily
+ return config.dictionary
+ }
+ var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [bootstrapAddresses])
 config.broker.addressFamily = addressFamily
 return config.dictionary
 }
 
 /// creates RDKafkaClient with dictionary config
- public static func _makeRDKafkaClient(config: [String: String], logger: Logger? = nil) throws -> TestRDKafkaClient {
- let rdKafkaClient = try Kafka.RDKafkaClient.makeClient(type: .consumer, configDictionary: config, events: [], logger: logger ?? .kafkaTest)
+ public static func _makeRDKafkaClient(config: [String: String], logger: Logger? = nil, consumer: Bool = true) throws -> TestRDKafkaClient {
+ let rdKafkaClient = try Kafka.RDKafkaClient.makeClient(type: consumer ? .consumer : .producer, configDictionary: config, events: consumer ? [] : [.deliveryReport], logger: logger ?? .kafkaTest)
 return TestRDKafkaClient(client: rdKafkaClient)
 }

From 790e081600a31e91f3a32f1dd77b497303fa1505 Mon Sep 17 00:00:00 2001
From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Fri, 27 Oct 2023 16:01:02 +0300
Subject: [PATCH 11/12] add librdkafka producer with headers

---
 .../KafkaConsumerBenchmark.swift              |  14 +-
 .../KafkaProducerBenchmark.swift              | 128 ++++++++++++++++--
 .../KafkaTestUtils/KafkaTestMessages.swift    |   8 ++
 3 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
index ac1f9269..a6863b7c 100644
--- a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
+++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
@@ -36,15 +36,11 @@ private func prepareTopic(withHeaders: Bool = false) async throws {
 benchLog("Created topic \(uniqueTestTopic!)")
 
 benchLog("Generating \(numOfMessages) messages")
- var headers: [KafkaHeader]?
- if withHeaders {
- headers = Array(0..<10).map { idx in
- "\(idx.hashValue)".withUnsafeBytes { value in
- .init(key: "\(idx)", value: ByteBuffer(bytes: value))
- }
- }
- }
- testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: headers ?? [], count: numOfMessages)
+ let headers: [KafkaHeader] =
+ withHeaders
+ ? KafkaTestMessages.createHeaders()
+ : []
+ testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: headers, count: numOfMessages)
 benchLog("Finish generating \(numOfMessages) messages")
 
 var producerConfig: KafkaProducerConfiguration!
diff --git a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift
index 40cfdc85..4dc66567 100644
--- a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift
+++ b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift
@@ -25,6 +25,24 @@ import SwiftKafkaBenchmarkUtils
 private var uniqueTestTopic: String!
 private var client: TestRDKafkaClient!
 
+private func withHeaders<T>(_ headers: [KafkaHeader], startHeaderIndex: Int? = nil, _ body: (UnsafePointer<CChar>, UnsafeRawBufferPointer?) throws -> Void, finalizeClosure: () throws -> T) rethrows -> T {
+ var index = startHeaderIndex ?? 0
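+ // The recursion keeps every header's key/value pointer valid at once: each
+ // withCString / withUnsafeReadableBytes scope stays open until finalizeClosure runs.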
+ guard index < headers.count else {
+ return try finalizeClosure()
+ }
+ return try headers[index].key.withCString { keyBuffer in
+ if let value = headers[index].value {
+ return try value.withUnsafeReadableBytes { valueBuffer in
+ try body(keyBuffer, valueBuffer)
+ return try withHeaders(headers, startHeaderIndex: index + 1, body, finalizeClosure: finalizeClosure)
+ }
+ } else {
+ try body(keyBuffer, nil)
+ return try withHeaders(headers, startHeaderIndex: index + 1, body, finalizeClosure: finalizeClosure)
+ }
+ }
+}
+
 let benchmarks = {
 Benchmark.defaultConfiguration = .init(
 metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory],
@@ -122,12 +140,7 @@ let benchmarks = {
 }
 
 Benchmark("SwiftKafkaProducer with headers") { benchmark in
- let headers: [KafkaHeader] = Array(0..<10).map { idx in
- "\(idx.hashValue)".withUnsafeBytes { value in
- .init(key: "\(idx)", value: ByteBuffer(bytes: value))
- }
- }
- let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: headers, count: numOfMessages)
+ let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: KafkaTestMessages.createHeaders(), count: numOfMessages)
 var producerConfig: KafkaProducerConfiguration!
@@ -238,7 +251,6 @@
 }
 }
 group.addTask {
- var newMessageID = 0
 for message in testMessages {
 while true {
 let result = message.value.withUnsafeBytes { valueBuffer in
@@ -261,7 +273,107 @@
 }
 break
 }
- newMessageID += 1
+ }
+ }
+ try await group.waitForAll()
+ }
+ benchmark.stopMeasurement()
+ }
+ }
+
+ Benchmark("librdkafka producer with headers") { benchmark in
+ let testMessages = KafkaTestMessages.create(topic: uniqueTestTopic, headers: KafkaTestMessages.createHeaders(), count: numOfMessages)
+ var producerConfig: KafkaProducerConfiguration!
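+ // C-API baseline with headers: rd_kafka_produce() cannot attach headers, so
+ // this benchmark builds an rd_kafka_vu_t argument list for rd_kafka_produceva().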
+
+ let uniqueGroupID = UUID().uuidString
+ let rdKafkaProducerConfig = TestRDKafkaClient._createDummyConfig(bootstrapAddresses: bootstrapBrokerAddress(), consumer: false)
+
+ let producer = try TestRDKafkaClient._makeRDKafkaClient(config: rdKafkaProducerConfig, consumer: false)
+ try await producer.withKafkaHandlePointer { kafkaHandle in
+ let queue = rd_kafka_queue_get_main(kafkaHandle)
+ defer { rd_kafka_queue_destroy(queue) }
+
+ let topicHandle = rd_kafka_topic_new(
+ kafkaHandle,
+ uniqueTestTopic,
+ nil
+ )
+ defer { rd_kafka_topic_destroy(topicHandle) }
+ benchmark.startMeasurement()
+ try await withThrowingTaskGroup(of: Void.self) { group in
+ group.addTask {
+ var messagesSent = 0
+ while messagesSent < numOfMessages {
+ let event = rd_kafka_queue_poll(kafkaHandle, 0)
+ defer { rd_kafka_event_destroy(event) }
+
+ guard let event else {
+ try await Task.sleep(for: .milliseconds(10))
+ continue
+ }
+
+ let rdEventType = rd_kafka_event_type(event)
+ if rdEventType == RD_KAFKA_EVENT_DR {
+ messagesSent += rd_kafka_event_message_count(event)
+ }
+ }
+ }
+ group.addTask {
+ for message in testMessages {
+ message.value.withUnsafeBytes { valueBuffer in
+ message.key!.withUnsafeBytes { keyBuffer in
+ let sizeWithoutHeaders = 5
+ let size = sizeWithoutHeaders + message.headers.count
+ var arguments = Array(repeating: rd_kafka_vu_t(), count: size)
+ var index = 0
+
+ arguments[index].vtype = RD_KAFKA_VTYPE_RKT
+ arguments[index].u.rkt = topicHandle
+ index += 1
+
+ arguments[index].vtype = RD_KAFKA_VTYPE_PARTITION
+ arguments[index].u.i32 = Int32(message.partition.rawValue)
+ index += 1
+
+ arguments[index].vtype = RD_KAFKA_VTYPE_MSGFLAGS
+ arguments[index].u.i = RD_KAFKA_MSG_F_COPY
+ index += 1
+
+ arguments[index].vtype = RD_KAFKA_VTYPE_KEY
+ arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: keyBuffer.baseAddress)
+ arguments[index].u.mem.size = keyBuffer.count
+ index += 1
+
+ arguments[index].vtype = RD_KAFKA_VTYPE_VALUE
+ arguments[index].u.mem.ptr = UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress)
+ arguments[index].u.mem.size = valueBuffer.count
+ index += 1
+
+ return withHeaders(message.headers) { key, value in
+ arguments[index].vtype = RD_KAFKA_VTYPE_HEADER
+
+ arguments[index].u.header.name = key
+ arguments[index].u.header.val = value?.baseAddress
+ arguments[index].u.header.size = value?.count ?? 0
+
+ index += 1
+ } finalizeClosure: {
+ assert(arguments.count == size)
+ while true {
+ let result = rd_kafka_produceva(
+ kafkaHandle,
+ arguments,
+ arguments.count
+ )
+ if rd_kafka_error_code(result) != RD_KAFKA_RESP_ERR_NO_ERROR {
+ rd_kafka_flush(kafkaHandle, 10)
+ continue
+ }
+ break
+ }
+ }
+ }
+ }
 }
 }
 try await group.waitForAll()
diff --git a/Sources/KafkaTestUtils/KafkaTestMessages.swift b/Sources/KafkaTestUtils/KafkaTestMessages.swift
index 31879efb..cec56027 100644
--- a/Sources/KafkaTestUtils/KafkaTestMessages.swift
+++ b/Sources/KafkaTestUtils/KafkaTestMessages.swift
@@ -67,4 +67,12 @@ public struct KafkaTestMessages {
 )
 }
 }
+
+ public static func createHeaders(count: Int = 10) -> [KafkaHeader] {
+ return Array(0..<count).map { idx in
+ "\(idx.hashValue)".withUnsafeBytes { value in
+ .init(key: "\(idx)", value: ByteBuffer(bytes: value))
+ }
+ }
+ }
 }

From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com>
Date: Fri, 27 Oct 2023 16:03:42 +0300
Subject: [PATCH 12/12] add consumer with headers

---
 .../KafkaConsumerBenchmark.swift              | 63 +++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
index a6863b7c..b19efa8e 100644
--- a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
+++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift
@@ -302,4 +302,67 @@ let benchmarks = {
 benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
 }
 }
+
+ Benchmark("librdkafka consumer with headers") { benchmark in
+ try await prepareTopic(withHeaders: true)
+
+ let uniqueGroupID = UUID().uuidString
+ let rdKafkaConsumerConfig: [String: String] = [
+ "group.id": uniqueGroupID,
+ "bootstrap.servers": "\(kafkaHost):\(kafkaPort)",
+ "broker.address.family": "v4",
+ "auto.offset.reset": "beginning"
+ ]
+
+ let consumer = try TestRDKafkaClient._makeRDKafkaClient(config: rdKafkaConsumerConfig)
+ try await consumer.withKafkaHandlePointer { kafkaHandle in
+ rd_kafka_poll_set_consumer(kafkaHandle)
+ let subscriptionList = rd_kafka_topic_partition_list_new(1)
+ defer {
+ rd_kafka_topic_partition_list_destroy(subscriptionList)
+ }
+ rd_kafka_topic_partition_list_add(
+ subscriptionList,
+ uniqueTestTopic,
+ RD_KAFKA_PARTITION_UA
+ )
+ rd_kafka_subscribe(kafkaHandle, subscriptionList)
+ rd_kafka_poll(kafkaHandle, 0)
+
+ var ctr: UInt64 = 0
+ var tmpCtr: UInt64 = 0
+
+ let interval: UInt64 = Swift.max(UInt64(numOfMessages / 20), 1)
+ let totalStartDate = Date.timeIntervalSinceReferenceDate
+ var totalBytes: UInt64 = 0
+
+ benchmark.startMeasurement()
+
+ while ctr < numOfMessages {
+ guard let record = rd_kafka_consumer_poll(kafkaHandle, 0) else {
+ try await Task.sleep(for: .milliseconds(1)) // matches the default pollInterval of swift-kafka
+ continue
+ }
+ defer {
+ rd_kafka_message_destroy(record)
+ }
+ ctr += 1
+ totalBytes += UInt64(record.pointee.len)
+
+ tmpCtr += 1
+ if tmpCtr >= interval {
+ benchLog("read \(ctr * 100 / UInt64(numOfMessages))%")
+ tmpCtr = 0
+ }
+ }
+
+ benchmark.stopMeasurement()
+
+ rd_kafka_consumer_close(kafkaHandle)
+
+ let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate
+ let avgRateKb = Double(totalBytes) / timeIntervalTotal / 1024
+ benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateKb))KB/s), timePassed: \(Int(timeIntervalTotal))sec")
+ }
+ }
 }