Commit 7359094

portertech authored and albertteoh committed
Fix broken spanmetrics counters after span producing service restart (open-telemetry#29711)
My spanmetrics counters (e.g. `calls_total`) break after restarting the span-producing service. For example:

![Screenshot from 2023-12-06 11-39-57](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/149630/abea1b72-392b-4f1f-a403-644c4e356f3d)

I discovered that the resource key used for the calculated metrics was a map hash of all the resource attributes. This worked fine for some instrumented services; however, other services include attributes such as their process ID. Restarting one of these services produced a new hash, and therefore a new set of calculated resource metrics in addition to the existing ones.

This pull request filters the resource attributes used to produce the resource metrics key map hash. I am now able to restart services without breaking my counters.

---------

Signed-off-by: Sean Porter <[email protected]>
Co-authored-by: Albert <[email protected]>
1 parent 3b7f728 commit 7359094
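For context on how the new option is meant to be used, here is a minimal collector configuration sketch. The receiver, exporter, and pipeline wiring are illustrative assumptions and are not part of this commit; only the `resource_metrics_key_attributes` setting under the `spanmetrics` connector comes from this change.

```yaml
# Illustrative sketch only: receiver/exporter choices are assumptions.
receivers:
  otlp:
    protocols:
      grpc:

connectors:
  spanmetrics:
    # Hash only these (stable) resource attributes when building the
    # resource metrics key, so ephemeral attributes such as a process id
    # no longer create a new metric series after a service restart.
    resource_metrics_key_attributes:
      - service.name
      - telemetry.sdk.language
      - telemetry.sdk.name

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [spanmetrics]
    metrics:
      receivers: [spanmetrics]
      exporters: [prometheus]
```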

File tree

6 files changed: +146 −27 lines

New changelog entry (.chloggen)

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: connector/spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Configurable resource metrics key attributes, filter the resource attributes used to create the resource metrics key.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [29711]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This enhancement can be used to fix broken spanmetrics counters after a span producing service restart, when resource attributes contain dynamic/ephemeral values (e.g. process id).
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

connector/spanmetricsconnector/config.go

Lines changed: 9 additions & 0 deletions
@@ -51,6 +51,15 @@ type Config struct {
     // Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
     ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`
 
+    // ResourceMetricsKeyAttributes filters the resource attributes used to create the resource metrics key hash.
+    // This can be used to avoid situations where resource attributes may change across service restarts, causing
+    // metric counters to break (and duplicate). A resource does not need to have all of the attributes. The list
+    // must include enough attributes to properly identify unique resources or risk aggregating data from more
+    // than one service and span.
+    // e.g. ["service.name", "telemetry.sdk.language", "telemetry.sdk.name"]
+    // See https://opentelemetry.io/docs/specs/semconv/resource/ for possible attributes.
+    ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"`
+
     AggregationTemporality string `mapstructure:"aggregation_temporality"`
 
     Histogram HistogramConfig `mapstructure:"histogram"`

connector/spanmetricsconnector/config_test.go

Lines changed: 11 additions & 0 deletions
@@ -110,6 +110,17 @@ func TestLoadConfig(t *testing.T) {
                 Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint},
             },
         },
+        {
+            id: component.NewIDWithName(metadata.Type, "resource_metrics_key_attributes"),
+            expected: &Config{
+                AggregationTemporality:       "AGGREGATION_TEMPORALITY_CUMULATIVE",
+                DimensionsCacheSize:          defaultDimensionsCacheSize,
+                ResourceMetricsCacheSize:     defaultResourceMetricsCacheSize,
+                ResourceMetricsKeyAttributes: []string{"service.name", "telemetry.sdk.language", "telemetry.sdk.name"},
+                MetricsFlushInterval:         15 * time.Second,
+                Histogram:                    HistogramConfig{Disable: false, Unit: defaultUnit},
+            },
+        },
     }
 
     for _, tt := range tests {

connector/spanmetricsconnector/connector.go

Lines changed: 33 additions & 11 deletions
@@ -54,6 +54,8 @@ type connectorImp struct {
 
     resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]
 
+    resourceMetricsKeyAttributes map[string]struct{}
+
     keyBuf *bytes.Buffer
 
     // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
@@ -115,17 +117,24 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
         return nil, err
     }
 
+    resourceMetricsKeyAttributes := make(map[string]struct{}, len(cfg.ResourceMetricsKeyAttributes))
+    var s struct{}
+    for _, attr := range cfg.ResourceMetricsKeyAttributes {
+        resourceMetricsKeyAttributes[attr] = s
+    }
+
     return &connectorImp{
-        logger:                logger,
-        config:                *cfg,
-        resourceMetrics:       resourceMetricsCache,
-        dimensions:            newDimensions(cfg.Dimensions),
-        keyBuf:                bytes.NewBuffer(make([]byte, 0, 1024)),
-        metricKeyToDimensions: metricKeyToDimensionsCache,
-        ticker:                ticker,
-        done:                  make(chan struct{}),
-        eDimensions:           newDimensions(cfg.Events.Dimensions),
-        events:                cfg.Events,
+        logger:                       logger,
+        config:                       *cfg,
+        resourceMetrics:              resourceMetricsCache,
+        resourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
+        dimensions:                   newDimensions(cfg.Dimensions),
+        keyBuf:                       bytes.NewBuffer(make([]byte, 0, 1024)),
+        metricKeyToDimensions:        metricKeyToDimensionsCache,
+        ticker:                       ticker,
+        done:                         make(chan struct{}),
+        eDimensions:                  newDimensions(cfg.Events.Dimensions),
+        events:                       cfg.Events,
     }, nil
 }
 
@@ -390,8 +399,21 @@ func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics
 
 type resourceKey [16]byte
 
+func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
+    if len(p.resourceMetricsKeyAttributes) == 0 {
+        return pdatautil.MapHash(attr)
+    }
+    m := pcommon.NewMap()
+    attr.CopyTo(m)
+    m.RemoveIf(func(k string, _ pcommon.Value) bool {
+        _, ok := p.resourceMetricsKeyAttributes[k]
+        return !ok
+    })
+    return pdatautil.MapHash(m)
+}
+
 func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
-    key := resourceKey(pdatautil.MapHash(attr))
+    key := p.createResourceKey(attr)
     v, ok := p.resourceMetrics.Get(key)
     if !ok {
         v = &resourceMetrics{
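To make the behavior of the new `createResourceKey` concrete, here is a small self-contained Go sketch (not part of the diff) that mirrors its filtering logic. It assumes `MapHash` lives in the contrib `pkg/pdatautil` package; that import path is my assumption, since the connector's import block is not shown in this diff.

```go
// Standalone sketch: shows why filtering resource attributes keeps the
// resource metrics key stable across service restarts.
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" // assumed import path
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// filteredHash mirrors createResourceKey: it hashes only the attributes
// listed in keyAttrs and ignores everything else.
func filteredHash(attrs pcommon.Map, keyAttrs map[string]struct{}) [16]byte {
	if len(keyAttrs) == 0 {
		return pdatautil.MapHash(attrs)
	}
	m := pcommon.NewMap()
	attrs.CopyTo(m)
	m.RemoveIf(func(k string, _ pcommon.Value) bool {
		_, ok := keyAttrs[k]
		return !ok
	})
	return pdatautil.MapHash(m)
}

func main() {
	keyAttrs := map[string]struct{}{"service.name": {}}

	beforeRestart := pcommon.NewMap()
	beforeRestart.PutStr("service.name", "checkout")
	beforeRestart.PutInt("process.pid", 101) // ephemeral, changes on restart

	afterRestart := pcommon.NewMap()
	afterRestart.PutStr("service.name", "checkout")
	afterRestart.PutInt("process.pid", 202)

	// Unfiltered hashes differ, so the connector would start a new set of
	// series after a restart; filtered hashes match, so the existing
	// counters keep accumulating.
	fmt.Println(pdatautil.MapHash(beforeRestart) == pdatautil.MapHash(afterRestart)) // false
	fmt.Println(filteredHash(beforeRestart, keyAttrs) == filteredHash(afterRestart, keyAttrs)) // true
}
```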

connector/spanmetricsconnector/connector_test.go

Lines changed: 59 additions & 16 deletions
@@ -600,7 +600,7 @@ func TestConcurrentShutdown(t *testing.T) {
     ticker := mockClock.NewTicker(time.Nanosecond)
 
     // Test
-    p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+    p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)
     err := p.Start(ctx, componenttest.NewNopHost())
     require.NoError(t, err)
 
@@ -680,7 +680,7 @@ func TestConsumeMetricsErrors(t *testing.T) {
     }
     mockClock := clock.NewMock(time.Now())
     ticker := mockClock.NewTicker(time.Nanosecond)
-    p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+    p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)
 
     ctx := metadata.NewIncomingContext(context.Background(), nil)
     err := p.Start(ctx, componenttest.NewNopHost())
@@ -842,7 +842,7 @@ func TestConsumeTraces(t *testing.T) {
         mockClock := clock.NewMock(time.Now())
         ticker := mockClock.NewTicker(time.Nanosecond)
 
-        p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
+        p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, []string{}, zaptest.NewLogger(t), ticker)
 
         ctx := metadata.NewIncomingContext(context.Background(), nil)
         err := p.Start(ctx, componenttest.NewNopHost())
@@ -868,7 +868,7 @@ func TestConsumeTraces(t *testing.T) {
 func TestMetricKeyCache(t *testing.T) {
     mcon := consumertest.NewNop()
 
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
     traces := buildSampleTrace()
 
     // Test
@@ -898,7 +898,7 @@ func TestMetricKeyCache(t *testing.T) {
 func TestResourceMetricsCache(t *testing.T) {
     mcon := consumertest.NewNop()
 
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
 
     // Test
     ctx := metadata.NewIncomingContext(context.Background(), nil)
@@ -933,11 +933,53 @@ func TestResourceMetricsCache(t *testing.T) {
     assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
 }
 
+func TestResourceMetricsKeyAttributes(t *testing.T) {
+    mcon := consumertest.NewNop()
+
+    resourceMetricsKeyAttributes := []string{
+        "service.name",
+    }
+
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, resourceMetricsKeyAttributes, zaptest.NewLogger(t), nil)
+
+    // Test
+    ctx := metadata.NewIncomingContext(context.Background(), nil)
+
+    // 0 resources in the beginning
+    assert.Zero(t, p.resourceMetrics.Len())
+
+    err := p.ConsumeTraces(ctx, buildSampleTrace())
+    // Validate
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume another batch of traces for the same resources
+    err = p.ConsumeTraces(ctx, buildSampleTrace())
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume more batches for new resources. Max size is exceeded causing old resource entries to be discarded
+    for i := 0; i < resourceMetricsCacheSize; i++ {
+        traces := buildSampleTrace()
+
+        // add resource attributes to simulate additional resources providing data
+        for j := 0; j < traces.ResourceSpans().Len(); j++ {
+            traces.ResourceSpans().At(j).Resource().Attributes().PutStr("not included in resource key attributes", fmt.Sprintf("%d", i))
+        }
+
+        err = p.ConsumeTraces(ctx, traces)
+        require.NoError(t, err)
+    }
+
+    // validate that the additional resources providing data did not result in additional resource metrics
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+}
+
 func BenchmarkConnectorConsumeTraces(b *testing.B) {
     // Prepare
     mcon := consumertest.NewNop()
 
-    conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(b), nil)
+    conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(b), nil)
 
     traces := buildSampleTrace()
 
@@ -951,7 +993,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
 func TestExcludeDimensionsConsumeTraces(t *testing.T) {
     mcon := consumertest.NewNop()
     excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil, excludeDimensions...)
     traces := buildSampleTrace()
 
     // Test
@@ -1000,15 +1042,16 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
 
 }
 
-func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
+func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, resourceMetricsKeyAttributes []string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
 
     cfg := &Config{
-        AggregationTemporality:   temporality,
-        Histogram:                histogramConfig(),
-        Exemplars:                exemplarsConfig(),
-        ExcludeDimensions:        excludedDimensions,
-        DimensionsCacheSize:      dimensionsCacheSize,
-        ResourceMetricsCacheSize: resourceMetricsCacheSize,
+        AggregationTemporality:       temporality,
+        Histogram:                    histogramConfig(),
+        Exemplars:                    exemplarsConfig(),
+        ExcludeDimensions:            excludedDimensions,
+        DimensionsCacheSize:          dimensionsCacheSize,
+        ResourceMetricsCacheSize:     resourceMetricsCacheSize,
+        ResourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
         Dimensions: []Dimension{
             // Set nil defaults to force a lookup for the attribute in the span.
             {stringAttrName, nil},
@@ -1120,7 +1163,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
     ticker := mockClock.NewTicker(time.Nanosecond)
 
     // Note: default dimension key cache size is 2.
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), ticker)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), ticker)
 
     ctx := metadata.NewIncomingContext(context.Background(), nil)
     err := p.Start(ctx, componenttest.NewNopHost())
@@ -1374,7 +1417,7 @@ func TestSpanMetrics_Events(t *testing.T) {
 }
 func TestExemplarsForSumMetrics(t *testing.T) {
     mcon := consumertest.NewNop()
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
     traces := buildSampleTrace()
 
     // Test

connector/spanmetricsconnector/testdata/config.yaml

Lines changed: 7 additions & 0 deletions
@@ -68,3 +68,10 @@ spanmetrics/exemplars_enabled_with_max_per_datapoint:
   exemplars:
     enabled: true
     max_per_data_point: 5
+
+# resource metrics key attributes filter
+spanmetrics/resource_metrics_key_attributes:
+  resource_metrics_key_attributes:
+    - service.name
+    - telemetry.sdk.language
+    - telemetry.sdk.name
