Skip to content

Commit acb60bc

Browse files
authored
Add both bytes and items sizes to the persistent metadata (#13262)
No changelog since this is not released yet. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 627e36f commit acb60bc

26 files changed

+568
-650
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove sizer map in favor of items/bytes sizers. Request based is automatically supported.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13262]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

.chloggen/persistent-metadata.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate NewRequestsSizer always supported.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13262]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
9494
ID: set.ID,
9595
Telemetry: set.TelemetrySettings,
9696
Encoding: be.queueBatchSettings.Encoding,
97-
Sizers: be.queueBatchSettings.Sizers,
97+
ItemsSizer: be.queueBatchSettings.ItemsSizer,
98+
BytesSizer: be.queueBatchSettings.BytesSizer,
9899
Partitioner: be.queueBatchSettings.Partitioner,
99100
}
100101
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,9 @@ func noopExport(context.Context, request.Request) error {
153153

154154
func newFakeQueueBatch() QueueBatchSettings[request.Request] {
155155
return QueueBatchSettings[request.Request]{
156-
Encoding: fakeEncoding{},
157-
Sizers: map[request.SizerType]request.Sizer[request.Request]{
158-
request.SizerTypeRequests: request.RequestsSizer[request.Request]{},
159-
},
156+
Encoding: fakeEncoding{},
157+
ItemsSizer: request.NewItemsSizer(),
158+
BytesSizer: requesttest.NewBytesSizer(),
160159
}
161160
}
162161

exporter/exporterhelper/internal/queue/async_queue_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ import (
1414
"github.com/stretchr/testify/require"
1515

1616
"go.opentelemetry.io/collector/component/componenttest"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1718
)
1819

1920
func TestAsyncMemoryQueue(t *testing.T) {
2021
consumed := &atomic.Int64{}
21-
ac := newAsyncQueue(newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100}),
22+
23+
set := newSettings(request.SizerTypeItems, 100)
24+
ac := newAsyncQueue(newMemoryQueue[int64](set),
2225
1, func(_ context.Context, _ int64, done Done) {
2326
consumed.Add(1)
2427
done.OnDone(nil)
@@ -33,8 +36,9 @@ func TestAsyncMemoryQueue(t *testing.T) {
3336

3437
func TestAsyncMemoryQueueBlocking(t *testing.T) {
3538
consumed := &atomic.Int64{}
36-
ac := newAsyncQueue(
37-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, BlockOnOverflow: true}),
39+
set := newSettings(request.SizerTypeItems, 100)
40+
set.BlockOnOverflow = true
41+
ac := newAsyncQueue(newMemoryQueue[int64](set),
3842
4, func(_ context.Context, _ int64, done Done) {
3943
consumed.Add(1)
4044
done.OnDone(nil)
@@ -57,8 +61,10 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
5761

5862
func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
5963
consumed := &atomic.Int64{}
60-
ac := newAsyncQueue(
61-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true, BlockOnOverflow: true}),
64+
set := newSettings(request.SizerTypeItems, 100)
65+
set.BlockOnOverflow = true
66+
set.WaitForResult = true
67+
ac := newAsyncQueue(newMemoryQueue[int64](set),
6268
4, func(_ context.Context, _ int64, done Done) {
6369
consumed.Add(1)
6470
done.OnDone(nil)
@@ -81,8 +87,9 @@ func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
8187

8288
func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
8389
stop := make(chan struct{})
84-
ac := newAsyncQueue(
85-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 10, BlockOnOverflow: true}),
90+
set := newSettings(request.SizerTypeItems, 10)
91+
set.BlockOnOverflow = true
92+
ac := newAsyncQueue(newMemoryQueue[int64](set),
8693
1, func(_ context.Context, _ int64, done Done) {
8794
<-stop
8895
done.OnDone(nil)
@@ -110,18 +117,17 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
110117
}
111118

112119
func BenchmarkAsyncMemoryQueue(b *testing.B) {
113-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: int64(10 * b.N)})
114120
consumed := &atomic.Int64{}
115-
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
116-
ac := newAsyncQueue(q, 1, func(_ context.Context, _ int64, done Done) {
121+
set := newSettings(request.SizerTypeItems, int64(10*b.N))
122+
ac := newAsyncQueue(newMemoryQueue[int64](set), 1, func(_ context.Context, _ int64, done Done) {
117123
consumed.Add(1)
118124
done.OnDone(nil)
119125
})
120126
require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
121127
b.ResetTimer()
122128
b.ReportAllocs()
123129
for j := 0; j < b.N; j++ {
124-
require.NoError(b, q.Offer(context.Background(), 10))
130+
require.NoError(b, ac.Offer(context.Background(), 10))
125131
}
126132
require.NoError(b, ac.Shutdown(context.Background()))
127133
assert.EqualValues(b, b.N, consumed.Load())

exporter/exporterhelper/internal/queue/memory_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type memoryQueue[T any] struct {
4545
// capacity is the capacity of the queue.
4646
func newMemoryQueue[T any](set Settings[T]) readableQueue[T] {
4747
sq := &memoryQueue[T]{
48-
sizer: set.Sizer,
48+
sizer: set.activeSizer(),
4949
cap: set.Capacity,
5050
items: &linkedQueue[T]{},
5151
waitForResult: set.WaitForResult,

exporter/exporterhelper/internal/queue/memory_queue_test.go

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,13 @@ import (
1515
"github.com/stretchr/testify/require"
1616

1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
)
1920

20-
type sizerInt64 struct{}
21-
22-
func (s sizerInt64) Sizeof(el int64) int64 {
23-
return el
24-
}
25-
2621
func TestMemoryQueue(t *testing.T) {
27-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
22+
set := newSettings(request.SizerTypeItems, 7)
23+
q := newMemoryQueue[int64](set)
24+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
2825
require.NoError(t, q.Offer(context.Background(), 1))
2926
assert.EqualValues(t, 1, q.Size())
3027
assert.EqualValues(t, 7, q.Capacity())
@@ -50,10 +47,14 @@ func TestMemoryQueue(t *testing.T) {
5047

5148
require.NoError(t, q.Shutdown(context.Background()))
5249
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
50+
require.NoError(t, q.Shutdown(context.Background()))
5351
}
5452

5553
func TestMemoryQueueBlockingCancelled(t *testing.T) {
56-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 5, BlockOnOverflow: true})
54+
set := newSettings(request.SizerTypeItems, 5)
55+
set.BlockOnOverflow = true
56+
q := newMemoryQueue[int64](set)
57+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
5758
require.NoError(t, q.Offer(context.Background(), 3))
5859
ctx, cancel := context.WithCancel(context.Background())
5960
wg := sync.WaitGroup{}
@@ -73,7 +74,9 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
7374
}
7475

7576
func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
76-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
77+
set := newSettings(request.SizerTypeItems, 7)
78+
q := newMemoryQueue[int64](set)
79+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
7780
require.NoError(t, q.Offer(context.Background(), 1))
7881
require.NoError(t, q.Offer(context.Background(), 3))
7982

@@ -90,28 +93,40 @@ func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
9093
}))
9194
assert.EqualValues(t, 0, q.Size())
9295
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
96+
require.NoError(t, q.Shutdown(context.Background()))
9397
}
9498

9599
func TestMemoryQueueOfferInvalidSize(t *testing.T) {
96-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
100+
set := newSettings(request.SizerTypeItems, 1)
101+
q := newMemoryQueue[int64](set)
102+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
97103
require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize)
104+
require.NoError(t, q.Shutdown(context.Background()))
98105
}
99106

100107
func TestMemoryQueueRejectOverCapacityElements(t *testing.T) {
101-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7, BlockOnOverflow: true})
108+
set := newSettings(request.SizerTypeItems, 1)
109+
set.BlockOnOverflow = true
110+
q := newMemoryQueue[int64](set)
111+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
102112
require.ErrorIs(t, q.Offer(context.Background(), 8), errSizeTooLarge)
113+
require.NoError(t, q.Shutdown(context.Background()))
103114
}
104115

105116
func TestMemoryQueueOfferZeroSize(t *testing.T) {
106-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
117+
set := newSettings(request.SizerTypeItems, 1)
118+
q := newMemoryQueue[int64](set)
119+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
107120
require.NoError(t, q.Offer(context.Background(), 0))
108121
require.NoError(t, q.Shutdown(context.Background()))
109122
// Because the size 0 is ignored, nothing to drain.
110123
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
111124
}
112125

113-
func TestMemoryQueueZeroCapacity(t *testing.T) {
114-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
126+
func TestMemoryQueueOverflow(t *testing.T) {
127+
set := newSettings(request.SizerTypeItems, 1)
128+
q := newMemoryQueue[int64](set)
129+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
115130
require.NoError(t, q.Offer(context.Background(), 1))
116131
require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull)
117132
require.NoError(t, q.Shutdown(context.Background()))
@@ -120,7 +135,9 @@ func TestMemoryQueueZeroCapacity(t *testing.T) {
120135
func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
121136
wg := sync.WaitGroup{}
122137
myErr := errors.New("test error")
123-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
138+
set := newSettings(request.SizerTypeItems, 100)
139+
set.WaitForResult = true
140+
q := newMemoryQueue[int64](set)
124141
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
125142
wg.Add(1)
126143
go func() {
@@ -138,7 +155,9 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
138155
func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
139156
wg := sync.WaitGroup{}
140157
stop := make(chan struct{})
141-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
158+
set := newSettings(request.SizerTypeItems, 100)
159+
set.WaitForResult = true
160+
q := newMemoryQueue[int64](set)
142161
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
143162

144163
// Consume async new data.
@@ -167,7 +186,9 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
167186
func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
168187
wg := sync.WaitGroup{}
169188
stop := make(chan struct{})
170-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
189+
set := newSettings(request.SizerTypeItems, 100)
190+
set.WaitForResult = true
191+
q := newMemoryQueue[int64](set)
171192
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
172193

173194
// Consume async new data.
@@ -197,7 +218,9 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
197218
func BenchmarkMemoryQueueWaitForResult(b *testing.B) {
198219
wg := sync.WaitGroup{}
199220
consumed := &atomic.Int64{}
200-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1000, WaitForResult: true})
221+
set := newSettings(request.SizerTypeItems, 100)
222+
set.WaitForResult = true
223+
q := newMemoryQueue[int64](set)
201224
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
202225

203226
// Consume async new data.

0 commit comments

Comments
 (0)