@@ -15,6 +15,8 @@ import (
15
15
"go.opentelemetry.io/collector/consumer"
16
16
"go.opentelemetry.io/collector/consumer/consumererror"
17
17
"go.opentelemetry.io/collector/receiver"
18
+ "go.opentelemetry.io/otel/attribute"
19
+ "go.opentelemetry.io/otel/metric"
18
20
"go.uber.org/zap"
19
21
20
22
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata"
@@ -38,6 +40,10 @@ const (
38
40
flowControlStateControlled
39
41
)
40
42
43
+ const (
44
+ brokerComponenteNameAttr = "receiver_name"
45
+ )
46
+
41
47
// solaceTracesReceiver uses azure AMQP to consume and handle telemetry data from SOlace. Implements receiver.Traces
42
48
type solaceTracesReceiver struct {
43
49
// config is the receiver.Config instance used to build the receiver
@@ -56,6 +62,8 @@ type solaceTracesReceiver struct {
56
62
terminating * atomic.Bool
57
63
// retryTimeout is the timeout between connection attempts
58
64
retryTimeout time.Duration
65
+ // Other Attributes including the ID of the receiver Solace broker's component name
66
+ metricAttrs attribute.Set
59
67
}
60
68
61
69
// newTracesReceiver creates a new solaceTraceReceiver as a receiver.Traces
@@ -73,7 +81,18 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
73
81
return nil , err
74
82
}
75
83
76
- unmarshaller := newTracesUnmarshaller (set .Logger , telemetryBuilder )
84
+ // solaceBrokerAttrs - including the component name of the connected Solace broker
85
+ receiverName := set .ID .Name ()
86
+ if receiverName != "" {
87
+ receiverName = "solace/" + receiverName
88
+ } else {
89
+ receiverName = "solace"
90
+ }
91
+ solaceBrokerAttrs := attribute .NewSet (
92
+ attribute .String (brokerComponenteNameAttr , receiverName ),
93
+ )
94
+
95
+ unmarshaller := newTracesUnmarshaller (set .Logger , telemetryBuilder , solaceBrokerAttrs )
77
96
78
97
return & solaceTracesReceiver {
79
98
config : config ,
@@ -85,13 +104,15 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
85
104
factory : factory ,
86
105
retryTimeout : 1 * time .Second ,
87
106
terminating : & atomic.Bool {},
107
+ metricAttrs : solaceBrokerAttrs ,
88
108
}, nil
89
109
}
90
110
91
111
// Start implements component.Receiver::Start
92
112
func (s * solaceTracesReceiver ) Start (ctx context.Context , _ component.Host ) error {
93
- s .telemetryBuilder .SolacereceiverReceiverStatus .Record (ctx , int64 (receiverStateStarting ))
94
- s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateClear ))
113
+ // set the component name for the connected Solace broker
114
+ s .telemetryBuilder .SolacereceiverReceiverStatus .Record (ctx , int64 (receiverStateStarting ), metric .WithAttributeSet (s .metricAttrs ))
115
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateClear ), metric .WithAttributeSet (s .metricAttrs ))
95
116
var cancelableContext context.Context
96
117
cancelableContext , s .cancel = context .WithCancel (context .Background ())
97
118
@@ -108,13 +129,14 @@ func (s *solaceTracesReceiver) Shutdown(_ context.Context) error {
108
129
if s .cancel == nil {
109
130
return nil
110
131
}
132
+ // set the component name for the connected Solace broker
111
133
s .terminating .Store (true )
112
- s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateTerminating ))
134
+ s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateTerminating ), metric . WithAttributeSet ( s . metricAttrs ) )
113
135
s .settings .Logger .Info ("Shutdown waiting for all components to complete" )
114
136
s .cancel () // cancels the context passed to the reconnection loop
115
137
s .shutdownWaitGroup .Wait ()
116
138
s .settings .Logger .Info ("Receiver shutdown successfully" )
117
- s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateTerminated ))
139
+ s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateTerminated ), metric . WithAttributeSet ( s . metricAttrs ) )
118
140
return nil
119
141
}
120
142
@@ -130,7 +152,7 @@ func (s *solaceTracesReceiver) connectAndReceive(ctx context.Context) {
130
152
disable := false
131
153
132
154
// indicate we are in connecting state at the start
133
- s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateConnecting ))
155
+ s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (receiverStateConnecting ), metric . WithAttributeSet ( s . metricAttrs ) )
134
156
135
157
reconnectionLoop:
136
158
for ! disable {
@@ -156,7 +178,7 @@ reconnectionLoop:
156
178
157
179
if err := service .dial (ctx ); err != nil {
158
180
s .settings .Logger .Debug ("Encountered error while connecting messaging service" , zap .Error (err ))
159
- s .telemetryBuilder .SolacereceiverFailedReconnections .Add (ctx , 1 )
181
+ s .telemetryBuilder .SolacereceiverFailedReconnections .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
160
182
return
161
183
}
162
184
// dial was successful, record the connected state
@@ -165,7 +187,7 @@ reconnectionLoop:
165
187
if err := s .receiveMessages (ctx , service ); err != nil {
166
188
s .settings .Logger .Debug ("Encountered error while receiving messages" , zap .Error (err ))
167
189
if errors .Is (err , errUpgradeRequired ) {
168
- s .telemetryBuilder .SolacereceiverNeedUpgrade .Record (ctx , 1 )
190
+ s .telemetryBuilder .SolacereceiverNeedUpgrade .Record (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
169
191
disable = true
170
192
return
171
193
}
@@ -182,7 +204,7 @@ reconnectionLoop:
182
204
// this state transition were to happen, it would be short lived.
183
205
func (s * solaceTracesReceiver ) recordConnectionState (state receiverState ) {
184
206
if ! s .terminating .Load () {
185
- s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (state ))
207
+ s .telemetryBuilder .SolacereceiverReceiverStatus .Record (context .Background (), int64 (state ), metric . WithAttributeSet ( s . metricAttrs ) )
186
208
}
187
209
}
188
210
@@ -220,18 +242,18 @@ func (s *solaceTracesReceiver) receiveMessage(ctx context.Context, service messa
220
242
}
221
243
}()
222
244
// message received successfully
223
- s .telemetryBuilder .SolacereceiverReceivedSpanMessages .Add (ctx , 1 )
245
+ s .telemetryBuilder .SolacereceiverReceivedSpanMessages .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
224
246
// unmarshal the message. unmarshalling errors are not fatal unless the version is unknown
225
247
traces , unmarshalErr := s .unmarshaller .unmarshal (msg )
226
248
if unmarshalErr != nil {
227
249
s .settings .Logger .Error ("Encountered error while unmarshalling message" , zap .Error (unmarshalErr ))
228
- s .telemetryBuilder .SolacereceiverFatalUnmarshallingErrors .Add (ctx , 1 )
250
+ s .telemetryBuilder .SolacereceiverFatalUnmarshallingErrors .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
229
251
if errors .Is (unmarshalErr , errUpgradeRequired ) {
230
252
disposition = service .failed // if we don't know the version, reject the trace message since we will disable the receiver
231
253
return unmarshalErr
232
254
}
233
- s .telemetryBuilder .SolacereceiverDroppedSpanMessages .Add (ctx , 1 ) // if the error is some other unmarshalling error, we will ack the message and drop the content
234
- return nil // don't propagate error, but don't continue forwarding traces
255
+ s .telemetryBuilder .SolacereceiverDroppedSpanMessages .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) ) // if the error is some other unmarshalling error, we will ack the message and drop the content
256
+ return nil // don't propagate error, but don't continue forwarding traces
235
257
}
236
258
237
259
var flowControlCount int64
@@ -245,10 +267,10 @@ flowControlLoop:
245
267
s .settings .Logger .Info ("Encountered temporary error while forwarding traces to next receiver, will allow redelivery" , zap .Error (forwardErr ))
246
268
// handle flow control metrics
247
269
if flowControlCount == 0 {
248
- s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateControlled ))
270
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateControlled ), metric . WithAttributeSet ( s . metricAttrs ) )
249
271
}
250
272
flowControlCount ++
251
- s .telemetryBuilder .SolacereceiverReceiverFlowControlRecentRetries .Record (ctx , flowControlCount )
273
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlRecentRetries .Record (ctx , flowControlCount , metric . WithAttributeSet ( s . metricAttrs ) )
252
274
// Backpressure scenario. For now, we are only delayed retry, eventually we may need to handle this
253
275
delayTimer := time .NewTimer (s .config .Flow .DelayedRetry .Delay )
254
276
select {
@@ -261,21 +283,21 @@ flowControlLoop:
261
283
}
262
284
} else { // error is permanent, we want to accept the message and increment the number of dropped messages
263
285
s .settings .Logger .Warn ("Encountered permanent error while forwarding traces to next receiver, will swallow trace" , zap .Error (forwardErr ))
264
- s .telemetryBuilder .SolacereceiverDroppedSpanMessages .Add (ctx , 1 )
286
+ s .telemetryBuilder .SolacereceiverDroppedSpanMessages .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
265
287
break flowControlLoop
266
288
}
267
289
} else {
268
290
// no forward error
269
- s .telemetryBuilder .SolacereceiverReportedSpans .Add (ctx , int64 (traces .SpanCount ()))
291
+ s .telemetryBuilder .SolacereceiverReportedSpans .Add (ctx , int64 (traces .SpanCount ()), metric . WithAttributeSet ( s . metricAttrs ) )
270
292
break flowControlLoop
271
293
}
272
294
}
273
295
// Make sure to clear the stats no matter what, unless we were interrupted in which case we should preserve the last state
274
296
if flowControlCount != 0 {
275
- s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateClear ))
276
- s .telemetryBuilder .SolacereceiverReceiverFlowControlTotal .Add (ctx , 1 )
297
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlStatus .Record (ctx , int64 (flowControlStateClear ), metric . WithAttributeSet ( s . metricAttrs ) )
298
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlTotal .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
277
299
if flowControlCount == 1 {
278
- s .telemetryBuilder .SolacereceiverReceiverFlowControlWithSingleSuccessfulRetry .Add (ctx , 1 )
300
+ s .telemetryBuilder .SolacereceiverReceiverFlowControlWithSingleSuccessfulRetry .Add (ctx , 1 , metric . WithAttributeSet ( s . metricAttrs ) )
279
301
}
280
302
}
281
303
return nil
0 commit comments