Skip to content

Commit 27fb68c

Browse files
tombrk and AkhigbeEromo
authored and committed
[chore] [deltatocumulative]: linear histograms (open-telemetry#36486)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Finishes work started in open-telemetry#35048. That PR only partially introduced a less complex processor architecture by only using it for Sums. Back then I was not sure of the best way to do it for multiple datatypes, as generics seemed to introduce a lot of complexity regardless of usage. I have since done a lot of perf analysis, and due to the way Go works (see gcshapes), we do not really gain anything at runtime from using generics, given method calls are still dynamic. This implementation uses regular Go interfaces and a good old type switch in the hot path (ConsumeMetrics), which lowers mental complexity quite a lot imo. The value of the new architecture is backed up by the following benchmark: ``` goos: linux goarch: arm64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor │ sums.nested │ sums.linear │ │ sec/op │ sec/op vs base │ Processor/sums-8 56.35µ ± 1% 39.99µ ± 1% -29.04% (p=0.000 n=10) │ sums.nested │ sums.linear │ │ B/op │ B/op vs base │ Processor/sums-8 11.520Ki ± 0% 3.683Ki ± 0% -68.03% (p=0.000 n=10) │ sums.nested │ sums.linear │ │ allocs/op │ allocs/op vs base │ Processor/sums-8 365.0 ± 0% 260.0 ± 0% -28.77% (p=0.000 n=10) ``` <!--Describe what testing was performed and which tests were added.--> #### Testing This is a refactor, existing tests pass unaltered. <!--Describe the documentation added.--> #### Documentation not needed <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 7f72ef7 commit 27fb68c

File tree

9 files changed

+326
-84
lines changed

9 files changed

+326
-84
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package deltatocumulativeprocessor

import (
	"context"
	"math/rand/v2"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo"
)

// out holds the last benchmark's sink at package level — a common benchmark
// sink pattern, presumably so the processor's output stays reachable and the
// compiler cannot discard the measured work.
var out *consumertest.MetricsSink

// BenchmarkProcessor measures ConsumeMetrics throughput for each delta
// datatype the processor handles (sums, histograms, exponential histograms).
// Each case provides a fill func that creates the initial delta data points
// and a next func that advances every data point's timestamp window by one
// minute, so each iteration feeds a valid successive delta sample.
func BenchmarkProcessor(gb *testing.B) {
	const (
		metrics = 5  // metrics per request
		streams = 10 // data points (streams) per metric
	)

	now := time.Now()
	start := pcommon.NewTimestampFromTime(now)
	ts := pcommon.NewTimestampFromTime(now.Add(time.Minute))

	type Case struct {
		name string
		fill func(m pmetric.Metric) // populate m with `streams` delta data points
		next func(m pmetric.Metric) // shift each dp to the next one-minute window
	}
	cases := []Case{{
		name: "sums",
		fill: func(m pmetric.Metric) {
			sum := m.SetEmptySum()
			sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := sum.DataPoints().AppendEmpty()
				dp.SetIntValue(int64(rand.IntN(10)))
				// "idx" makes each data point a distinct stream identity.
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.Sum().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				// New window begins where the previous one ended.
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}, {
		name: "histogram",
		fill: func(m pmetric.Metric) {
			hist := m.SetEmptyHistogram()
			hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := hist.DataPoints().AppendEmpty()
				// Observe four random values into the default bucket bounds
				// and copy the resulting counts into the data point.
				histo.DefaultBounds.Observe(
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
					float64(rand.IntN(1000)),
				).CopyTo(dp)

				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.Histogram().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}, {
		name: "exponential",
		fill: func(m pmetric.Metric) {
			ex := m.SetEmptyExponentialHistogram()
			ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
			for i := range streams {
				dp := ex.DataPoints().AppendEmpty()
				// Values in [1,31] at scale 2; the same observations are used
				// for both the positive and negative bucket ranges.
				o := expotest.Observe(expo.Scale(2),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
					float64(rand.IntN(31)+1),
				)
				o.CopyTo(dp.Positive())
				o.CopyTo(dp.Negative())

				dp.SetStartTimestamp(start)
				dp.SetTimestamp(ts)
				dp.Attributes().PutStr("idx", strconv.Itoa(i))
			}
		},
		next: func(m pmetric.Metric) {
			dps := m.ExponentialHistogram().DataPoints()
			for i := range dps.Len() {
				dp := dps.At(i)
				dp.SetStartTimestamp(dp.Timestamp())
				dp.SetTimestamp(pcommon.NewTimestampFromTime(
					dp.Timestamp().AsTime().Add(time.Minute),
				))
			}
		},
	}}

	for _, cs := range cases {
		gb.Run(cs.name, func(b *testing.B) {
			// setup is provided by the processor's test harness (not shown
			// in this file); st exposes the processor and a metrics sink.
			st := setup(b, nil)
			out = st.sink

			md := pmetric.NewMetrics()
			ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics()
			for i := range metrics {
				m := ms.AppendEmpty()
				m.SetName(strconv.Itoa(i))
				cs.fill(m)
			}

			b.ReportAllocs()
			// Reset and immediately stop the timer: only the ConsumeMetrics
			// call below is timed; per-iteration request construction is not.
			b.ResetTimer()
			b.StopTimer()

			ctx := context.Background()
			for range b.N {
				for i := range ms.Len() {
					cs.next(ms.At(i))
				}
				// ConsumeMetrics may mutate its input, so hand it a copy and
				// keep md as the pristine template.
				req := pmetric.NewMetrics()
				md.CopyTo(req)

				b.StartTimer()
				err := st.proc.ConsumeMetrics(ctx, req)
				b.StopTimer()
				require.NoError(b, err)
			}

			// verify all dps are processed without error
			b.StopTimer()
			require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount())
		})
	}
}

processor/deltatocumulativeprocessor/internal/delta/delta.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88

99
"go.opentelemetry.io/collector/pdata/pcommon"
10+
"go.opentelemetry.io/collector/pdata/pmetric"
1011

1112
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
1213
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
@@ -83,10 +84,17 @@ func (e ErrGap) Error() string {
8384
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
8485
}
8586

87+
// Type is the type set of delta data points this package accumulates:
// number (sum), histogram, and exponential histogram data points.
// Beyond restricting the permitted concrete types, it exposes the two
// timestamps every data point carries, which AccumulateInto reads for
// its start-time and out-of-order checks.
type Type interface {
	pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

	StartTimestamp() pcommon.Timestamp
	Timestamp() pcommon.Timestamp
}
93+
8694
// AccumulateInto adds state and dp, storing the result in state
8795
//
8896
// state = state + dp
89-
func AccumulateInto[P data.Point[P]](state P, dp P) error {
97+
func AccumulateInto[T Type](state, dp T) error {
9098
switch {
9199
case dp.StartTimestamp() < state.StartTimestamp():
92100
// belongs to older series
@@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error {
96104
return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()}
97105
}
98106

99-
state.Add(dp)
107+
switch dp := any(dp).(type) {
108+
case pmetric.NumberDataPoint:
109+
state := any(state).(pmetric.NumberDataPoint)
110+
data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp})
111+
case pmetric.HistogramDataPoint:
112+
state := any(state).(pmetric.HistogramDataPoint)
113+
data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp})
114+
case pmetric.ExponentialHistogramDataPoint:
115+
state := any(state).(pmetric.ExponentialHistogramDataPoint)
116+
data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp})
117+
}
100118
return nil
101119
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"

