Skip to content

Commit 48dab61

Browse files
authored
[chore] Remove unnecessary internal duplicate settings (#13310)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 3b627ad commit 48dab61

File tree

6 files changed

+73
-105
lines changed

6 files changed

+73
-105
lines changed

exporter/exporterhelper/internal/queue/async_queue_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
func TestAsyncMemoryQueue(t *testing.T) {
2020
consumed := &atomic.Int64{}
21-
ac := newAsyncQueue(newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100}),
21+
ac := newAsyncQueue(newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100}),
2222
1, func(_ context.Context, _ int64, done Done) {
2323
consumed.Add(1)
2424
done.OnDone(nil)
@@ -34,7 +34,7 @@ func TestAsyncMemoryQueue(t *testing.T) {
3434
func TestAsyncMemoryQueueBlocking(t *testing.T) {
3535
consumed := &atomic.Int64{}
3636
ac := newAsyncQueue(
37-
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blockOnOverflow: true}),
37+
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, BlockOnOverflow: true}),
3838
4, func(_ context.Context, _ int64, done Done) {
3939
consumed.Add(1)
4040
done.OnDone(nil)
@@ -58,7 +58,7 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
5858
func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
5959
consumed := &atomic.Int64{}
6060
ac := newAsyncQueue(
61-
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, waitForResult: true, blockOnOverflow: true}),
61+
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true, BlockOnOverflow: true}),
6262
4, func(_ context.Context, _ int64, done Done) {
6363
consumed.Add(1)
6464
done.OnDone(nil)
@@ -82,7 +82,7 @@ func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
8282
func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
8383
stop := make(chan struct{})
8484
ac := newAsyncQueue(
85-
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 10, blockOnOverflow: true}),
85+
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 10, BlockOnOverflow: true}),
8686
1, func(_ context.Context, _ int64, done Done) {
8787
<-stop
8888
done.OnDone(nil)
@@ -110,7 +110,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
110110
}
111111

112112
func BenchmarkAsyncMemoryQueue(b *testing.B) {
113-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: int64(10 * b.N)})
113+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: int64(10 * b.N)})
114114
consumed := &atomic.Int64{}
115115
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
116116
ac := newAsyncQueue(q, 1, func(_ context.Context, _ int64, done Done) {

exporter/exporterhelper/internal/queue/memory_queue.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,6 @@ var (
2525
errSizeTooLarge = errors.New("element size too large")
2626
)
2727

28-
// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
29-
type memoryQueueSettings[T any] struct {
30-
sizer request.Sizer[T]
31-
capacity int64
32-
waitForResult bool
33-
blockOnOverflow bool
34-
}
35-
3628
// memoryQueue is an in-memory implementation of a Queue.
3729
type memoryQueue[T any] struct {
3830
component.StartFunc
@@ -51,13 +43,13 @@ type memoryQueue[T any] struct {
5143

5244
// newMemoryQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
5345
// capacity is the capacity of the queue.
54-
func newMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] {
46+
func newMemoryQueue[T any](set Settings[T]) readableQueue[T] {
5547
sq := &memoryQueue[T]{
56-
sizer: set.sizer,
57-
cap: set.capacity,
48+
sizer: set.Sizer,
49+
cap: set.Capacity,
5850
items: &linkedQueue[T]{},
59-
waitForResult: set.waitForResult,
60-
blockOnOverflow: set.blockOnOverflow,
51+
waitForResult: set.WaitForResult,
52+
blockOnOverflow: set.BlockOnOverflow,
6153
}
6254
sq.hasMoreElements = sync.NewCond(&sq.mu)
6355
sq.hasMoreSpace = newCond(&sq.mu)

exporter/exporterhelper/internal/queue/memory_queue_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (s sizerInt64) Sizeof(el int64) int64 {
2424
}
2525

2626
func TestMemoryQueue(t *testing.T) {
27-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7})
27+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
2828
require.NoError(t, q.Offer(context.Background(), 1))
2929
assert.EqualValues(t, 1, q.Size())
3030
assert.EqualValues(t, 7, q.Capacity())
@@ -53,7 +53,7 @@ func TestMemoryQueue(t *testing.T) {
5353
}
5454

5555
func TestMemoryQueueBlockingCancelled(t *testing.T) {
56-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 5, blockOnOverflow: true})
56+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 5, BlockOnOverflow: true})
5757
require.NoError(t, q.Offer(context.Background(), 3))
5858
ctx, cancel := context.WithCancel(context.Background())
5959
wg := sync.WaitGroup{}
@@ -73,7 +73,7 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
7373
}
7474

