Skip to content

Commit a02815c

Browse files
committed
add ut
1 parent f6aff89 commit a02815c

File tree

4 files changed

+110
-60
lines changed

4 files changed

+110
-60
lines changed

receiver/k8sobjectsreceiver/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/google/uuid v1.6.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.125.0
78
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.125.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.125.0
910
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.125.0
@@ -82,7 +83,7 @@ require (
8283
github.com/spf13/pflag v1.0.5 // indirect
8384
github.com/x448/float16 v0.8.4 // indirect
8485
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
85-
go.opentelemetry.io/collector v0.125.1-0.20250505155216-829157cef7bb // indirect
86+
go.opentelemetry.io/collector v0.125.0 // indirect
8687
go.opentelemetry.io/collector/client v1.31.1-0.20250505152726-56c7da210783 // indirect
8788
go.opentelemetry.io/collector/component/componentstatus v0.125.1-0.20250505155216-829157cef7bb // indirect
8889
go.opentelemetry.io/collector/config/configauth v0.125.1-0.20250505155216-829157cef7bb // indirect
@@ -95,6 +96,7 @@ require (
9596
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
9697
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250505155216-829157cef7bb // indirect
9798
go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250505155216-829157cef7bb // indirect
99+
go.opentelemetry.io/collector/extension v1.31.1-0.20250505152726-56c7da210783 // indirect
98100
go.opentelemetry.io/collector/extension/extensionauth v1.31.1-0.20250505152726-56c7da210783 // indirect
99101
go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.1-0.20250505155216-829157cef7bb // indirect
100102
go.opentelemetry.io/collector/featuregate v1.31.1-0.20250505152726-56c7da210783 // indirect

receiver/k8sobjectsreceiver/go.sum

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 40 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,45 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
8282
return err
8383
}
8484
kr.client = client
85+
86+
// Validate objects against K8s API
87+
validObjects, err := kr.config.getValidObjects()
88+
if err != nil {
89+
return err
90+
}
91+
92+
var validConfigs []*K8sObjectsConfig
93+
for _, object := range kr.objects {
94+
gvrs, ok := validObjects[object.Name]
95+
if !ok {
96+
availableResource := make([]string, 0, len(validObjects))
97+
for k := range validObjects {
98+
availableResource = append(availableResource, k)
99+
}
100+
err = fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource)
101+
if handlerErr := kr.handleError(err, ""); handlerErr != nil {
102+
return handlerErr
103+
}
104+
continue
105+
}
106+
107+
gvr := gvrs[0]
108+
for i := range gvrs {
109+
if gvrs[i].Group == object.Group {
110+
gvr = gvrs[i]
111+
break
112+
}
113+
}
114+
115+
object.gvr = gvr
116+
validConfigs = append(validConfigs, object)
117+
}
118+
119+
if len(validConfigs) == 0 {
120+
err = errors.New("no valid Kubernetes objects found to watch")
121+
return err
122+
}
123+
85124
cctx, cancel := context.WithCancel(ctx)
86125
kr.cancel = cancel
87126

@@ -97,12 +136,6 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
97136
return fmt.Errorf("the extension %T is not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
98137
}
99138

100-
validConfigs := Validate()
101-
if len(validConfigs) == 0 {
102-
err := errors.New("no valid Kubernetes objects found to watch")
103-
return err
104-
}
105-
106139
elector.SetCallBackFuncs(
107140
func(_ context.Context) {
108141
kr.setting.Logger.Info("Object Receiver started as leader")
@@ -117,13 +150,8 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
117150
kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err))
118151
}
119152
})
120-
return nil
121-
}
122153

123-
validConfigs := Validate()
124-
if len(validConfigs) == 0 {
125-
err := errors.New("no valid Kubernetes objects found to watch")
126-
return err
154+
return nil
127155
}
128156

129157
kr.setting.Logger.Info("Object Receiver started")
@@ -175,49 +203,6 @@ func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfi
175203
}
176204
}
177205

