Skip to content

Commit 57af3c5

Browse files
committed
Add size throughput metrics
1 parent e9f3dec commit 57af3c5

19 files changed

+1551
-196
lines changed

.chloggen/pipeline-throughput.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add size metrics defined in Pipeline Component Telemetry RFC
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13032]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
See [Pipeline Component Telemetry RFC](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md) for more details:
20+
- `otelcol.receiver.produced.size`
21+
- `otelcol.processor.consumed.size`
22+
- `otelcol.processor.produced.size`
23+
- `otelcol.connector.consumed.size`
24+
- `otelcol.connector.produced.size`
25+
- `otelcol.exporter.consumed.size`
26+
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: []

service/internal/graph/connector.go

Lines changed: 125 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,21 @@ func (n *connectorNode) buildTraces(
8888

8989
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
9090
for _, next := range nexts {
91-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
92-
next.(consumer.Traces),
93-
tb.ConnectorProducedItems,
91+
producedOpts := []obsconsumer.Option{
92+
obsconsumer.WithTracesItemCounter(&tb.ConnectorProducedItems),
9493
obsconsumer.WithStaticDataPointAttribute(
9594
otelattr.String(
9695
pipelineIDAttrKey,
9796
next.(*capabilitiesNode).pipelineID.String(),
9897
),
9998
),
99+
}
100+
if isEnabled(tb.ConnectorProducedSize) {
101+
producedOpts = append(producedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorProducedSize))
102+
}
103+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
104+
next.(consumer.Traces),
105+
producedOpts...,
100106
)
101107
}
102108
next := connector.NewTracesRouter(consumers)
@@ -107,26 +113,48 @@ func (n *connectorNode) buildTraces(
107113
if err != nil {
108114
return err
109115
}
116+
117+
consumedOpts := []obsconsumer.Option{
118+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
119+
}
120+
if isEnabled(tb.ConnectorConsumedSize) {
121+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
122+
}
123+
110124
// Connectors which might pass along data must inherit capabilities of all nexts
111125
n.consumer = obsconsumer.NewTraces(
112126
capabilityconsumer.NewTraces(
113127
n.Component.(consumer.Traces),
114128
aggregateCap(n.Component.(consumer.Traces), nexts),
115129
),
116-
tb.ConnectorConsumedItems,
130+
consumedOpts...,
117131
)
118132
case pipeline.SignalMetrics:
119133
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
120134
if err != nil {
121135
return err
122136
}
123-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
137+
138+
consumedOpts := []obsconsumer.Option{
139+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
140+
}
141+
if isEnabled(tb.ConnectorConsumedSize) {
142+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
143+
}
144+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
124145
case pipeline.SignalLogs:
125146
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
126147
if err != nil {
127148
return err
128149
}
129-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
150+
151+
consumedOpts := []obsconsumer.Option{
152+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
153+
}
154+
if isEnabled(tb.ConnectorConsumedSize) {
155+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
156+
}
157+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
130158
case xpipeline.SignalProfiles:
131159
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
132160
if err != nil {
@@ -150,15 +178,21 @@ func (n *connectorNode) buildMetrics(
150178

151179
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
152180
for _, next := range nexts {
153-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
154-
next.(consumer.Metrics),
155-
tb.ConnectorProducedItems,
181+
producedOpts := []obsconsumer.Option{
182+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorProducedItems),
156183
obsconsumer.WithStaticDataPointAttribute(
157184
otelattr.String(
158185
pipelineIDAttrKey,
159186
next.(*capabilitiesNode).pipelineID.String(),
160187
),
161188
),
189+
}
190+
if isEnabled(tb.ConnectorProducedSize) {
191+
producedOpts = append(producedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorProducedSize))
192+
}
193+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
194+
next.(consumer.Metrics),
195+
producedOpts...,
162196
)
163197
}
164198
next := connector.NewMetricsRouter(consumers)
@@ -169,26 +203,48 @@ func (n *connectorNode) buildMetrics(
169203
if err != nil {
170204
return err
171205
}
206+
207+
consumedOpts := []obsconsumer.Option{
208+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
209+
}
210+
if isEnabled(tb.ConnectorConsumedSize) {
211+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
212+
}
213+
172214
// Connectors which might pass along data must inherit capabilities of all nexts
173215
n.consumer = obsconsumer.NewMetrics(
174216
capabilityconsumer.NewMetrics(
175217
n.Component.(consumer.Metrics),
176218
aggregateCap(n.Component.(consumer.Metrics), nexts),
177219
),
178-
tb.ConnectorConsumedItems,
220+
consumedOpts...,
179221
)
180222
case pipeline.SignalTraces:
181223
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
182224
if err != nil {
183225
return err
184226
}
185-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
227+
228+
consumedOpts := []obsconsumer.Option{
229+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
230+
}
231+
if isEnabled(tb.ConnectorConsumedSize) {
232+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
233+
}
234+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
186235
case pipeline.SignalLogs:
187236
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
188237
if err != nil {
189238
return err
190239
}
191-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
240+
241+
consumedOpts := []obsconsumer.Option{
242+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
243+
}
244+
if isEnabled(tb.ConnectorConsumedSize) {
245+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
246+
}
247+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
192248
case xpipeline.SignalProfiles:
193249
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
194250
if err != nil {
@@ -212,16 +268,20 @@ func (n *connectorNode) buildLogs(
212268

213269
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
214270
for _, next := range nexts {
215-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
216-
next.(consumer.Logs),
217-
tb.ConnectorProducedItems,
271+
producedOpts := []obsconsumer.Option{
272+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize),
218273
obsconsumer.WithStaticDataPointAttribute(
219274
otelattr.String(
220275
pipelineIDAttrKey,
221276
next.(*capabilitiesNode).pipelineID.String(),
222277
),
223278
),
224-
)
279+
}
280+
if isEnabled(tb.ConnectorProducedSize) {
281+
producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize))
282+
}
283+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
284+
next.(consumer.Logs), producedOpts...)
225285
}
226286
next := connector.NewLogsRouter(consumers)
227287

