Skip to content

Commit 8e9b92b

Browse files
authored
[receiver/kafkareceiver] Add 'topic' attribute to kafka internal metrics (#39550)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add 'topic' attribute to kafka existing internal metrics. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #35336 <!--Describe what testing was performed and which tests were added.--> #### Testing Updated the existing `kafkareceiver_test.go` <!--Describe the documentation added.--> #### Documentation Changelog <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 219abfd commit 8e9b92b

File tree

3 files changed

+51
-3
lines changed

3 files changed

+51
-3
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Added a new 'topic' attribute to all existing internal consume-claim metrics.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35336]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const (
3535
transport = "kafka"
3636
// TODO: update the following attributes to reflect semconv
3737
attrInstanceName = "name"
38+
attrTopic = "topic"
3839
attrPartition = "partition"
3940
)
4041

@@ -78,7 +79,10 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume
7879
config, set.Logger,
7980
c.telemetryBuilder.KafkaReceiverUnmarshalFailedLogRecords,
8081
[]metric.AddOption{
81-
metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())),
82+
metric.WithAttributeSet(attribute.NewSet(
83+
attribute.String(attrInstanceName, c.id.String()),
84+
attribute.String(attrTopic, config.Logs.Topic),
85+
)),
8286
},
8387
&logsHandler{
8488
unmarshaler: unmarshaler,
@@ -101,7 +105,10 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons
101105
config, set.Logger,
102106
c.telemetryBuilder.KafkaReceiverUnmarshalFailedMetricPoints,
103107
[]metric.AddOption{
104-
metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())),
108+
metric.WithAttributeSet(attribute.NewSet(
109+
attribute.String(attrInstanceName, c.id.String()),
110+
attribute.String(attrTopic, config.Metrics.Topic),
111+
)),
105112
},
106113
&metricsHandler{
107114
unmarshaler: unmarshaler,
@@ -124,7 +131,10 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
124131
config, set.Logger,
125132
c.telemetryBuilder.KafkaReceiverUnmarshalFailedSpans,
126133
[]metric.AddOption{
127-
metric.WithAttributes(attribute.String(attrInstanceName, c.id.String())),
134+
metric.WithAttributeSet(attribute.NewSet(
135+
attribute.String(attrInstanceName, c.id.String()),
136+
attribute.String(attrTopic, config.Traces.Topic),
137+
)),
128138
},
129139
&tracesHandler{
130140
unmarshaler: unmarshaler,
@@ -155,6 +165,7 @@ func newMessageHandlerConsumeFunc[T plog.Logs | pmetric.Metrics | ptrace.Traces]
155165
data, n, err = h.unmarshalData(message.Value)
156166
if err != nil {
157167
logger.Error("failed to unmarshal message", zap.Error(err))
168+
metricAddOpts = append(metricAddOpts, metric.WithAttributes(attribute.String(attrPartition, strconv.Itoa(int(message.Partition)))))
158169
unmarshalFailedCounter.Add(ctx, 1, metricAddOpts...)
159170
return err
160171
}
@@ -487,6 +498,7 @@ func (c *consumerGroupHandler) handleMessage(
487498
ctx := newContextWithHeaders(session.Context(), message.Headers)
488499
attrs := attribute.NewSet(
489500
attribute.String(attrInstanceName, c.id.String()),
501+
attribute.String(attrTopic, message.Topic),
490502
attribute.String(attrPartition, strconv.Itoa(int(claim.Partition()))),
491503
)
492504
c.telemetryBuilder.KafkaReceiverMessages.Add(ctx, 1, metric.WithAttributeSet(attrs))

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
285285
Value: 1,
286286
Attributes: attribute.NewSet(
287287
attribute.String("name", set.ID.String()),
288+
attribute.String("topic", "otlp_spans"),
289+
attribute.String("partition", "0"),
288290
),
289291
}}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
290292

@@ -301,6 +303,7 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
301303
Value: 5,
302304
Attributes: attribute.NewSet(
303305
attribute.String("name", set.ID.String()),
306+
attribute.String("topic", "otlp_spans"),
304307
attribute.String("partition", "0"),
305308
),
306309
}}, metricdatatest.IgnoreTimestamp())
@@ -327,6 +330,7 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
327330
Value: 4, // offset of the final message
328331
Attributes: attribute.NewSet(
329332
attribute.String("name", set.ID.String()),
333+
attribute.String("topic", "otlp_spans"),
330334
attribute.String("partition", "0"),
331335
),
332336
}}, metricdatatest.IgnoreTimestamp())
@@ -335,6 +339,7 @@ func TestReceiver_InternalTelemetry(t *testing.T) {
335339
Value: 0,
336340
Attributes: attribute.NewSet(
337341
attribute.String("name", set.ID.String()),
342+
attribute.String("topic", "otlp_spans"),
338343
attribute.String("partition", "0"),
339344
),
340345
}}, metricdatatest.IgnoreTimestamp())
@@ -383,6 +388,8 @@ func TestNewLogsReceiver(t *testing.T) {
383388
Value: 1,
384389
Attributes: attribute.NewSet(
385390
attribute.String("name", set.ID.String()),
391+
attribute.String("topic", "otlp_logs"),
392+
attribute.String("partition", "0"),
386393
),
387394
}}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
388395

@@ -435,6 +442,8 @@ func TestNewMetricsReceiver(t *testing.T) {
435442
Value: 1,
436443
Attributes: attribute.NewSet(
437444
attribute.String("name", set.ID.String()),
445+
attribute.String("topic", "otlp_metrics"),
446+
attribute.String("partition", "0"),
438447
),
439448
}}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
440449

0 commit comments

Comments
 (0)