Skip to content

Commit eb97529

Browse files
committed
Add size throughput metrics
1 parent e9f3dec commit eb97529

20 files changed

+1995
-271
lines changed

.chloggen/pipeline-throughput.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add size metrics defined in Pipeline Component Telemetry RFC
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13032]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
See [Pipeline Component Telemetry RFC](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md) for more details:
20+
- `otelcol.receiver.produced.size`
21+
- `otelcol.processor.consumed.size`
22+
- `otelcol.processor.produced.size`
23+
- `otelcol.connector.consumed.size`
24+
- `otelcol.connector.produced.size`
25+
- `otelcol.exporter.consumed.size`
26+
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: []

service/internal/graph/connector.go

Lines changed: 167 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,21 @@ func (n *connectorNode) buildTraces(
8888

8989
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
9090
for _, next := range nexts {
91-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
92-
next.(consumer.Traces),
93-
tb.ConnectorProducedItems,
91+
producedOpts := []obsconsumer.Option{
92+
obsconsumer.WithTracesItemCounter(&tb.ConnectorProducedItems),
9493
obsconsumer.WithStaticDataPointAttribute(
9594
otelattr.String(
9695
pipelineIDAttrKey,
9796
next.(*capabilitiesNode).pipelineID.String(),
9897
),
9998
),
99+
}
100+
if isEnabled(tb.ConnectorProducedSize) {
101+
producedOpts = append(producedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorProducedSize))
102+
}
103+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
104+
next.(consumer.Traces),
105+
producedOpts...,
100106
)
101107
}
102108
next := connector.NewTracesRouter(consumers)
@@ -107,32 +113,61 @@ func (n *connectorNode) buildTraces(
107113
if err != nil {
108114
return err
109115
}
116+
117+
consumedOpts := []obsconsumer.Option{
118+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
119+
}
120+
if isEnabled(tb.ConnectorConsumedSize) {
121+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
122+
}
123+
110124
// Connectors which might pass along data must inherit capabilities of all nexts
111125
n.consumer = obsconsumer.NewTraces(
112126
capabilityconsumer.NewTraces(
113127
n.Component.(consumer.Traces),
114128
aggregateCap(n.Component.(consumer.Traces), nexts),
115129
),
116-
tb.ConnectorConsumedItems,
130+
consumedOpts...,
117131
)
118132
case pipeline.SignalMetrics:
119133
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
120134
if err != nil {
121135
return err
122136
}
123-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
137+
138+
consumedOpts := []obsconsumer.Option{
139+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
140+
}
141+
if isEnabled(tb.ConnectorConsumedSize) {
142+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
143+
}
144+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
124145
case pipeline.SignalLogs:
125146
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
126147
if err != nil {
127148
return err
128149
}
129-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
150+
151+
consumedOpts := []obsconsumer.Option{
152+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
153+
}
154+
if isEnabled(tb.ConnectorConsumedSize) {
155+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
156+
}
157+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
130158
case xpipeline.SignalProfiles:
131159
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
132160
if err != nil {
133161
return err
134162
}
135-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
163+
164+
consumedOpts := []obsconsumer.Option{
165+
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
166+
}
167+
if isEnabled(tb.ConnectorConsumedSize) {
168+
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
169+
}
170+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
136171
}
137172
return nil
138173
}
@@ -150,15 +185,21 @@ func (n *connectorNode) buildMetrics(
150185

151186
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
152187
for _, next := range nexts {
153-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
154-
next.(consumer.Metrics),
155-
tb.ConnectorProducedItems,
188+
producedOpts := []obsconsumer.Option{
189+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorProducedItems),
156190
obsconsumer.WithStaticDataPointAttribute(
157191
otelattr.String(
158192
pipelineIDAttrKey,
159193
next.(*capabilitiesNode).pipelineID.String(),
160194
),
161195
),
196+
}
197+
if isEnabled(tb.ConnectorProducedSize) {
198+
producedOpts = append(producedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorProducedSize))
199+
}
200+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
201+
next.(consumer.Metrics),
202+
producedOpts...,
162203
)
163204
}
164205
next := connector.NewMetricsRouter(consumers)
@@ -169,32 +210,61 @@ func (n *connectorNode) buildMetrics(
169210
if err != nil {
170211
return err
171212
}
213+
214+
consumedOpts := []obsconsumer.Option{
215+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
216+
}
217+
if isEnabled(tb.ConnectorConsumedSize) {
218+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
219+
}
220+
172221
// Connectors which might pass along data must inherit capabilities of all nexts
173222
n.consumer = obsconsumer.NewMetrics(
174223
capabilityconsumer.NewMetrics(
175224
n.Component.(consumer.Metrics),
176225
aggregateCap(n.Component.(consumer.Metrics), nexts),
177226
),
178-
tb.ConnectorConsumedItems,
227+
consumedOpts...,
179228
)
180229
case pipeline.SignalTraces:
181230
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
182231
if err != nil {
183232
return err
184233
}
185-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
234+
235+
consumedOpts := []obsconsumer.Option{
236+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
237+
}
238+
if isEnabled(tb.ConnectorConsumedSize) {
239+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
240+
}
241+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
186242
case pipeline.SignalLogs:
187243
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
188244
if err != nil {
189245
return err
190246
}
191-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
247+
248+
consumedOpts := []obsconsumer.Option{
249+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
250+
}
251+
if isEnabled(tb.ConnectorConsumedSize) {
252+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
253+
}
254+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
192255
case xpipeline.SignalProfiles:
193256
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
194257
if err != nil {
195258
return err
196259
}
197-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
260+
261+
consumedOpts := []obsconsumer.Option{
262+
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
263+
}
264+
if isEnabled(tb.ConnectorConsumedSize) {
265+
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
266+
}
267+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
198268
}
199269
return nil
200270
}
@@ -212,16 +282,20 @@ func (n *connectorNode) buildLogs(
212282

213283
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
214284
for _, next := range nexts {
215-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
216-
next.(consumer.Logs),
217-
tb.ConnectorProducedItems,
285+
producedOpts := []obsconsumer.Option{
286+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize),
218287
obsconsumer.WithStaticDataPointAttribute(
219288
otelattr.String(
220289
pipelineIDAttrKey,
221290
next.(*capabilitiesNode).pipelineID.String(),
222291
),
223292
),
224-
)
293+
}
294+
if isEnabled(tb.ConnectorProducedSize) {
295+
producedOpts = append(producedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorProducedSize))
296+
}
297+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
298+
next.(consumer.Logs), producedOpts...)
225299
}
226300
next := connector.NewLogsRouter(consumers)
227301