7575
func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
76-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7})
76+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
7777
require.NoError(t, q.Offer(context.Background(), 1))
7878
require.NoError(t, q.Offer(context.Background(), 3))
7979

@@ -93,25 +93,25 @@ func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
9393
}
9494

9595
func TestMemoryQueueOfferInvalidSize(t *testing.T) {
96-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1})
96+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
9797
require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize)
9898
}
9999

100100
func TestMemoryQueueRejectOverCapacityElements(t *testing.T) {
101-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 7, blockOnOverflow: true})
101+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7, BlockOnOverflow: true})
102102
require.ErrorIs(t, q.Offer(context.Background(), 8), errSizeTooLarge)
103103
}
104104

105105
func TestMemoryQueueOfferZeroSize(t *testing.T) {
106-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1})
106+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
107107
require.NoError(t, q.Offer(context.Background(), 0))
108108
require.NoError(t, q.Shutdown(context.Background()))
109109
// Because the size 0 is ignored, nothing to drain.
110110
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
111111
}
112112

113113
func TestMemoryQueueZeroCapacity(t *testing.T) {
114-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1})
114+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
115115
require.NoError(t, q.Offer(context.Background(), 1))
116116
require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull)
117117
require.NoError(t, q.Shutdown(context.Background()))
@@ -120,7 +120,7 @@ func TestMemoryQueueZeroCapacity(t *testing.T) {
120120
func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
121121
wg := sync.WaitGroup{}
122122
myErr := errors.New("test error")
123-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, waitForResult: true})
123+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
124124
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
125125
wg.Add(1)
126126
go func() {
@@ -138,7 +138,7 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
138138
func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
139139
wg := sync.WaitGroup{}
140140
stop := make(chan struct{})
141-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, waitForResult: true})
141+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
142142
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
143143

144144
// Consume async new data.
@@ -167,7 +167,7 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
167167
func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
168168
wg := sync.WaitGroup{}
169169
stop := make(chan struct{})
170-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, waitForResult: true})
170+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
171171
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
172172

173173
// Consume async new data.
@@ -197,7 +197,7 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
197197
func BenchmarkMemoryQueueWaitForResult(b *testing.B) {
198198
wg := sync.WaitGroup{}
199199
consumed := &atomic.Int64{}
200-
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1000, waitForResult: true})
200+
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1000, WaitForResult: true})
201201
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
202202

203203
// Consume async new data.

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
18-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1918
"go.opentelemetry.io/collector/extension/xextension/storage"
2019
"go.opentelemetry.io/collector/pipeline"
2120
)
@@ -48,18 +47,6 @@ var indexDonePool = sync.Pool{
4847
},
4948
}
5049

