Skip to content

Commit ebcb5d5

Browse files
marclopdd-jasminesun
authored andcommitted
kafkaexporter: Opt-in to use franz-go client (open-telemetry#40364)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds the option to use the franz-go client in the `kafkaexporter`. The main rationale for this change is to allow users to opt-in to use a more performant Kafka client, as the default Sarama client has been known to have performance issues in high-throughput scenarios. The change _should_ be backward compatible, and significantly increases the production performance of the `kafkaexporter` when in use. ``` $ benchstat sarama.txt franz-go.txt goos: darwin goarch: arm64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter cpu: Apple M2 Pro │ sarama.txt │ franz-go.txt │ │ sec/op │ sec/op vs base │ Logs-12 55.56µ ± 5% 33.40µ ± 4% -39.88% (p=0.000 n=10) Metrics-12 60.69µ ± 6% 39.34µ ± 17% -35.18% (p=0.001 n=10) Traces-12 57.09µ ± 9% 35.71µ ± 13% -37.44% (p=0.000 n=10) geomean 57.74µ 36.07µ -37.53% │ sarama.txt │ franz-go.txt │ │ B/op │ B/op vs base │ Logs-12 5.792Ki ± 0% 2.207Ki ± 1% -61.90% (p=0.000 n=10) Metrics-12 9.545Ki ± 0% 4.339Ki ± 16% -54.54% (p=0.000 n=10) Traces-12 7.329Ki ± 0% 2.967Ki ± 5% -59.52% (p=0.000 n=10) geomean 7.400Ki 3.051Ki -58.76% │ sarama.txt │ franz-go.txt │ │ allocs/op │ allocs/op vs base │ Logs-12 55.00 ± 0% 29.00 ± 0% -47.27% (p=0.000 n=10) Metrics-12 58.00 ± 0% 32.00 ± 0% -44.83% (p=0.000 n=10) Traces-12 55.00 ± 0% 29.00 ± 0% -47.27% (p=0.000 n=10) geomean 55.98 29.97 -46.47% ``` <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Improves the exporter's production performance. <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tested, benchmarked against a local Kafka broker. <!--Describe the documentation added.--> #### Documentation Updated the Readme. <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent ee9225a commit ebcb5d5

File tree

26 files changed

+1643
-205
lines changed

26 files changed

+1643
-205
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'kafkaexporter'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add an Alpha feature gate `exporter.kafkaexporter.UseFranzGoClient` to use franz-go in the Kafka exporter for better performance."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40364]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This change adds an experimental opt-in support to use the franz-go client in the Kafka exporter.
20+
The franz-go client is a high-performance Kafka client that can improve the performance of the Kafka exporter.
21+
The default client remains sarama, which is used by the Kafka receiver and other components.
22+
Enable the franz-go client by setting the `exporter.kafkaexporter.UseFranzGo` feature gate.
23+
24+
# If your change doesn't affect end users or the exported elements of any package,
25+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
26+
# Optional: The change log or logs in which this entry should be included.
27+
# e.g. '[user]' or '[user, api]'
28+
# Include 'user' if the change is relevant to end users.
29+
# Include 'api' if there is a change to a library API.
30+
# Default: '[user]'
31+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@ processors for higher throughput and resiliency. Message payload encoding is con
2020

2121
## Configuration settings
2222

23+
> [!NOTE]
24+
> You can opt-in to use [`franz-go`](https://github.com/twmb/franz-go) client by enabling the feature gate
25+
> `exporter.kafkaexporter.UseFranzGo` when you run the OpenTelemetry Collector. See the following page
26+
> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates)
27+
2328
There are no required settings.
2429

2530
The following settings can be optionally configured:
31+
2632
- `brokers` (default = localhost:9092): The list of kafka brokers.
2733
- `protocol_version` (default = 2.1.0): Kafka protocol version.
2834
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.

exporter/kafkaexporter/generated_package_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exporter/kafkaexporter/go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ require (
1616
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.127.0
1717
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.127.0
1818
github.com/stretchr/testify v1.10.0
19+
github.com/twmb/franz-go v1.18.1
20+
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
1921
go.opentelemetry.io/collector/client v1.33.1-0.20250603105141-605011a1fea8
2022
go.opentelemetry.io/collector/component v1.33.1-0.20250603105141-605011a1fea8
2123
go.opentelemetry.io/collector/component/componenttest v0.127.1-0.20250603105141-605011a1fea8
@@ -26,6 +28,7 @@ require (
2628
go.opentelemetry.io/collector/consumer/consumererror v0.127.1-0.20250603105141-605011a1fea8
2729
go.opentelemetry.io/collector/exporter v0.127.1-0.20250603105141-605011a1fea8
2830
go.opentelemetry.io/collector/exporter/exportertest v0.127.1-0.20250603105141-605011a1fea8
31+
go.opentelemetry.io/collector/featuregate v1.33.1-0.20250603105141-605011a1fea8
2932
go.opentelemetry.io/collector/pdata v1.33.1-0.20250603105141-605011a1fea8
3033
go.opentelemetry.io/collector/pdata/testdata v0.127.1-0.20250603105141-605011a1fea8
3134
go.uber.org/goleak v1.3.0
@@ -87,6 +90,9 @@ require (
8790
github.com/pierrec/lz4/v4 v4.1.22 // indirect
8891
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
8992
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
93+
github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect
94+
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect
95+
github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect
9096
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
9197
github.com/xdg-go/scram v1.1.2 // indirect
9298
github.com/xdg-go/stringprep v1.0.4 // indirect
@@ -99,7 +105,6 @@ require (
99105
go.opentelemetry.io/collector/exporter/xexporter v0.127.1-0.20250603105141-605011a1fea8 // indirect
100106
go.opentelemetry.io/collector/extension v1.33.1-0.20250603105141-605011a1fea8 // indirect
101107
go.opentelemetry.io/collector/extension/xextension v0.127.1-0.20250603105141-605011a1fea8 // indirect
102-
go.opentelemetry.io/collector/featuregate v1.33.1-0.20250603105141-605011a1fea8 // indirect
103108
go.opentelemetry.io/collector/internal/telemetry v0.127.1-0.20250603105141-605011a1fea8 // indirect
104109
go.opentelemetry.io/collector/pdata/pprofile v0.127.1-0.20250603105141-605011a1fea8 // indirect
105110
go.opentelemetry.io/collector/pipeline v0.127.1-0.20250603105141-605011a1fea8 // indirect

exporter/kafkaexporter/go.sum

Lines changed: 19 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Package kafkaclient provides implementations of Kafka producers using
5+
// different client libraries (Sarama, Franz-go).
6+
package kafkaclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient"
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkaclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient"
5+
6+
import (
7+
"context"
8+
"errors"
9+
10+
"github.com/twmb/franz-go/pkg/kgo"
11+
)
12+
13+
// FranzSyncProducer is a wrapper around the franz-go client that implements
14+
// the Producer interface. Allowing us to use the franz-go client while
15+
// maintaining compatibility with the existing Kafka exporter code.
16+
type FranzSyncProducer struct {
17+
client *kgo.Client
18+
metadataKeys []string
19+
}
20+
21+
// NewFranzSyncProducer Franz-go producer from a kgo.Client and a Messenger.
22+
func NewFranzSyncProducer(client *kgo.Client,
23+
metadataKeys []string,
24+
) *FranzSyncProducer {
25+
return &FranzSyncProducer{
26+
client: client,
27+
metadataKeys: metadataKeys,
28+
}
29+
}
30+
31+
// ExportData sends a batch of messages to Kafka
32+
func (p *FranzSyncProducer) ExportData(ctx context.Context, msgs Messages) error {
33+
messages := makeFranzMessages(msgs)
34+
setMessageHeaders(ctx, messages, p.metadataKeys,
35+
func(key string, value []byte) kgo.RecordHeader {
36+
return kgo.RecordHeader{Key: key, Value: value}
37+
},
38+
func(m *kgo.Record) []kgo.RecordHeader { return m.Headers },
39+
func(m *kgo.Record, h []kgo.RecordHeader) { m.Headers = h },
40+
)
41+
result := p.client.ProduceSync(ctx, messages...)
42+
var errs []error
43+
for _, r := range result {
44+
if r.Err != nil {
45+
errs = append(errs, r.Err)
46+
}
47+
}
48+
return errors.Join(errs...)
49+
}
50+
51+
// Close shuts down the producer and flushes any remaining messages.
52+
func (p *FranzSyncProducer) Close() error {
53+
p.client.Close()
54+
return nil
55+
}
56+
57+
func makeFranzMessages(messages Messages) []*kgo.Record {
58+
msgs := make([]*kgo.Record, 0, messages.Count)
59+
for _, msg := range messages.TopicMessages {
60+
for _, message := range msg.Messages {
61+
msg := &kgo.Record{Topic: msg.Topic}
62+
if message.Key != nil {
63+
msg.Key = message.Key
64+
}
65+
if message.Value != nil {
66+
msg.Value = message.Value
67+
}
68+
msgs = append(msgs, msg)
69+
}
70+
}
71+
return msgs
72+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkaclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/collector/client"
10+
)
11+
12+
// metadataToHeaders converts metadata from the context into a slice of headers using the provided header constructor.
13+
// This function is generic and can be used for both Sarama and Franz-go header types.
14+
func metadataToHeaders[H any](ctx context.Context, keys []string,
15+
makeHeader func(key string, value []byte) H,
16+
) []H {
17+
if len(keys) == 0 {
18+
return nil
19+
}
20+
info := client.FromContext(ctx)
21+
headers := make([]H, 0, len(keys))
22+
for _, key := range keys {
23+
valueSlice := info.Metadata.Get(key)
24+
for _, v := range valueSlice {
25+
headers = append(headers, makeHeader(key, []byte(v)))
26+
}
27+
}
28+
return headers
29+
}
30+
31+
// setMessageHeaders is a generic helper for setting headers on a slice of messages.
32+
// - messages: the messages to set headers on
33+
// - ctx: context for extracting metadata
34+
// - metadataKeys: which metadata keys to extract
35+
// - makeHeader: constructs the header type for the target client (Sarama/Franz-go)
36+
// - getHeaders: gets the headers from a message
37+
// - setHeaders: sets the headers on a message
38+
// Usage example (Sarama):
39+
//
40+
// setMessageHeaders(ctx, allMessages, keys, makeHeader, getHeaders, setHeaders)
41+
func setMessageHeaders[M any, H any](ctx context.Context,
42+
messages []M,
43+
metadataKeys []string,
44+
makeHeader func(key string, value []byte) H,
45+
getHeaders func(M) []H,
46+
setHeadersFunc func(M, []H),
47+
) {
48+
setHeaders(
49+
messages,
50+
metadataToHeaders(ctx, metadataKeys, makeHeader),
51+
getHeaders,
52+
setHeadersFunc,
53+
)
54+
}
55+
56+
// setHeaders sets or appends headers on each message in messages using the provided get/set functions.
57+
func setHeaders[M any, H any](messages []M, headers []H,
58+
getHeaders func(M) []H,
59+
setHeaders func(M, []H),
60+
) {
61+
if len(headers) == 0 || len(messages) == 0 {
62+
return
63+
}
64+
for i := range messages {
65+
h := getHeaders(messages[i])
66+
if len(h) == 0 {
67+
setHeaders(messages[i], headers)
68+
} else {
69+
setHeaders(messages[i], append(h, headers...))
70+
}
71+
}
72+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkaclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/kafkaclient"
5+
6+
import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/marshaler"
7+
8+
// Messages is a collection of messages (with count) to be sent to Kafka.
9+
type Messages struct {
10+
// Count is the total number of messages across all topics.
11+
// Populating this field allows the downstream method to preallocate
12+
// the slice of messages, which can improve performance.
13+
Count int
14+
TopicMessages []TopicMessages
15+
}
16+
17+
// TopicMessages represents a collection of messages for a specific topic.
18+
type TopicMessages struct {
19+
Topic string
20+
Messages []marshaler.Message
21+
}

0 commit comments

Comments
 (0)