Skip to content

Commit db09bbc

Browse files
authored
Add compression config for different pipelines (#35625)
1 parent e3baec7 commit db09bbc

File tree

7 files changed

+298
-23
lines changed

7 files changed

+298
-23
lines changed

comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{
150150
// Each NetFlow flow is about 500 bytes
151151
// 10k BatchMaxSize is about 5Mo of content size
152152
defaultBatchMaxSize: 10000,
153-
154153
// High input chan is needed to handle high number of flows being flushed by NetFlow Server every 10s
155154
// Customers might need to set `network_devices.forwarder.input_chan_size` to higher value if flows are dropped
156155
// due to input channel being full.
@@ -206,7 +205,6 @@ var passthroughPipelineDescs = []passthroughPipelineDesc{
206205
defaultBatchMaxConcurrentSend: 10,
207206
defaultBatchMaxContentSize: pkgconfigsetup.DefaultBatchMaxContentSize,
208207
defaultBatchMaxSize: pkgconfigsetup.DefaultBatchMaxSize,
209-
210208
// on every periodic refresh, we re-send all the SBOMs for all the
211209
// container images in the workloadmeta store. This can be a lot of
212210
// payloads at once, so we need a large input channel size to avoid dropping
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
package eventplatformimpl
7+
8+
import (
9+
"testing"
10+
11+
"github.com/stretchr/testify/suite"
12+
13+
"github.com/DataDog/datadog-agent/comp/core/config"
14+
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver"
15+
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl"
16+
laconfig "github.com/DataDog/datadog-agent/comp/logs/agent/config"
17+
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/def"
18+
logscompressionfxmock "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx-mock"
19+
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
20+
)
21+
22+
const (
23+
defaultUseCompression = true
24+
zstdCompressionKind = "zstd"
25+
defaultZstdCompressionLevel = 1
26+
gzipCompressionKind = "gzip"
27+
defaultGzipCompressionLevel = 6
28+
)
29+
30+
type EventPlatformForwarderTestSuite struct {
31+
suite.Suite
32+
config config.Component
33+
receiver eventplatformreceiver.Component
34+
compression logscompression.Component
35+
}
36+
37+
func (suite *EventPlatformForwarderTestSuite) SetupTest() {
38+
suite.config = config.NewMock(suite.T())
39+
40+
suite.receiver = fxutil.Test[eventplatformreceiver.Component](suite.T(), eventplatformreceiverimpl.MockModule())
41+
suite.compression = fxutil.Test[logscompression.Component](suite.T(), logscompressionfxmock.MockModule())
42+
}
43+
44+
func TestEventPlatformForwarderTestSuite(t *testing.T) {
45+
suite.Run(t, new(EventPlatformForwarderTestSuite))
46+
}
47+
48+
func (suite *EventPlatformForwarderTestSuite) TestNewHTTPPassthroughPipelineCompression() {
49+
50+
tests := []struct {
51+
name string
52+
configSetup func(config.Component)
53+
expectedKind string
54+
expectedLevel int
55+
useCompression bool
56+
}{
57+
{
58+
name: "additional endpoints",
59+
configSetup: func(c config.Component) {
60+
c.SetWithoutSource("database_monitoring.metrics.additional_endpoints", `[{"api_key":"foo","host":"bar"}]`)
61+
},
62+
expectedKind: gzipCompressionKind,
63+
expectedLevel: defaultGzipCompressionLevel,
64+
useCompression: defaultUseCompression,
65+
},
66+
{
67+
name: "no compression",
68+
configSetup: func(c config.Component) {
69+
c.SetWithoutSource("database_monitoring.metrics.use_compression", false)
70+
},
71+
expectedKind: "none",
72+
expectedLevel: 0,
73+
useCompression: !defaultUseCompression,
74+
},
75+
{
76+
name: "default compression",
77+
configSetup: func(_ config.Component) {},
78+
expectedKind: zstdCompressionKind,
79+
expectedLevel: defaultZstdCompressionLevel,
80+
useCompression: defaultUseCompression,
81+
},
82+
{
83+
name: "zstd custom compression level",
84+
configSetup: func(c config.Component) {
85+
c.SetWithoutSource("database_monitoring.metrics.compression_kind", "zstd")
86+
c.SetWithoutSource("database_monitoring.metrics.zstd_compression_level", 3)
87+
},
88+
expectedKind: zstdCompressionKind,
89+
expectedLevel: 3,
90+
useCompression: defaultUseCompression,
91+
},
92+
{
93+
name: "gzip compression",
94+
configSetup: func(c config.Component) {
95+
c.SetWithoutSource("database_monitoring.metrics.compression_kind", "gzip")
96+
},
97+
expectedKind: gzipCompressionKind,
98+
expectedLevel: defaultGzipCompressionLevel,
99+
useCompression: defaultUseCompression,
100+
},
101+
{
102+
name: "gzip custom compression level",
103+
configSetup: func(c config.Component) {
104+
c.SetWithoutSource("database_monitoring.metrics.compression_kind", "gzip")
105+
c.SetWithoutSource("database_monitoring.metrics.compression_level", 8)
106+
},
107+
expectedKind: gzipCompressionKind,
108+
expectedLevel: 8,
109+
useCompression: defaultUseCompression,
110+
},
111+
{
112+
name: "invalid compression",
113+
configSetup: func(c config.Component) {
114+
c.SetWithoutSource("database_monitoring.metrics.use_compression", true)
115+
c.SetWithoutSource("database_monitoring.metrics.compression_kind", "gipz")
116+
},
117+
expectedKind: zstdCompressionKind,
118+
expectedLevel: defaultZstdCompressionLevel,
119+
useCompression: defaultUseCompression,
120+
},
121+
}
122+
123+
for _, t := range tests {
124+
suite.Run(t.name, func() {
125+
126+
t.configSetup(suite.config)
127+
128+
desc := passthroughPipelineDesc{
129+
// Only registered config prefixes trigger correct parsing and defaults.
130+
endpointsConfigPrefix: "database_monitoring.metrics.",
131+
}
132+
133+
pipeline, err := newHTTPPassthroughPipeline(
134+
suite.config,
135+
nil,
136+
suite.compression,
137+
desc,
138+
nil,
139+
0,
140+
)
141+
suite.Require().NoError(err)
142+
suite.Require().NotNil(pipeline)
143+
144+
configKeys := laconfig.NewLogsConfigKeys(desc.endpointsConfigPrefix, suite.config)
145+
endpoints, err := laconfig.BuildHTTPEndpointsWithConfig(
146+
suite.config,
147+
configKeys,
148+
"", // hostnameEndpointPrefix not needed
149+
"", // intakeTrackType not needed
150+
"", // protocol not needed
151+
"", // origin not needed
152+
)
153+
suite.Require().NoError(err)
154+
155+
// Check compression settings on endpoint
156+
suite.Equal(t.useCompression, endpoints.Main.UseCompression, "UseCompression mismatch")
157+
if t.useCompression {
158+
suite.Equal(t.expectedKind, endpoints.Main.CompressionKind, "CompressionKind mismatch")
159+
suite.Equal(t.expectedLevel, endpoints.Main.CompressionLevel, "CompressionLevel mismatch")
160+
}
161+
})
162+
suite.resetCompression()
163+
}
164+
}
165+
166+
func (suite *EventPlatformForwarderTestSuite) resetCompression() {
167+
// Reset compression settings to default state
168+
suite.config.SetWithoutSource("database_monitoring.metrics.use_compression", true)
169+
suite.config.SetWithoutSource("database_monitoring.metrics.compression_kind", "zstd")
170+
suite.config.SetWithoutSource("database_monitoring.metrics.compression_level", 6)
171+
suite.config.SetWithoutSource("database_monitoring.metrics.zstd_compression_level", defaultZstdCompressionLevel)
172+
suite.config.SetWithoutSource("database_monitoring.metrics.additional_endpoints", "{}")
173+
174+
}

comp/logs/agent/config/config_keys.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package config
77

88
import (
99
"encoding/json"
10+
"strings"
1011
"time"
1112

1213
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
@@ -22,6 +23,14 @@ type LogsConfigKeys struct {
2223
config pkgconfigmodel.Reader
2324
}
2425

26+
// CompressionKind constants
27+
const (
28+
GzipCompressionKind = "gzip"
29+
GzipCompressionLevel = 6
30+
ZstdCompressionKind = "zstd"
31+
ZstdCompressionLevel = 1
32+
)
33+
2534
// defaultLogsConfigKeys defines the default YAML keys used to retrieve logs configuration
2635
func defaultLogsConfigKeys(config pkgconfigmodel.Reader) *LogsConfigKeys {
2736
return NewLogsConfigKeys("logs_config.", config)
@@ -110,23 +119,40 @@ func (l *LogsConfigKeys) devModeUseProto() bool {
110119
}
111120

112121
func (l *LogsConfigKeys) compressionKind() string {
113-
compressionKind := l.getConfig().GetString(l.getConfigKey("compression_kind"))
114-
switch compressionKind {
115-
case "zstd", "gzip":
116-
log.Debugf("Logs agent is using: %s compression", compressionKind)
122+
configKey := l.getConfigKey("compression_kind")
123+
compressionKind := l.getConfig().GetString(configKey)
124+
125+
endpoints, _ := l.getAdditionalEndpoints()
126+
if len(endpoints) > 0 {
127+
if !l.config.IsConfigured(configKey) {
128+
log.Debugf("Additional endpoints detected, pipeline: %s falling back to gzip compression for compatibility", l.prefix)
129+
return GzipCompressionKind
130+
}
131+
}
132+
133+
if compressionKind == ZstdCompressionKind || compressionKind == GzipCompressionKind {
134+
pipelineName := "Main logs agent pipeline"
135+
if !strings.Contains(l.prefix, "logs_config") {
136+
pipelineName = "Pipeline " + l.prefix
137+
}
138+
log.Debugf("%s is using compression: %s", pipelineName, compressionKind)
117139
return compressionKind
118-
default:
119-
log.Warnf("Invalid compression kind: '%s', falling back to default compression: '%s' ", compressionKind, pkgconfigsetup.DefaultLogCompressionKind)
120-
return pkgconfigsetup.DefaultLogCompressionKind
121140
}
141+
142+
log.Warnf("Invalid compression kind: '%s', falling back to default compression: '%s' ", compressionKind, pkgconfigsetup.DefaultLogCompressionKind)
143+
return pkgconfigsetup.DefaultLogCompressionKind
122144
}
123145

124146
func (l *LogsConfigKeys) compressionLevel() int {
125-
if l.compressionKind() == "zstd" {
126-
return l.getConfig().GetInt(l.getConfigKey("zstd_compression_level"))
147+
if l.compressionKind() == ZstdCompressionKind {
148+
level := l.getConfig().GetInt(l.getConfigKey("zstd_compression_level"))
149+
log.Debugf("Pipeline %s is using zstd compression level: %d", l.prefix, level)
150+
return level
127151
}
128152

129-
return l.getConfig().GetInt(l.getConfigKey("compression_level"))
153+
level := l.getConfig().GetInt(l.getConfigKey("compression_level"))
154+
log.Debugf("Pipeline %s is using compression level: %d", l.prefix, level)
155+
return level
130156
}
131157

132158
func (l *LogsConfigKeys) useCompression() bool {

comp/logs/agent/config/config_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func (suite *ConfigTestSuite) TestEndpointsSetLogsDDUrl() {
539539
Port: 443,
540540
useSSL: true,
541541
UseCompression: true,
542-
CompressionLevel: 6,
542+
CompressionLevel: ZstdCompressionLevel,
543543
BackoffFactor: pkgconfigsetup.DefaultLogsSenderBackoffFactor,
544544
BackoffBase: pkgconfigsetup.DefaultLogsSenderBackoffBase,
545545
BackoffMax: pkgconfigsetup.DefaultLogsSenderBackoffMax,
@@ -587,7 +587,7 @@ func (suite *ConfigTestSuite) TestEndpointsSetDDSite() {
587587
Port: 0,
588588
useSSL: true,
589589
UseCompression: true,
590-
CompressionLevel: 6,
590+
CompressionLevel: ZstdCompressionLevel,
591591
BackoffFactor: pkgconfigsetup.DefaultLogsSenderBackoffFactor,
592592
BackoffBase: pkgconfigsetup.DefaultLogsSenderBackoffBase,
593593
BackoffMax: pkgconfigsetup.DefaultLogsSenderBackoffMax,
@@ -627,7 +627,7 @@ func (suite *ConfigTestSuite) TestBuildServerlessEndpoints() {
627627
Port: 0,
628628
useSSL: true,
629629
UseCompression: true,
630-
CompressionLevel: 6,
630+
CompressionLevel: ZstdCompressionLevel,
631631
BackoffFactor: pkgconfigsetup.DefaultLogsSenderBackoffFactor,
632632
BackoffBase: pkgconfigsetup.DefaultLogsSenderBackoffBase,
633633
BackoffMax: pkgconfigsetup.DefaultLogsSenderBackoffMax,
@@ -659,7 +659,7 @@ func (suite *ConfigTestSuite) TestBuildServerlessEndpoints() {
659659
func getTestEndpoint(host string, port int, ssl bool) Endpoint {
660660
e := NewEndpoint("123", "", host, port, ssl)
661661
e.UseCompression = true
662-
e.CompressionLevel = 6
662+
e.CompressionLevel = ZstdCompressionLevel // by default endpoints uses zstd
663663
e.BackoffFactor = pkgconfigsetup.DefaultLogsSenderBackoffFactor
664664
e.BackoffBase = pkgconfigsetup.DefaultLogsSenderBackoffBase
665665
e.BackoffMax = pkgconfigsetup.DefaultLogsSenderBackoffMax
@@ -744,7 +744,7 @@ func (suite *ConfigTestSuite) TestEndpointsSetNonDefaultCustomConfigs() {
744744
suite.config.SetWithoutSource("api_key", "123")
745745

746746
suite.config.SetWithoutSource("network_devices.netflow.forwarder.use_compression", false)
747-
suite.config.SetWithoutSource("network_devices.netflow.forwarder.compression_level", 10)
747+
suite.config.SetWithoutSource("network_devices.netflow.forwarder.zstd_compression_level", 10)
748748
suite.config.SetWithoutSource("network_devices.netflow.forwarder.batch_wait", 10)
749749
suite.config.SetWithoutSource("network_devices.netflow.forwarder.connection_reset_interval", 3)
750750
suite.config.SetWithoutSource("network_devices.netflow.forwarder.logs_no_ssl", true)
@@ -820,7 +820,7 @@ func (suite *ConfigTestSuite) TestEndpointsSetLogsDDUrlWithPrefix() {
820820
Port: 443,
821821
useSSL: true,
822822
UseCompression: true,
823-
CompressionLevel: 6,
823+
CompressionLevel: ZstdCompressionLevel,
824824
BackoffFactor: pkgconfigsetup.DefaultLogsSenderBackoffFactor,
825825
BackoffBase: pkgconfigsetup.DefaultLogsSenderBackoffBase,
826826
BackoffMax: pkgconfigsetup.DefaultLogsSenderBackoffMax,
@@ -865,7 +865,7 @@ func (suite *ConfigTestSuite) TestEndpointsSetDDUrlWithPrefix() {
865865
Port: 443,
866866
useSSL: true,
867867
UseCompression: true,
868-
CompressionLevel: 6,
868+
CompressionLevel: ZstdCompressionLevel,
869869
BackoffFactor: pkgconfigsetup.DefaultLogsSenderBackoffFactor,
870870
BackoffBase: pkgconfigsetup.DefaultLogsSenderBackoffBase,
871871
BackoffMax: pkgconfigsetup.DefaultLogsSenderBackoffMax,

0 commit comments

Comments
 (0)