@@ -231,26 +291,48 @@ func (n *connectorNode) buildLogs(
231291
if err != nil {
232292
return err
233293
}
294+
295+
consumedOpts := []obsconsumer.Option{
296+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
297+
}
298+
if isEnabled(tb.ConnectorConsumedSize) {
299+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
300+
}
301+
234302
// Connectors which might pass along data must inherit capabilities of all nexts
235303
n.consumer = obsconsumer.NewLogs(
236304
capabilityconsumer.NewLogs(
237305
n.Component.(consumer.Logs),
238306
aggregateCap(n.Component.(consumer.Logs), nexts),
239307
),
240-
tb.ConnectorConsumedItems,
308+
consumedOpts...,
241309
)
242310
case pipeline.SignalTraces:
243311
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
244312
if err != nil {
245313
return err
246314
}
247-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
315+
316+
consumedOpts := []obsconsumer.Option{
317+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
318+
}
319+
if isEnabled(tb.ConnectorConsumedSize) {
320+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
321+
}
322+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
248323
case pipeline.SignalMetrics:
249324
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
250325
if err != nil {
251326
return err
252327
}
253-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
328+
329+
consumedOpts := []obsconsumer.Option{
330+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
331+
}
332+
if isEnabled(tb.ConnectorConsumedSize) {
333+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
334+
}
335+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
254336
case xpipeline.SignalProfiles:
255337
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
256338
if err != nil {
@@ -306,19 +388,40 @@ func (n *connectorNode) buildProfiles(
306388
if err != nil {
307389
return err
308390
}
309-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
391+
392+
consumedOpts := []obsconsumer.Option{
393+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
394+
}
395+
if isEnabled(tb.ConnectorConsumedSize) {
396+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
397+
}
398+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
310399
case pipeline.SignalMetrics:
311400
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
312401
if err != nil {
313402
return err
314403
}
315-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
404+
405+
consumedOpts := []obsconsumer.Option{
406+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
407+
}
408+
if isEnabled(tb.ConnectorConsumedSize) {
409+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
410+
}
411+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
316412
case pipeline.SignalLogs:
317413
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
318414
if err != nil {
319415
return err
320416
}
321-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
417+
418+
consumedOpts := []obsconsumer.Option{
419+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
420+
}
421+
if isEnabled(tb.ConnectorConsumedSize) {
422+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
423+
}
424+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
322425
}
323426
return nil
324427
}

service/internal/graph/enabled.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
5+
6+
import (
7+
"context"
8+
9+
"go.opentelemetry.io/otel/metric"
10+
)
11+
12+
type enabledInstrument interface {
13+
Enabled(context.Context) bool
14+
}
15+
16+
func isEnabled(inst metric.Int64Counter) bool {
17+
_, ok := inst.(enabledInstrument)
18+
return ok
19+
}

service/internal/graph/exporter.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,40 @@ func (n *exporterNode) buildComponent(
6767
if err != nil {
6868
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
6969
}
70-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ExporterConsumedItems)
70+
71+
consumedOpts := []obsconsumer.Option{
72+
obsconsumer.WithTracesItemCounter(&tb.ExporterConsumedItems),
73+
}
74+
if isEnabled(tb.ExporterConsumedSize) {
75+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ExporterConsumedSize))
76+
}
77+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
7178
case pipeline.SignalMetrics:
7279
n.Component, err = builder.CreateMetrics(ctx, set)
7380
if err != nil {
7481
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
7582
}
76-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ExporterConsumedItems)
83+
84+
consumedOpts := []obsconsumer.Option{
85+
obsconsumer.WithMetricsItemCounter(&tb.ExporterConsumedItems),
86+
}
87+
if isEnabled(tb.ExporterConsumedSize) {
88+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ExporterConsumedSize))
89+
}
90+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
7791
case pipeline.SignalLogs:
7892
n.Component, err = builder.CreateLogs(ctx, set)
7993
if err != nil {
8094
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
8195
}
82-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ExporterConsumedItems)
96+
97+
consumedOpts := []obsconsumer.Option{
98+
obsconsumer.WithLogsSizeCounter(&tb.ExporterConsumedItems),
99+
}
100+
if isEnabled(tb.ExporterConsumedSize) {
101+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ExporterConsumedSize))
102+
}
103+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
83104
case xpipeline.SignalProfiles:
84105
n.Component, err = builder.CreateProfiles(ctx, set)
85106
if err != nil {

0 commit comments

Comments
 (0)