Skip to content

Commit 1c8ae14

Browse files
committed
Add cardinality limit for spanmetrics
Signed-off-by: Israel Blancas <[email protected]>
1 parent 9bb671c commit 1c8ae14

File tree

9 files changed

+689
-32
lines changed

9 files changed

+689
-32
lines changed

.chloggen/38990.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: spanmetricsconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add new `aggregation_cardinality_limit` configuration option to limit the number of unique combinations of dimensions that will be tracked for metrics aggregation.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38990]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/spanmetricsconnector/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ The following settings can be optionally configured:
122122
- `enabled`: (default: `false`): enabling will add the events metric.
123123
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
124124
- `resource_metrics_key_attributes`: Filter the resource attributes used to produce the resource metrics key map hash. Use this in case changing resource attributes (e.g. process id) are breaking counter metrics.
125+
- `aggregation_cardinality_limit` (default: `0`): Defines the maximum number of unique combinations of dimensions that will be tracked for metrics aggregation. When the limit is reached, additional unique combinations will be dropped but registered under a new entry with `otel.metric.overflow="true"`. A value of `0` means no limit is applied.
125126

126127
The feature gate `connector.spanmetrics.legacyMetricNames` (disabled by default) controls the connector to use legacy metric names.
127128

connector/spanmetricsconnector/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ type Config struct {
8888
Events EventsConfig `mapstructure:"events"`
8989

9090
IncludeInstrumentationScope []string `mapstructure:"include_instrumentation_scope"`
91+
92+
AggregationCardinalityLimit int `mapstructure:"aggregation_cardinality_limit"`
9193
}
9294

9395
type HistogramConfig struct {
@@ -157,6 +159,10 @@ func (c Config) Validate() error {
157159
)
158160
}
159161

162+
if c.AggregationCardinalityLimit < 0 {
163+
return fmt.Errorf("invalid aggregation_cardinality_limit: %v, the limit should be positive", c.AggregationCardinalityLimit)
164+
}
165+
160166
return nil
161167
}
162168

connector/spanmetricsconnector/config_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,3 +288,126 @@ func TestValidateEventDimensions(t *testing.T) {
288288
})
289289
}
290290
}
291+
292+
func TestConfigValidate(t *testing.T) {
293+
tests := []struct {
294+
name string
295+
config Config
296+
expectedErr string
297+
}{
298+
{
299+
name: "valid config",
300+
config: Config{
301+
DimensionsCacheSize: 1000,
302+
ResourceMetricsCacheSize: 1000,
303+
MetricsFlushInterval: 60 * time.Second,
304+
Histogram: HistogramConfig{
305+
Explicit: &ExplicitHistogramConfig{
306+
Buckets: []time.Duration{10 * time.Millisecond},
307+
},
308+
},
309+
},
310+
},
311+
{
312+
name: "invalid dimensions cache size",
313+
config: Config{
314+
DimensionsCacheSize: -1,
315+
ResourceMetricsCacheSize: 1000,
316+
MetricsFlushInterval: 60 * time.Second,
317+
},
318+
expectedErr: "invalid cache size: -1, the maximum number of the items in the cache should be positive",
319+
},
320+
{
321+
name: "invalid metrics flush interval",
322+
config: Config{
323+
DimensionsCacheSize: 1000,
324+
ResourceMetricsCacheSize: 1000,
325+
MetricsFlushInterval: -1 * time.Second,
326+
},
327+
expectedErr: "invalid metrics_flush_interval: -1s, the duration should be positive",
328+
},
329+
{
330+
name: "invalid metrics expiration",
331+
config: Config{
332+
DimensionsCacheSize: 1000,
333+
ResourceMetricsCacheSize: 1000,
334+
MetricsFlushInterval: 60 * time.Second,
335+
MetricsExpiration: -1 * time.Second,
336+
},
337+
expectedErr: "invalid metrics_expiration: -1s, the duration should be positive",
338+
},
339+
{
340+
name: "invalid delta timestamp cache size",
341+
config: Config{
342+
DimensionsCacheSize: 1000,
343+
ResourceMetricsCacheSize: 1000,
344+
MetricsFlushInterval: 60 * time.Second,
345+
AggregationTemporality: delta,
346+
TimestampCacheSize: new(int), // zero value
347+
},
348+
expectedErr: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",
349+
},
350+
{
351+
name: "invalid aggregation cardinality limit",
352+
config: Config{
353+
DimensionsCacheSize: 1000,
354+
ResourceMetricsCacheSize: 1000,
355+
MetricsFlushInterval: 60 * time.Second,
356+
AggregationCardinalityLimit: -1,
357+
},
358+
expectedErr: "invalid aggregation_cardinality_limit: -1, the limit should be positive",
359+
},
360+
{
361+
name: "both explicit and exponential histogram",
362+
config: Config{
363+
DimensionsCacheSize: 1000,
364+
ResourceMetricsCacheSize: 1000,
365+
MetricsFlushInterval: 60 * time.Second,
366+
Histogram: HistogramConfig{
367+
Explicit: &ExplicitHistogramConfig{
368+
Buckets: []time.Duration{10 * time.Millisecond},
369+
},
370+
Exponential: &ExponentialHistogramConfig{
371+
MaxSize: 10,
372+
},
373+
},
374+
},
375+
expectedErr: "use either `explicit` or `exponential` buckets histogram",
376+
},
377+
{
378+
name: "duplicate dimension name",
379+
config: Config{
380+
DimensionsCacheSize: 1000,
381+
ResourceMetricsCacheSize: 1000,
382+
MetricsFlushInterval: 60 * time.Second,
383+
Dimensions: []Dimension{
384+
{Name: "service.name"},
385+
},
386+
},
387+
expectedErr: "failed validating dimensions: duplicate dimension name service.name",
388+
},
389+
{
390+
name: "events enabled with no dimensions",
391+
config: Config{
392+
DimensionsCacheSize: 1000,
393+
ResourceMetricsCacheSize: 1000,
394+
MetricsFlushInterval: 60 * time.Second,
395+
Events: EventsConfig{
396+
Enabled: true,
397+
},
398+
},
399+
expectedErr: "failed validating event dimensions: no dimensions configured for events",
400+
},
401+
}
402+
403+
for _, tt := range tests {
404+
t.Run(tt.name, func(t *testing.T) {
405+
err := tt.config.Validate()
406+
if tt.expectedErr != "" {
407+
assert.ErrorContains(t, err, tt.expectedErr)
408+
} else {
409+
assert.NoError(t, err)
410+
}
411+
})
412+
}
413+
}