import "go.opentelemetry.io/otel/attribute"

// Attributes is a growable list of OTel attribute key-value pairs.
type Attributes []attribute.KeyValue

// Set appends attr to the list.
// NOTE(review): this appends unconditionally — an existing entry with the
// same key is not replaced; confirm callers rely on append semantics.
func (a *Attributes) Set(attr attribute.KeyValue) {
	*a = append(*a, attr)
}

processor/deltatocumulativeprocessor/internal/metrics/data.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ func (s Gauge) Filter(expr func(data.Number) bool) {
115115
return !expr(data.Number{NumberDataPoint: dp})
116116
})
117117
}
118+
// SetAggregationTemporality is a no-op for gauges; it exists only so Gauge
// satisfies the metrics.Any interface alongside the temporal metric types.
func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {}
118119

119120
type Summary Metric
120121

@@ -136,3 +137,4 @@ func (s Summary) Filter(expr func(data.Summary) bool) {
136137
return !expr(data.Summary{SummaryDataPoint: dp})
137138
})
138139
}
140+
// SetAggregationTemporality is a no-op for summaries; it exists only so
// Summary satisfies the metrics.Any interface alongside the temporal types.
func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {}

processor/deltatocumulativeprocessor/internal/metrics/metrics.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality {
4747
return pmetric.AggregationTemporalityUnspecified
4848
}
4949

50-
func (m Metric) Typed() any {
50+
func (m Metric) Typed() Any {
5151
//exhaustive:enforce
5252
switch m.Type() {
5353
case pmetric.MetricTypeSum:
@@ -63,3 +63,49 @@ func (m Metric) Typed() any {
6363
}
6464
panic("unreachable")
6565
}
66+
67+
// Compile-time checks that every concrete metric wrapper implements Any.
var (
	_ Any = Sum{}
	_ Any = Gauge{}
	_ Any = ExpHistogram{}
	_ Any = Histogram{}
	_ Any = Summary{}
)

// Any is the common interface of all typed metric wrappers in this package
// (Sum, Gauge, Histogram, ExpHistogram, Summary). It is the static return
// type of Metric.Typed, replacing the untyped `any` it previously returned.
type Any interface {
	// Len reports the number of data points in the metric.
	Len() int
	// Ident returns the metric's identity.
	Ident() identity.Metric

	// SetAggregationTemporality sets the metric's temporality; wrappers for
	// types without temporality (Gauge, Summary) implement it as a no-op.
	SetAggregationTemporality(pmetric.AggregationTemporality)
}
81+
82+
func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) {
83+
mid := m.Ident()
84+
switch m.Type() {
85+
case pmetric.MetricTypeSum:
86+
m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
87+
id := identity.OfStream(mid, dp)
88+
return !ok(id, dp)
89+
})
90+
case pmetric.MetricTypeGauge:
91+
m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
92+
id := identity.OfStream(mid, dp)
93+
return !ok(id, dp)
94+
})
95+
case pmetric.MetricTypeHistogram:
96+
m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
97+
id := identity.OfStream(mid, dp)
98+
return !ok(id, dp)
99+
})
100+
case pmetric.MetricTypeExponentialHistogram:
101+
m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
102+
id := identity.OfStream(mid, dp)
103+
return !ok(id, dp)
104+
})
105+
case pmetric.MetricTypeSummary:
106+
m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
107+
id := identity.OfStream(mid, dp)
108+
return !ok(id, dp)
109+
})
110+
}
111+
}

0 commit comments

Comments
 (0)