Skip to content

Commit 178caa1

Browse files
authored
[service/internal/graph] Add size throughput metrics (#13032)
Follows #12812 This PR adds the `size` metrics defined in the [Pipeline Component Telemetry RFC](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md).
1 parent 4a5e154 commit 178caa1

22 files changed

+1452
-290
lines changed

.chloggen/pipeline-throughput.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add size metrics defined in Pipeline Component Telemetry RFC
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13032]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
See [Pipeline Component Telemetry RFC](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md) for more details:
20+
- `otelcol.receiver.produced.size`
21+
- `otelcol.processor.consumed.size`
22+
- `otelcol.processor.produced.size`
23+
- `otelcol.connector.consumed.size`
24+
- `otelcol.connector.produced.size`
25+
- `otelcol.exporter.consumed.size`
26+
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: []

service/internal/graph/connector.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (n *connectorNode) buildTraces(
9090
for _, next := range nexts {
9191
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
9292
next.(consumer.Traces),
93-
tb.ConnectorProducedItems,
93+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
9494
obsconsumer.WithStaticDataPointAttribute(
9595
otelattr.String(
9696
pipelineIDAttrKey,
@@ -107,32 +107,33 @@ func (n *connectorNode) buildTraces(
107107
if err != nil {
108108
return err
109109
}
110+
110111
// Connectors which might pass along data must inherit capabilities of all nexts
111112
n.consumer = obsconsumer.NewTraces(
112113
capabilityconsumer.NewTraces(
113114
n.Component.(consumer.Traces),
114115
aggregateCap(n.Component.(consumer.Traces), nexts),
115116
),
116-
tb.ConnectorConsumedItems,
117+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
117118
)
118119
case pipeline.SignalMetrics:
119120
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
120121
if err != nil {
121122
return err
122123
}
123-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
124+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
124125
case pipeline.SignalLogs:
125126
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
126127
if err != nil {
127128
return err
128129
}
129-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
130+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
130131
case xpipeline.SignalProfiles:
131132
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
132133
if err != nil {
133134
return err
134135
}
135-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
136+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
136137
}
137138
return nil
138139
}
@@ -152,7 +153,7 @@ func (n *connectorNode) buildMetrics(
152153
for _, next := range nexts {
153154
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
154155
next.(consumer.Metrics),
155-
tb.ConnectorProducedItems,
156+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
156157
obsconsumer.WithStaticDataPointAttribute(
157158
otelattr.String(
158159
pipelineIDAttrKey,
@@ -169,32 +170,33 @@ func (n *connectorNode) buildMetrics(
169170
if err != nil {
170171
return err
171172
}
173+
172174
// Connectors which might pass along data must inherit capabilities of all nexts
173175
n.consumer = obsconsumer.NewMetrics(
174176
capabilityconsumer.NewMetrics(
175177
n.Component.(consumer.Metrics),
176178
aggregateCap(n.Component.(consumer.Metrics), nexts),
177179
),
178-
tb.ConnectorConsumedItems,
180+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
179181
)
180182
case pipeline.SignalTraces:
181183
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
182184
if err != nil {
183185
return err
184186
}
185-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
187+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
186188
case pipeline.SignalLogs:
187189
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
188190
if err != nil {
189191
return err
190192
}
191-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
193+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
192194
case xpipeline.SignalProfiles:
193195
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
194196
if err != nil {
195197
return err
196198
}
197-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
199+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
198200
}
199201
return nil
200202
}
@@ -214,7 +216,7 @@ func (n *connectorNode) buildLogs(
214216
for _, next := range nexts {
215217
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
216218
next.(consumer.Logs),
217-
tb.ConnectorProducedItems,
219+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
218220
obsconsumer.WithStaticDataPointAttribute(
219221
otelattr.String(
220222
pipelineIDAttrKey,
@@ -231,32 +233,33 @@ func (n *connectorNode) buildLogs(
231233
if err != nil {
232234
return err
233235
}
236+
234237
// Connectors which might pass along data must inherit capabilities of all nexts
235238
n.consumer = obsconsumer.NewLogs(
236239
capabilityconsumer.NewLogs(
237240
n.Component.(consumer.Logs),
238241
aggregateCap(n.Component.(consumer.Logs), nexts),
239242
),
240-
tb.ConnectorConsumedItems,
243+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
241244
)
242245
case pipeline.SignalTraces:
243246
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
244247
if err != nil {
245248
return err
246249
}
247-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
250+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
248251
case pipeline.SignalMetrics:
249252
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
250253
if err != nil {
251254
return err
252255
}
253-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
256+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
254257
case xpipeline.SignalProfiles:
255258
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
256259
if err != nil {
257260
return err
258261
}
259-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
262+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
260263
}
261264
return nil
262265
}
@@ -276,7 +279,7 @@ func (n *connectorNode) buildProfiles(
276279
for _, next := range nexts {
277280
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
278281
next.(xconsumer.Profiles),
279-
tb.ConnectorProducedItems,
282+
tb.ConnectorProducedItems, tb.ConnectorProducedSize,
280283
obsconsumer.WithStaticDataPointAttribute(
281284
otelattr.String(
282285
pipelineIDAttrKey,
@@ -293,32 +296,33 @@ func (n *connectorNode) buildProfiles(
293296
if err != nil {
294297
return err
295298
}
299+
296300
// Connectors which might pass along data must inherit capabilities of all nexts
297301
n.consumer = obsconsumer.NewProfiles(
298302
capabilityconsumer.NewProfiles(
299303
n.Component.(xconsumer.Profiles),
300304
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
301305
),
302-
tb.ConnectorConsumedItems,
306+
tb.ConnectorConsumedItems, tb.ConnectorConsumedSize,
303307
)
304308
case pipeline.SignalTraces:
305309
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
306310
if err != nil {
307311
return err
308312
}
309-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
313+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
310314
case pipeline.SignalMetrics:
311315
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
312316
if err != nil {
313317
return err
314318
}
315-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
319+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
316320
case pipeline.SignalLogs:
317321
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
318322
if err != nil {
319323
return err
320324
}
321-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
325+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems, tb.ConnectorConsumedSize)
322326
}
323327
return nil
324328
}

service/internal/graph/exporter.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,25 @@ func (n *exporterNode) buildComponent(
6767
if err != nil {
6868
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
6969
}
70-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ExporterConsumedItems)
70+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ExporterConsumedItems, tb.ExporterConsumedSize)
7171
case pipeline.SignalMetrics:
7272
n.Component, err = builder.CreateMetrics(ctx, set)
7373
if err != nil {
7474
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
7575
}
76-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ExporterConsumedItems)
76+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ExporterConsumedItems, tb.ExporterConsumedSize)
7777
case pipeline.SignalLogs:
7878
n.Component, err = builder.CreateLogs(ctx, set)
7979
if err != nil {
8080
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
8181
}
82-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ExporterConsumedItems)
82+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ExporterConsumedItems, tb.ExporterConsumedSize)
8383
case xpipeline.SignalProfiles:
8484
n.Component, err = builder.CreateProfiles(ctx, set)
8585
if err != nil {
8686
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
8787
}
88-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ExporterConsumedItems)
88+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ExporterConsumedItems, tb.ExporterConsumedSize)
8989
default:
9090
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
9191
}

0 commit comments

Comments
 (0)