connector/spanmetricsconnector/connector.go

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ const (
4444
metricNameEvents = "events"
4545

4646
defaultUnit = metrics.Milliseconds
47+
48+
// https://github.com/open-telemetry/opentelemetry-go/blob/3ae002c3caf3e44387f0554dfcbbde2c5aab7909/sdk/metric/internal/aggregate/limit.go#L11C36-L11C50
49+
overflowKey = "otel.metric.overflow"
4750
)
4851

4952
type connectorImp struct {
@@ -401,11 +404,33 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
401404
}
402405
key := p.buildKey(serviceName, span, p.dimensions, resourceAttr)
403406

404-
attributes, ok := p.metricKeyToDimensions.Get(key)
405-
if !ok {
406-
attributes = p.buildAttributes(serviceName, span, resourceAttr, p.dimensions, ils.Scope())
407-
p.metricKeyToDimensions.Add(key, attributes)
407+
var attributes pcommon.Map
408+
409+
// Note: we check cardinality limit here for sums metrics but it is the same
410+
// for histograms because both use the same key and attributes.
411+
if rm.sums.IsCardinalityLimitReached() {
412+
attributes = pcommon.NewMap()
413+
for _, d := range p.dimensions {
414+
if v, exists := utilattri.GetDimensionValue(d, span.Attributes(), resourceAttr); exists {
415+
v.CopyTo(attributes.PutEmpty(d.Name))
416+
}
417+
}
418+
attributes.PutBool(overflowKey, true)
419+
} else {
420+
var cached bool
421+
attributes, cached = p.metricKeyToDimensions.Get(key)
422+
if !cached {
423+
attributes = p.buildAttributes(
424+
serviceName,
425+
span,
426+
resourceAttr,
427+
p.dimensions,
428+
ils.Scope(),
429+
)
430+
p.metricKeyToDimensions.Add(key, attributes)
431+
}
408432
}
433+
409434
if !p.config.Histogram.Disable {
410435
// aggregate histogram metrics
411436
h := histograms.GetOrCreate(key, attributes)
@@ -427,16 +452,30 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
427452
eDimensions = append(eDimensions, p.eDimensions...)
428453

429454
rscAndEventAttrs := pcommon.NewMap()
455+
430456
rscAndEventAttrs.EnsureCapacity(resourceAttr.Len() + event.Attributes().Len())
431457
resourceAttr.CopyTo(rscAndEventAttrs)
432-
event.Attributes().CopyTo(rscAndEventAttrs)
458+
// We cannot use CopyTo because it overrides the existing keys.
459+
event.Attributes().Range(func(k string, v pcommon.Value) bool {
460+
rscAndEventAttrs.PutStr(k, v.Str())
461+
return true
462+
})
433463

434464
eKey := p.buildKey(serviceName, span, eDimensions, rscAndEventAttrs)
435-
eAttributes, ok := p.metricKeyToDimensions.Get(eKey)
436-
if !ok {
437-
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
438-
p.metricKeyToDimensions.Add(eKey, eAttributes)
465+
466+
var eAttributes pcommon.Map
467+
if rm.events.IsCardinalityLimitReached() {
468+
eAttributes = pcommon.NewMap()
469+
rscAndEventAttrs.CopyTo(eAttributes)
470+
eAttributes.PutBool(overflowKey, true)
471+
} else {
472+
eAttributes, ok = p.metricKeyToDimensions.Get(eKey)
473+
if !ok {
474+
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
475+
p.metricKeyToDimensions.Add(eKey, eAttributes)
476+
}
439477
}
478+
440479
e := events.GetOrCreate(eKey, eAttributes)
441480
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
442481
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
@@ -481,8 +520,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
481520
if !ok {
482521
v = &resourceMetrics{
483522
histograms: initHistogramMetrics(p.config),
484-
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
485-
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
523+
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, p.config.AggregationCardinalityLimit),
524+
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, p.config.AggregationCardinalityLimit),
486525
attributes: attr,
487526
startTimestamp: startTimestamp,
488527
}
@@ -507,7 +546,13 @@ func contains(elements []string, value string) bool {
507546
return false
508547
}
509548

510-
func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map, dimensions []utilattri.Dimension, instrumentationScope pcommon.InstrumentationScope) pcommon.Map {
549+
func (p *connectorImp) buildAttributes(
550+
serviceName string,
551+
span ptrace.Span,
552+
resourceAttrs pcommon.Map,
553+
dimensions []utilattri.Dimension,
554+
instrumentationScope pcommon.InstrumentationScope,
555+
) pcommon.Map {
511556
attr := pcommon.NewMap()
512557
attr.EnsureCapacity(4 + len(dimensions))
513558
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
@@ -523,22 +568,26 @@ func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, res
523568
attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
524569
}
525570

526-
for _, d := range dimensions {
527-
if v, ok := utilattri.GetDimensionValue(d, span.Attributes(), resourceAttrs); ok {
528-
v.CopyTo(attr.PutEmpty(d.Name))
529-
}
530-
}
531-
532571
if contains(p.config.IncludeInstrumentationScope, instrumentationScope.Name()) && instrumentationScope.Name() != "" {
533572
attr.PutStr(instrumentationScopeNameKey, instrumentationScope.Name())
534573
if instrumentationScope.Version() != "" {
535574
attr.PutStr(instrumentationScopeVersionKey, instrumentationScope.Version())
536575
}
537576
}
538577

578+
addResourceAttributes(&attr, dimensions, span, resourceAttrs)
579+
539580
return attr
540581
}
541582

583+
func addResourceAttributes(attrs *pcommon.Map, dimensions []utilattri.Dimension, span ptrace.Span, resourceAttrs pcommon.Map) {
584+
for _, d := range dimensions {
585+
if v, ok := utilattri.GetDimensionValue(d, span.Attributes(), resourceAttrs); ok {
586+
v.CopyTo(attrs.PutEmpty(d.Name))
587+
}
588+
}
589+
}
590+
542591
func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {
543592
if prefixSep {
544593
dest.WriteString(metricKeySeparator)
@@ -553,6 +602,7 @@ func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {
553602
// The metric key is a simple concatenation of dimension values, delimited by a null character.
554603
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []utilattri.Dimension, resourceOrEventAttrs pcommon.Map) metrics.Key {
555604
p.keyBuf.Reset()
605+
556606
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
557607
concatDimensionValue(p.keyBuf, serviceName, false)
558608
}

0 commit comments

Comments
 (0)