178-
// Validate objects against K8s API
179-
func (kr *k8sobjectsreceiver) Validate() []*K8sObjectsConfig {
180-
validObjects, err := kr.config.getValidObjects()
181-
if err != nil {
182-
return err
183-
}
184-
185-
var validConfigs []*K8sObjectsConfig
186-
for _, object := range kr.objects {
187-
gvrs, ok := validObjects[object.Name]
188-
if !ok {
189-
availableResource := make([]string, 0, len(validObjects))
190-
for k := range validObjects {
191-
availableResource = append(availableResource, k)
192-
}
193-
err = fmt.Errorf("resource not found: %s. Available resources in cluster: %v", object.Name, availableResource)
194-
if handlerErr := kr.handleError(err, ""); handlerErr != nil {
195-
return handlerErr
196-
}
197-
198-
continue
199-
}
200-
201-
gvr := gvrs[0]
202-
for i := range gvrs {
203-
if gvrs[i].Group == object.Group {
204-
gvr = gvrs[i]
205-
break
206-
}
207-
}
208-
209-
object.gvr = gvr
210-
validConfigs = append(validConfigs, object)
211-
}
212-
213-
if len(validConfigs) == 0 {
214-
err = errors.New("no valid Kubernetes objects found to watch")
215-
return err
216-
}
217-
218-
return validConfigs
219-
}
220-
221206
func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) {
222207
stopperChan := make(chan struct{})
223208
kr.mu.Lock()

receiver/k8sobjectsreceiver/receiver_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/component/componenttest"
1415
"go.opentelemetry.io/collector/consumer/consumertest"
1516
"go.opentelemetry.io/collector/receiver/receivertest"
1617
apiWatch "k8s.io/apimachinery/pkg/watch"
1718

19+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest"
1820
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sobjectsreceiver/internal/metadata"
1921
)
2022

@@ -294,3 +296,62 @@ func TestExcludeDeletedTrue(t *testing.T) {
294296

295297
assert.NoError(t, r.Shutdown(ctx))
296298
}
299+
300+
func TestReceiverWithLeaderElection(t *testing.T) {
301+
fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{}
302+
fakeHost := &k8sleaderelectortest.FakeHost{
303+
FakeLeaderElection: fakeLeaderElection,
304+
}
305+
leaderElectorID := component.MustNewID("k8s_leader_elector")
306+
307+
mockClient := newMockDynamicClient()
308+
rCfg := createDefaultConfig().(*Config)
309+
rCfg.makeDynamicClient = mockClient.getMockDynamicClient
310+
rCfg.makeDiscoveryClient = getMockDiscoveryClient
311+
rCfg.ErrorMode = PropagateError
312+
rCfg.Objects = []*K8sObjectsConfig{
313+
{
314+
Name: "pods",
315+
Mode: PullMode,
316+
},
317+
}
318+
rCfg.K8sLeaderElector = &leaderElectorID
319+
320+
r, err := newReceiver(
321+
receivertest.NewNopSettings(metadata.Type),
322+
rCfg,
323+
consumertest.NewNop(),
324+
)
325+
require.NoError(t, err)
326+
kr := r.(*k8sobjectsreceiver)
327+
sink := new(consumertest.LogsSink)
328+
kr.consumer = sink
329+
330+
// Setup k8s resources.
331+
numPods := 2
332+
mockClient.createPods(
333+
generatePod("pod1", "default", map[string]any{
334+
"environment": "production",
335+
}, "1"),
336+
generatePod("pod2", "default", map[string]any{
337+
"environment": "production",
338+
}, "1"),
339+
)
340+
341+
err = kr.Start(context.Background(), fakeHost)
342+
require.NoError(t, err)
343+
344+
// elected leader
345+
fakeLeaderElection.InvokeOnLeading()
346+
347+
expectedNumMetrics := numPods
348+
var initialLogRecordCount int
349+
require.Eventually(t, func() bool {
350+
initialLogRecordCount = sink.LogRecordCount()
351+
return initialLogRecordCount == expectedNumMetrics
352+
}, 20*time.Second, 100*time.Millisecond,
353+
"logs not collected")
354+
355+
// lost election
356+
fakeLeaderElection.InvokeOnStopping()
357+
}

0 commit comments

Comments
 (0)