51-
type persistentQueueSettings[T any] struct {
52-
sizer request.Sizer[T]
53-
sizerType request.SizerType
54-
capacity int64
55-
blockOnOverflow bool
56-
signal pipeline.Signal
57-
storageID component.ID
58-
encoding Encoding[T]
59-
id component.ID
60-
telemetry component.TelemetrySettings
61-
}
62-
6350
// persistentQueue provides a persistent queue implementation backed by file storage extension
6451
//
6552
// Write index describes the position at which next item is going to be stored.
@@ -83,7 +70,7 @@ type persistentQueueSettings[T any] struct {
8370
// index index x
8471
// xxxx deleted
8572
type persistentQueue[T any] struct {
86-
set persistentQueueSettings[T]
73+
set Settings[T]
8774
logger *zap.Logger
8875
client storage.Client
8976

@@ -97,10 +84,10 @@ type persistentQueue[T any] struct {
9784
}
9885

9986
// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
100-
func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
87+
func newPersistentQueue[T any](set Settings[T]) readableQueue[T] {
10188
pq := &persistentQueue[T]{
10289
set: set,
103-
logger: set.telemetry.Logger,
90+
logger: set.Telemetry.Logger,
10491
}
10592
pq.hasMoreElements = sync.NewCond(&pq.mu)
10693
pq.hasMoreSpace = newCond(&pq.mu)
@@ -109,7 +96,7 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T]
10996

11097
// Start starts the persistentQueue with the given number of consumers.
11198
func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error {
112-
storageClient, err := toStorageClient(ctx, pq.set.storageID, host, pq.set.id, pq.set.signal)
99+
storageClient, err := toStorageClient(ctx, *pq.set.StorageID, host, pq.set.ID, pq.set.Signal)
113100
if err != nil {
114101
return err
115102
}
@@ -124,7 +111,7 @@ func (pq *persistentQueue[T]) Size() int64 {
124111
}
125112

126113
func (pq *persistentQueue[T]) Capacity() int64 {
127-
return pq.set.capacity
114+
return pq.set.Capacity
128115
}
129116

130117
func (pq *persistentQueue[T]) initClient(ctx context.Context, client storage.Client) {
@@ -198,17 +185,17 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
198185

199186
// putInternal is the internal version that requires caller to hold the mutex lock.
200187
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
201-
reqSize := pq.set.sizer.Sizeof(req)
202-
for pq.metadata.QueueSize+reqSize > pq.set.capacity {
203-
if !pq.set.blockOnOverflow {
188+
reqSize := pq.set.Sizer.Sizeof(req)
189+
for pq.metadata.QueueSize+reqSize > pq.set.Capacity {
190+
if !pq.set.BlockOnOverflow {
204191
return ErrQueueIsFull
205192
}
206193
if err := pq.hasMoreSpace.Wait(ctx); err != nil {
207194
return err
208195
}
209196
}
210197

211-
reqBuf, err := pq.set.encoding.Marshal(ctx, req)
198+
reqBuf, err := pq.set.Encoding.Marshal(ctx, req)
212199
if err != nil {
213200
return err
214201
}
@@ -248,7 +235,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
248235
}
249236
if consumed {
250237
id := indexDonePool.Get().(*indexDone)
251-
id.reset(index, pq.set.sizer.Sizeof(req), pq)
238+
id.reset(index, pq.set.Sizer.Sizeof(req), pq)
252239
return reqCtx, req, id, true
253240
}
254241
// More space available, data was dropped.
@@ -278,7 +265,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, conte
278265
var request T
279266
restoredCtx := context.Background()
280267
if err == nil {
281-
restoredCtx, request, err = pq.set.encoding.Unmarshal(getOp.Value)
268+
restoredCtx, request, err = pq.set.Encoding.Unmarshal(getOp.Value)
282269
}
283270

284271
if err != nil {
@@ -381,7 +368,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
381368
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
382369
continue
383370
}
384-
reqCtx, req, err := pq.set.encoding.Unmarshal(op.Value)
371+
reqCtx, req, err := pq.set.Encoding.Unmarshal(op.Value)
385372
// If error happened or item is nil, it will be efficiently ignored
386373
if err != nil {
387374
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))

exporter/exporterhelper/internal/queue/persistent_queue_test.go

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,15 @@ func (m *fakeStorageClientWithErrors) Reset() {
216216
func createAndStartTestPersistentQueue(t *testing.T, sizer request.Sizer[uint64], capacity int64, numConsumers int,
217217
consumeFunc func(_ context.Context, item uint64) error,
218218
) Queue[uint64] {
219-
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
220-
sizer: sizer,
221-
capacity: capacity,
222-
signal: pipeline.SignalTraces,
223-
storageID: component.ID{},
224-
encoding: uint64Encoding{},
225-
id: component.NewID(exportertest.NopType),
226-
telemetry: componenttest.NewNopTelemetrySettings(),
219+
storageID := component.ID{}
220+
pq := newPersistentQueue[uint64](Settings[uint64]{
221+
Sizer: sizer,
222+
Capacity: capacity,
223+
Signal: pipeline.SignalTraces,
224+
StorageID: &storageID,
225+
Encoding: uint64Encoding{},
226+
ID: component.NewID(exportertest.NopType),
227+
Telemetry: componenttest.NewNopTelemetrySettings(),
227228
})
228229
ac := newAsyncQueue(pq, numConsumers, func(ctx context.Context, item uint64, done Done) {
229230
done.OnDone(consumeFunc(ctx, item))
@@ -239,14 +240,15 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer request.Sizer[uint64]
239240
}
240241

241242
func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[uint64] {
242-
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
243-
sizer: request.RequestsSizer[uint64]{},
244-
capacity: 1000,
245-
signal: pipeline.SignalTraces,
246-
storageID: component.ID{},
247-
encoding: uint64Encoding{},
248-
id: component.NewID(exportertest.NopType),
249-
telemetry: componenttest.NewNopTelemetrySettings(),
243+
storageID := component.ID{}
244+
pq := newPersistentQueue[uint64](Settings[uint64]{
245+
Sizer: request.RequestsSizer[uint64]{},
246+
Capacity: 1000,
247+
Signal: pipeline.SignalTraces,
248+
StorageID: &storageID,
249+
Encoding: uint64Encoding{},
250+
ID: component.NewID(exportertest.NopType),
251+
Telemetry: componenttest.NewNopTelemetrySettings(),
250252
}).(*persistentQueue[uint64])
251253
pq.initClient(context.Background(), client)
252254
return pq
@@ -261,14 +263,15 @@ func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Exten
261263
}
262264

263265
func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer request.Sizer[uint64], capacity int64) *persistentQueue[uint64] {
264-
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
265-
sizer: sizer,
266-
capacity: capacity,
267-
signal: pipeline.SignalTraces,
268-
storageID: component.ID{},
269-
encoding: uint64Encoding{},
270-
id: component.NewID(exportertest.NopType),
271-
telemetry: componenttest.NewNopTelemetrySettings(),
266+
storageID := component.ID{}
267+
pq := newPersistentQueue[uint64](Settings[uint64]{
268+
Sizer: sizer,
269+
Capacity: capacity,
270+
Signal: pipeline.SignalTraces,
271+
StorageID: &storageID,
272+
Encoding: uint64Encoding{},
273+
ID: component.NewID(exportertest.NopType),
274+
Telemetry: componenttest.NewNopTelemetrySettings(),
272275
}).(*persistentQueue[uint64])
273276
require.NoError(tb, pq.Start(context.Background(), hosttest.NewHost(map[component.ID]component.Component{{}: ext})))
274277
return pq
@@ -408,15 +411,16 @@ func TestPersistentBlockingQueue(t *testing.T) {
408411

409412
for _, tt := range tests {
410413
t.Run(tt.name, func(t *testing.T) {
411-
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
412-
sizer: tt.sizer,
413-
capacity: 100,
414-
blockOnOverflow: true,
415-
signal: pipeline.SignalTraces,
416-
storageID: component.ID{},
417-
encoding: uint64Encoding{},
418-
id: component.NewID(exportertest.NopType),
419-
telemetry: componenttest.NewNopTelemetrySettings(),
414+
storageID := component.ID{}
415+
pq := newPersistentQueue[uint64](Settings[uint64]{
416+
Sizer: tt.sizer,
417+
Capacity: 100,
418+
BlockOnOverflow: true,
419+
Signal: pipeline.SignalTraces,
420+
StorageID: &storageID,
421+
Encoding: uint64Encoding{},
422+
ID: component.NewID(exportertest.NopType),
423+
Telemetry: componenttest.NewNopTelemetrySettings(),
420424
})
421425
consumed := &atomic.Int64{}
422426
ac := newAsyncQueue(pq, 10, func(_ context.Context, _ uint64, done Done) {
@@ -536,7 +540,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {
536540
}
537541

538542
func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
539-
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{})
543+
pq := newPersistentQueue[uint64](Settings[uint64]{})
540544
// verify that stopping a un-start/started w/error queue does not panic
541545
assert.NoError(t, pq.Shutdown(context.Background()))
542546
}

0 commit comments

Comments
 (0)