Consumer performance benchmark #140

Closed

blindspotbounty
Collaborator

This PR addresses the need for performance measurements described in #132.

Motivation

Reading messages with the swift-kafka-client KafkaConsumer can be more than 50x slower than reading them with librdkafka directly.

The following results were obtained with 500,000 messages in Docker (reproducible with the Swift 5.7-5.10 compilers):

SwiftKafkaConsumer
╒══════════════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                           │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞══════════════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ (Alloc + Retain) - Release Δ (K) │    1018 │    1018 │    1018 │    1018 │    1018 │    1018 │    1018 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Memory (allocated) (M)           │     158 │     158 │     158 │     158 │     158 │     158 │     158 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Object allocs (K)                │    2023 │    2023 │    2023 │    2023 │    2023 │    2023 │    2023 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Releases (M)                     │      16 │      16 │      16 │      16 │      16 │      16 │      16 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Retains (M)                      │      13 │      13 │      13 │      13 │      13 │      13 │      13 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)            │    7940 │    7940 │    7940 │    7940 │    7940 │    7940 │    7940 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)            │      10 │      10 │      10 │      10 │      10 │      10 │      10 │       1 │
╘══════════════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

librdkafka
╒══════════════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                           │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞══════════════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ (Alloc + Retain) - Release Δ     │     101 │     101 │     101 │     101 │     101 │     101 │     101 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Memory (allocated) (M)           │     374 │     374 │     374 │     374 │     374 │     374 │     374 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Object allocs                    │     202 │     202 │     202 │     202 │     202 │     202 │     202 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Releases                         │     303 │     303 │     303 │     303 │     303 │     303 │     303 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Retains                          │       0 │       0 │       0 │       0 │       0 │       0 │       0 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)            │     140 │     140 │     140 │     140 │     140 │     140 │     140 │       1 │
├──────────────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms)           │     175 │     175 │     175 │     175 │     175 │     175 │     175 │       1 │
╘══════════════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

The test shows a total CPU time of 140 ms for librdkafka versus 7940 ms for swift-kafka-client, roughly a 57x difference for this run.
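
As a quick check of that arithmetic, the ratios can be recomputed from the table values. This is only a sketch: the constant and function names are made up for illustration, and the swift-kafka-client wall clock value is the rounded 10 s shown in the table, so that ratio is approximate.

```swift
import Foundation

// Values copied from the two tables above (one sample each).
let swiftCPUMillis = 7940.0        // SwiftKafkaConsumer, Time (total CPU)
let librdkafkaCPUMillis = 140.0    // librdkafka, Time (total CPU)
let swiftWallMillis = 10_000.0     // SwiftKafkaConsumer, wall clock (table shows a rounded 10 s)
let librdkafkaWallMillis = 175.0   // librdkafka, wall clock

/// Returns how many times larger `slow` is than `fast`.
func slowdown(_ slow: Double, over fast: Double) -> Double {
    slow / fast
}

let cpuRatio = slowdown(swiftCPUMillis, over: librdkafkaCPUMillis)
let wallRatio = slowdown(swiftWallMillis, over: librdkafkaWallMillis)

print(String(format: "CPU time ratio: %.1fx", cpuRatio))
print(String(format: "Wall clock ratio: %.1fx", wallRatio))
```

Both ratios land in the same range, which suggests the gap is not specific to one of the two time metrics.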

The benchmark is added in the same fashion as in apple/swift-nio#2536 and apple/swift-certificates#125.

There are 2 benchmarks:

  1. Benchmark with a simple kafka consumer that fetches 500k messages
  2. Benchmark with a simple librdkafka consumer that fetches 500k messages

All benchmarks can be executed in one of the following ways:

  1. Within a Docker container:
docker-compose -f docker/docker-compose.yaml run benchmark
  2. Locally, with Kafka running:
swift package --disable-sandbox benchmark

Additionally, the benchmarks allow tweaking the number of messages and the Kafka host/port through environment variables, with the following defaults:

MESSAGES_NUMBER=500000 
KAFKA_HOST=localhost
KAFKA_PORT=9092 
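
A minimal sketch of how such variables could be read with fallbacks to the defaults above. The `BenchmarkSettings` type and its property names are hypothetical and are not taken from the PR; only the variable names and default values come from the list above.

```swift
import Foundation

/// Settings for a benchmark run. Type and property names are illustrative only.
struct BenchmarkSettings: Equatable {
    var messagesNumber: Int
    var kafkaHost: String
    var kafkaPort: Int

    /// Builds settings from an environment dictionary, falling back to the
    /// documented defaults when a variable is missing or is not a valid integer.
    init(environment: [String: String] = ProcessInfo.processInfo.environment) {
        self.messagesNumber = environment["MESSAGES_NUMBER"].flatMap { Int($0) } ?? 500_000
        self.kafkaHost = environment["KAFKA_HOST"] ?? "localhost"
        self.kafkaPort = environment["KAFKA_PORT"].flatMap { Int($0) } ?? 9092
    }
}

let settings = BenchmarkSettings()
print("Fetching \(settings.messagesNumber) messages from \(settings.kafkaHost):\(settings.kafkaPort)")
```

Taking the environment as a parameter keeps the parsing testable without touching the real process environment.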

I have some doubts about committing thresholds, as they should probably be recorded on a CI machine rather than on mine.

@@ -0,0 +1,293 @@
//===----------------------------------------------------------------------===//
Contributor

@felixschlegel felixschlegel Oct 13, 2023

Ideally we would deduplicate this code but I know it is hard since we are in a separate Swift Package here.
@FranzBusch what do you think? I was thinking maybe vend these functions as @_spi in the Kafka target?

Contributor

Yeah I think we can expose some of the internal RD abstractions as SPI and use it here directly.

Contributor

@blindspotbounty let me know if I can help you here

Collaborator Author

Sounds good to me as well. If I understood correctly:

  1. Put the topic creation/deletion functions into the Kafka target as internal API and mark them as @_spi
  2. Import Kafka in Tests and Benchmark with @_spi(Internal) import Kafka

Please let me know if that's not what you had in mind.

Contributor

This is what I had in mind!

Contributor

We can probably also do the same for func createTestMessages and func sendAndAcknowledgeMessages

Collaborator Author

Thank you for the clarification!

Regarding the latter, I had some thoughts about adding a new library target, KafkaTestUtils, that could be imported by any tests or benchmarks without modifiers.
For example, in the Kafka target:

/// RDKafkaClient+TopicCreation.swift
extension RDKafkaClient {
    @_spi(Internal) public func _createUniqueTopic(timeout: Int32) throws -> String
    @_spi(Internal) public func _deleteTopic(_ topic: String, timeout: Int32) throws
}
/// KafkaConsumerConfiguration.swift
extension KafkaConsumerConfiguration {
    @_spi(Internal) public var dictionary: [String: String]
}

Or even mark only some variables as @_spi:

final class RDKafkaClient: Sendable {
    @_spi(Private) public let kafkaHandle: OpaquePointer
}

while in KafkaTestUtils:

@testable @_spi(Internal) import Kafka
...
/// RDKafkaClient+TopicCreation.swift
extension RDKafkaClient {
    public func createUniqueTopic(timeout: Int32) throws -> String {
    ...
    }
    public func deleteTopic(_ topic: String, timeout: Int32) throws {
    ...
    }
    @discardableResult
    func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
    ...
    }
}
/// KafkaConsumerConfiguration.swift
extension KafkaConsumerConfiguration {
    public var rdKafkaConfig: [String: String] { self.dictionary }
}

Then, in Benchmark or in Test, we import KafkaTestUtils (+ optionally Kafka) and use both interfaces without modifiers, without polluting the production code base.
Though it might be slight over-engineering :)

hassila commented Oct 13, 2023

I would also review which metrics we'd like for threshold checks.

@blindspotbounty
Collaborator Author

I would also review which metrics we'd like for threshold checks.

I've kept only memory, CPU, and wall clock time. That should be enough to begin with.
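
To illustrate what a threshold check over those three metrics amounts to, here is a rough sketch in plain Swift. It is not how the benchmark tooling implements thresholds. The `Threshold` type, the tolerances, and the sample run are all hypothetical, and the baselines simply reuse the SwiftKafkaConsumer numbers from the first table.

```swift
/// A stored baseline for one metric plus the relative deviation that is tolerated.
struct Threshold {
    let baseline: Double
    let tolerancePercent: Double

    /// True when `measured` does not exceed the baseline by more than the tolerance.
    func allows(_ measured: Double) -> Bool {
        measured <= baseline * (1 + tolerancePercent / 100)
    }
}

/// Returns the sorted names of metrics whose measurement exceeds its threshold.
/// Metrics without a stored threshold are ignored.
func regressions(in measurements: [String: Double],
                 against thresholds: [String: Threshold]) -> [String] {
    measurements
        .filter { entry in
            guard let threshold = thresholds[entry.key] else { return false }
            return !threshold.allows(entry.value)
        }
        .keys
        .sorted()
}

// Baselines taken from the first SwiftKafkaConsumer table; tolerances are arbitrary.
let thresholds: [String: Threshold] = [
    "Memory (allocated) (M)": Threshold(baseline: 158, tolerancePercent: 10),
    "Time (total CPU) (ms)": Threshold(baseline: 7940, tolerancePercent: 15),
    "Time (wall clock) (s)": Threshold(baseline: 10, tolerancePercent: 15),
]

// A made-up run in which only CPU time regresses beyond its tolerance.
let run: [String: Double] = [
    "Memory (allocated) (M)": 160,
    "Time (total CPU) (ms)": 9500,
    "Time (wall clock) (s)": 11,
]

print(regressions(in: run, against: thresholds))
```

Because wall clock and CPU time vary between machines, baselines like these are only meaningful when recorded on the same hardware that later runs the check.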

Contributor

@felixschlegel felixschlegel left a comment

Great work, left some comments inline!

I know we agreed to have a KafkaTestUtils target, but now I am having second thoughts about whether we really want to expose this as a separate .library or just keep it inside Kafka and make methods like createUniqueTestTopic accessible through @_spi.

Please let me know what you think and would also love to hear @FranzBusch 's opinion on this.

Best,
Felix

@@ -116,7 +117,8 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable {
)
}

- static func topicCreation(
+ @_spi(Internal)
+ public static func topicCreation(
Contributor

Since this error can only occur in the test utilities, you can move it to an extension of KafkaError in the KafkaTestUtils target and remove the @_spi annotation.

@@ -126,7 +128,8 @@ public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable {
)
}

- static func topicDeletion(
+ @_spi(Internal)
+ public static func topicDeletion(
Contributor

See comment about static func topicCreation

#endif
}

//enum RDKafkaClientHolderError: Error {
Contributor

Please remove the commented-out code before merging.

@@ -43,6 +43,10 @@ let package = Package(
name: "KafkaFoundationCompat",
targets: ["KafkaFoundationCompat"]
),
.library(
Contributor

@FranzBusch are you ok with exposing KafkaTestUtils as a .library?

Alternatively, we could vend the internal testing functions as part of the Kafka package and mark them as @_spi(Internal). Then KafkaTests and Benchmarks would use @_spi(Internal) import Kafka to access createUniqueTestTopic, deleteTopic, et al.

}
}

var receivedDeliveryReportsCtr = 0
Contributor

Suggested change
- var receivedDeliveryReportsCtr = 0
+ var receivedDeliveryReportsCounter = 0

Contributor

Generally, we try to avoid abbreviations unless they make sense (e.g. HTTP). Please also do that for the other abbreviations you used.

}

var receivedDeliveryReportsCtr = 0
var prevPercent = 0
Contributor

Suggested change
- var prevPercent = 0
+ var previousPercent = 0

@blindspotbounty blindspotbounty marked this pull request as ready for review October 27, 2023 13:14
@blindspotbounty
Collaborator Author

While it has not yet been decided where to put the testing utils, I've added tests for the producer and for the producer/consumer with headers.
For a local run with Redpanda (a C++ Kafka implementation), the numbers are as follows:

============================
SwiftKafkaConsumerBenchmarks
============================

SwiftKafkaConsumer
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     208 │     208 │     208 │     208 │     208 │     208 │     208 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    5182 │    5182 │    5182 │    5182 │    5182 │    5182 │    5182 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      11 │      11 │      11 │      11 │      11 │      11 │      11 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaConsumer with headers
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     179 │     179 │     179 │     179 │     179 │     179 │     179 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    6850 │    6850 │    6850 │    6850 │    6850 │    6850 │    6850 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (s)  │      13 │      13 │      13 │      13 │      13 │      13 │      13 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

librdkafka consumer
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     290 │     290 │     290 │     290 │     290 │     290 │     290 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │     189 │     189 │     189 │     189 │     189 │     189 │     189 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │     275 │     275 │     275 │     275 │     275 │     275 │     275 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

librdkafka consumer with headers
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     423 │     423 │     423 │     423 │     423 │     423 │     423 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │     463 │     463 │     463 │     463 │     463 │     463 │     463 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │     561 │     561 │     561 │     561 │     561 │     561 │     561 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

============================
SwiftKafkaProducerBenchmarks
============================

SwiftKafkaProducer
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     297 │     297 │     297 │     297 │     297 │     297 │     297 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │     647 │     647 │     647 │     647 │     647 │     647 │     647 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │     832 │     832 │     832 │     832 │     832 │     832 │     832 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

SwiftKafkaProducer with headers
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     442 │     442 │     442 │     442 │     442 │     442 │     442 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    1875 │    1875 │    1875 │    1875 │    1875 │    1875 │    1875 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │    2158 │    2158 │    2158 │    2158 │    2158 │    2158 │    2158 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

librdkafka producer
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     177 │     177 │     177 │     177 │     177 │     177 │     177 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │     387 │     387 │     387 │     387 │     387 │     387 │     387 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │    1186 │    1186 │    1186 │    1186 │    1186 │    1186 │    1186 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

librdkafka producer with headers
╒════════════════════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╤═════════╕
│ Metric                 │      p0 │     p25 │     p50 │     p75 │     p90 │     p99 │    p100 │ Samples │
╞════════════════════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╪═════════╡
│ Memory (allocated) (M) │     272 │     272 │     272 │     272 │     272 │     272 │     272 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (total CPU) (ms)  │    1394 │    1394 │    1394 │    1394 │    1394 │    1394 │    1394 │       1 │
├────────────────────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┼─────────┤
│ Time (wall clock) (ms) │    2019 │    2019 │    2019 │    2019 │    2019 │    2019 │    2019 │       1 │
╘════════════════════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╧═════════╛

Contributor

@FranzBusch FranzBusch left a comment

Overall, thanks for the work, but the PR got a bit big and I would love it if we could split it into 4 smaller PRs, if you don't mind.

  1. Adds the benchmark infrastructure and nothing else, e.g. the dependency and the CI changes + new targets
  2. An AdminClient that we can use to create/delete topics for our benchmarks
  3. The consumer benchmark
  4. The producer benchmark

Moreover, I am wondering whether we should avoid the nested package here and try something else. SwiftPM supports target-based dependency resolution, so we ought to be fine adding the package-benchmark dependency to our root Package.swift and defining a few new targets. The reason I want to try this is that I really don't want to add SPI to so many things. Ideally, the benchmarks use only the public API. If the only need for SPI is the admin client, then we should just make that a public API.
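
For illustration, declaring a benchmark target in the root manifest could look roughly like the excerpt below. It is a sketch under assumptions rather than the manifest from this repository: the version requirement and the target path are guesses, the existing products, targets, and dependencies are omitted, and the target name is borrowed from the benchmark output above.

```swift
// swift-tools-version:5.7
// Hypothetical excerpt of a root Package.swift. The project's existing
// products, targets, and dependencies are omitted for brevity.
import PackageDescription

let package = Package(
    name: "swift-kafka-client",
    dependencies: [
        // Benchmark library and plugin; the version requirement is an assumption.
        .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.0.0"),
    ],
    targets: [
        // Stand-in for the existing library target (its real dependencies are omitted).
        .target(name: "Kafka"),
        // Benchmark executable that depends only on the public Kafka API.
        .executableTarget(
            name: "SwiftKafkaConsumerBenchmarks",
            dependencies: [
                "Kafka",
                .product(name: "Benchmark", package: "package-benchmark"),
            ],
            path: "Benchmarks/SwiftKafkaConsumerBenchmarks", // assumed location
            plugins: [
                .plugin(name: "BenchmarkPlugin", package: "package-benchmark"),
            ]
        ),
    ]
)
```

With this layout the benchmark target is an ordinary client of the Kafka module, so it can only reach declarations that are public or exposed through SPI.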

Contributor

Do we really need these utils? They seem like small overhead in the benchmarks themselves and I would rather have them defined there. Moreover, I don't think we need to read anything from the environment. Let's just hardcode the ports etc.

@_spi(Internal) import Kafka
import struct Foundation.UUID

public extension TestRDKafkaClient {
Contributor

I am a bit unsure about this here. We want to have an admin client anyhow so maybe we should just start creating an AdminClient that we can use in our benchmarks to set everything up. I am not a big fan of having something here that reaches into the rd_kafka internals outside of our main module.

@blindspotbounty
Collaborator Author

As discussed, opened a PR with infrastructure without actual tests: #146

@blindspotbounty blindspotbounty marked this pull request as draft December 4, 2023 15:20
@blindspotbounty
Collaborator Author

blindspotbounty commented Dec 4, 2023

Moving to draft since this PR is going to be closed in favour of others.

PRs containing changes:
#146
#149

This one is going to be closed after all changes are merged.

@FranzBusch FranzBusch closed this Oct 31, 2024