@@ -231,32 +305,61 @@ func (n *connectorNode) buildLogs(
231305
if err != nil {
232306
return err
233307
}
308+
309+
consumedOpts := []obsconsumer.Option{
310+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
311+
}
312+
if isEnabled(tb.ConnectorConsumedSize) {
313+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
314+
}
315+
234316
// Connectors which might pass along data must inherit capabilities of all nexts
235317
n.consumer = obsconsumer.NewLogs(
236318
capabilityconsumer.NewLogs(
237319
n.Component.(consumer.Logs),
238320
aggregateCap(n.Component.(consumer.Logs), nexts),
239321
),
240-
tb.ConnectorConsumedItems,
322+
consumedOpts...,
241323
)
242324
case pipeline.SignalTraces:
243325
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
244326
if err != nil {
245327
return err
246328
}
247-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
329+
330+
consumedOpts := []obsconsumer.Option{
331+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
332+
}
333+
if isEnabled(tb.ConnectorConsumedSize) {
334+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
335+
}
336+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
248337
case pipeline.SignalMetrics:
249338
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
250339
if err != nil {
251340
return err
252341
}
253-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
342+
343+
consumedOpts := []obsconsumer.Option{
344+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
345+
}
346+
if isEnabled(tb.ConnectorConsumedSize) {
347+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
348+
}
349+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
254350
case xpipeline.SignalProfiles:
255351
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
256352
if err != nil {
257353
return err
258354
}
259-
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
355+
356+
consumedOpts := []obsconsumer.Option{
357+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
358+
}
359+
if isEnabled(tb.ConnectorConsumedSize) {
360+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
361+
}
362+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), consumedOpts...)
260363
}
261364
return nil
262365
}
@@ -274,15 +377,21 @@ func (n *connectorNode) buildProfiles(
274377

275378
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
276379
for _, next := range nexts {
277-
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
278-
next.(xconsumer.Profiles),
279-
tb.ConnectorProducedItems,
380+
producedOpts := []obsconsumer.Option{
381+
obsconsumer.WithProfilesItemCounter(&tb.ConnectorProducedItems),
280382
obsconsumer.WithStaticDataPointAttribute(
281383
otelattr.String(
282384
pipelineIDAttrKey,
283385
next.(*capabilitiesNode).pipelineID.String(),
284386
),
285387
),
388+
}
389+
if isEnabled(tb.ConnectorProducedSize) {
390+
producedOpts = append(producedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorProducedSize))
391+
}
392+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
393+
next.(xconsumer.Profiles),
394+
producedOpts...,
286395
)
287396
}
288397
next := xconnector.NewProfilesRouter(consumers)
@@ -293,32 +402,61 @@ func (n *connectorNode) buildProfiles(
293402
if err != nil {
294403
return err
295404
}
405+
406+
consumedOpts := []obsconsumer.Option{
407+
obsconsumer.WithProfilesItemCounter(&tb.ConnectorConsumedItems),
408+
}
409+
if isEnabled(tb.ConnectorConsumedSize) {
410+
consumedOpts = append(consumedOpts, obsconsumer.WithProfilesSizeCounter(&tb.ConnectorConsumedSize))
411+
}
412+
296413
// Connectors which might pass along data must inherit capabilities of all nexts
297414
n.consumer = obsconsumer.NewProfiles(
298415
capabilityconsumer.NewProfiles(
299416
n.Component.(xconsumer.Profiles),
300417
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
301418
),
302-
tb.ConnectorConsumedItems,
419+
consumedOpts...,
303420
)
304421
case pipeline.SignalTraces:
305422
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
306423
if err != nil {
307424
return err
308425
}
309-
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
426+
427+
consumedOpts := []obsconsumer.Option{
428+
obsconsumer.WithTracesItemCounter(&tb.ConnectorConsumedItems),
429+
}
430+
if isEnabled(tb.ConnectorConsumedSize) {
431+
consumedOpts = append(consumedOpts, obsconsumer.WithTracesSizeCounter(&tb.ConnectorConsumedSize))
432+
}
433+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), consumedOpts...)
310434
case pipeline.SignalMetrics:
311435
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
312436
if err != nil {
313437
return err
314438
}
315-
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
439+
440+
consumedOpts := []obsconsumer.Option{
441+
obsconsumer.WithMetricsItemCounter(&tb.ConnectorConsumedItems),
442+
}
443+
if isEnabled(tb.ConnectorConsumedSize) {
444+
consumedOpts = append(consumedOpts, obsconsumer.WithMetricsSizeCounter(&tb.ConnectorConsumedSize))
445+
}
446+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), consumedOpts...)
316447
case pipeline.SignalLogs:
317448
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
318449
if err != nil {
319450
return err
320451
}
321-
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
452+
453+
consumedOpts := []obsconsumer.Option{
454+
obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedItems),
455+
}
456+
if isEnabled(tb.ConnectorConsumedSize) {
457+
consumedOpts = append(consumedOpts, obsconsumer.WithLogsSizeCounter(&tb.ConnectorConsumedSize))
458+
}
459+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), consumedOpts...)
322460
}
323461
return nil
324462
}

0 commit comments

Comments
 (0)