Skip to content

Commit fb63ec2

Browse files
Merge branch 'main' into add-splunk-rolling-restart-metric
2 parents 1a79df3 + 5b6b522 commit fb63ec2

File tree

16 files changed

+119
-7
lines changed

16 files changed

+119
-7
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add compression level in kafka producer.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39772]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ The following settings can be optionally configured:
8989
- `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes
9090
- `required_acks` (default = 1) controls when a message is regarded as transmitted. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
9191
- `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
92+
- `compression_params`
93+
- `level` (default = -1) the compression level used when producing messages to kafka.
94+
- The following are valid combinations of `compression` and `level`
95+
- `gzip`
96+
- BestSpeed: `1`
97+
- BestCompression: `9`
98+
- DefaultCompression: `-1`
99+
- `zstd`
100+
- SpeedFastest: `1`
101+
- SpeedDefault: `3`
102+
- SpeedBetterCompression: `6`
103+
- SpeedBestCompression: `11`
104+
- `lz4`
105+
Only supports fast level
106+
- `snappy`
107+
No compression levels supported yet
92108
- `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
93109

94110
### Supported encodings

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ require (
9090
github.com/xdg-go/scram v1.1.2 // indirect
9191
github.com/xdg-go/stringprep v1.0.4 // indirect
9292
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
93+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
9394
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
9495
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
9596
go.opentelemetry.io/collector/consumer/consumertest v0.125.1-0.20250505155216-829157cef7bb // indirect

exporter/kafkaexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extension/observer/kafkatopicsobserver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ require (
6969
github.com/xdg-go/scram v1.1.2 // indirect
7070
github.com/xdg-go/stringprep v1.0.4 // indirect
7171
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
72+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
7273
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
7374
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
7475
go.opentelemetry.io/collector/featuregate v1.31.1-0.20250505152726-56c7da210783 // indirect

extension/observer/kafkatopicsobserver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/kafka/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/config/configcompression"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
1415
)
@@ -21,6 +22,13 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
2122
"zstd": sarama.CompressionZSTD,
2223
}
2324

25+
func convertToSaramaCompressionLevel(p configcompression.Level) int {
26+
if p == configcompression.DefaultCompressionLevel {
27+
return sarama.CompressionLevelDefault
28+
}
29+
return int(p)
30+
}
31+
2432
var saramaInitialOffsets = map[string]int64{
2533
configkafka.EarliestOffset: sarama.OffsetOldest,
2634
configkafka.LatestOffset: sarama.OffsetNewest,
@@ -96,6 +104,7 @@ func NewSaramaSyncProducer(
96104
saramaConfig.Producer.RequiredAcks = sarama.RequiredAcks(producerConfig.RequiredAcks)
97105
saramaConfig.Producer.Timeout = producerTimeout
98106
saramaConfig.Producer.Compression = saramaCompressionCodecs[producerConfig.Compression]
107+
saramaConfig.Producer.CompressionLevel = convertToSaramaCompressionLevel(producerConfig.CompressionParams.Level)
99108
return sarama.NewSyncProducer(clientConfig.Brokers, saramaConfig)
100109
}
101110

internal/kafka/configkafka/config.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/config/configcompression"
1213
"go.opentelemetry.io/collector/config/configtls"
1314
"go.opentelemetry.io/collector/confmap"
1415
)
@@ -184,6 +185,9 @@ type ProducerConfig struct {
184185
// The options are: 'none' (default), 'gzip', 'snappy', 'lz4', and 'zstd'
185186
Compression string `mapstructure:"compression"`
186187

188+
// CompressionParams defines compression parameters for the producer.
189+
CompressionParams configcompression.CompressionParams `mapstructure:"compression_params"`
190+
187191
// The maximum number of messages the producer will send in a single
188192
// broker request. Defaults to 0 for unlimited. Similar to
189193
// `queue.buffering.max.messages` in the JVM producer.
@@ -192,17 +196,26 @@ type ProducerConfig struct {
192196

193197
func NewDefaultProducerConfig() ProducerConfig {
194198
return ProducerConfig{
195-
MaxMessageBytes: 1000000,
196-
RequiredAcks: WaitForLocal,
197-
Compression: "none",
199+
MaxMessageBytes: 1000000,
200+
RequiredAcks: WaitForLocal,
201+
Compression: "none",
202+
CompressionParams: configcompression.CompressionParams{
203+
Level: configcompression.DefaultCompressionLevel,
204+
},
198205
FlushMaxMessages: 0,
199206
}
200207
}
201208

202209
func (c ProducerConfig) Validate() error {
203210
switch c.Compression {
204211
case "none", "gzip", "snappy", "lz4", "zstd":
205-
// Valid compression
212+
ct := configcompression.Type(c.Compression)
213+
if !ct.IsCompressed() {
214+
return nil
215+
}
216+
if err := ct.ValidateParams(c.CompressionParams); err != nil {
217+
return err
218+
}
206219
default:
207220
return fmt.Errorf(
208221
"compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value is %q",

internal/kafka/configkafka/config_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/configcompression"
1314
"go.opentelemetry.io/collector/config/configtls"
1415
"go.opentelemetry.io/collector/confmap/confmaptest"
1516
"go.opentelemetry.io/collector/confmap/xconfmap"
@@ -162,12 +163,29 @@ func TestProducerConfig(t *testing.T) {
162163
},
163164
"full": {
164165
expected: ProducerConfig{
165-
MaxMessageBytes: 1,
166-
RequiredAcks: 0,
167-
Compression: "gzip",
166+
MaxMessageBytes: 1,
167+
RequiredAcks: 0,
168+
Compression: "gzip",
169+
CompressionParams: configcompression.CompressionParams{
170+
Level: 1,
171+
},
168172
FlushMaxMessages: 2,
169173
},
170174
},
175+
"default_compression_level": {
176+
expected: ProducerConfig{
177+
MaxMessageBytes: 1,
178+
RequiredAcks: 0,
179+
Compression: "zstd",
180+
CompressionParams: configcompression.CompressionParams{
181+
Level: configcompression.DefaultCompressionLevel,
182+
},
183+
FlushMaxMessages: 2,
184+
},
185+
},
186+
"invalid_compression_level": {
187+
expectedErr: `unsupported parameters {Level:-123} for compression type "gzip"`,
188+
},
171189
"required_acks_all": {
172190
expected: func() ProducerConfig {
173191
cfg := NewDefaultProducerConfig()

internal/kafka/configkafka/testdata/producer_config.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,20 @@ kafka/full:
33
max_message_bytes: 1
44
required_acks: 0
55
compression: gzip
6+
compression_params:
7+
level: 1
8+
flush_max_messages: 2
9+
kafka/default_compression_level:
10+
max_message_bytes: 1
11+
required_acks: 0
12+
compression: zstd
13+
flush_max_messages: 2
14+
kafka/invalid_compression_level:
15+
max_message_bytes: 1
16+
required_acks: 0
17+
compression: gzip
18+
compression_params:
19+
level: -123
620
flush_max_messages: 2
721
kafka/required_acks_all:
822
required_acks: all

internal/kafka/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
1313
github.com/xdg-go/scram v1.1.2
1414
go.opentelemetry.io/collector/component v1.31.1-0.20250505152726-56c7da210783
15+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783
1516
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783
1617
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783
1718
go.opentelemetry.io/collector/confmap v1.31.1-0.20250505152726-56c7da210783

internal/kafka/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkametricsreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ require (
7878
github.com/xdg-go/scram v1.1.2 // indirect
7979
github.com/xdg-go/stringprep v1.0.4 // indirect
8080
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
81+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
8182
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
8283
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
8384
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250505155216-829157cef7bb // indirect

receiver/kafkametricsreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ require (
101101
github.com/xdg-go/scram v1.1.2 // indirect
102102
github.com/xdg-go/stringprep v1.0.4 // indirect
103103
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
104+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
104105
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
105106
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250505155216-829157cef7bb // indirect
106107
go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250505155216-829157cef7bb // indirect

receiver/kafkareceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)