Skip to content

Commit c725cdf

Browse files
committed
Add both bytes and items sizes to the persistent metadata
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 6408887 commit c725cdf

26 files changed

+567
-643
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove sizer map in favor of items/bytes sizers. Request based is automatically supported.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13262]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

.chloggen/persistent-metadata.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate NewRequestsSizer always supported.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13262]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
9494
ID: set.ID,
9595
Telemetry: set.TelemetrySettings,
9696
Encoding: be.queueBatchSettings.Encoding,
97-
Sizers: be.queueBatchSettings.Sizers,
97+
ItemsSizer: be.queueBatchSettings.ItemsSizer,
98+
BytesSizer: be.queueBatchSettings.BytesSizer,
9899
Partitioner: be.queueBatchSettings.Partitioner,
99100
}
100101
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,9 @@ func noopExport(context.Context, request.Request) error {
153153

154154
func newFakeQueueBatch() QueueBatchSettings[request.Request] {
155155
return QueueBatchSettings[request.Request]{
156-
Encoding: fakeEncoding{},
157-
Sizers: map[request.SizerType]request.Sizer[request.Request]{
158-
request.SizerTypeRequests: request.RequestsSizer[request.Request]{},
159-
},
156+
Encoding: fakeEncoding{},
157+
ItemsSizer: request.NewItemsSizer(),
158+
BytesSizer: requesttest.NewBytesSizer(),
160159
}
161160
}
162161

exporter/exporterhelper/internal/queue/async_queue_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ import (
1414
"github.com/stretchr/testify/require"
1515

1616
"go.opentelemetry.io/collector/component/componenttest"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1718
)
1819

1920
func TestAsyncMemoryQueue(t *testing.T) {
2021
consumed := &atomic.Int64{}
21-
ac := newAsyncQueue(newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100}),
22+
23+
set := newSettings(request.SizerTypeItems, 100)
24+
ac := newAsyncQueue(newMemoryQueue[int64](set),
2225
1, func(_ context.Context, _ int64, done Done) {
2326
consumed.Add(1)
2427
done.OnDone(nil)
@@ -33,8 +36,9 @@ func TestAsyncMemoryQueue(t *testing.T) {
3336

3437
func TestAsyncMemoryQueueBlocking(t *testing.T) {
3538
consumed := &atomic.Int64{}
36-
ac := newAsyncQueue(
37-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, BlockOnOverflow: true}),
39+
set := newSettings(request.SizerTypeItems, 100)
40+
set.BlockOnOverflow = true
41+
ac := newAsyncQueue(newMemoryQueue[int64](set),
3842
4, func(_ context.Context, _ int64, done Done) {
3943
consumed.Add(1)
4044
done.OnDone(nil)
@@ -57,8 +61,10 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
5761

5862
func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
5963
consumed := &atomic.Int64{}
60-
ac := newAsyncQueue(
61-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true, BlockOnOverflow: true}),
64+
set := newSettings(request.SizerTypeItems, 100)
65+
set.BlockOnOverflow = true
66+
set.WaitForResult = true
67+
ac := newAsyncQueue(newMemoryQueue[int64](set),
6268
4, func(_ context.Context, _ int64, done Done) {
6369
consumed.Add(1)
6470
done.OnDone(nil)
@@ -81,8 +87,9 @@ func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
8187

8288
func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
8389
stop := make(chan struct{})
84-
ac := newAsyncQueue(
85-
newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 10, BlockOnOverflow: true}),
90+
set := newSettings(request.SizerTypeItems, 10)
91+
set.BlockOnOverflow = true
92+
ac := newAsyncQueue(newMemoryQueue[int64](set),
8693
1, func(_ context.Context, _ int64, done Done) {
8794
<-stop
8895
done.OnDone(nil)
@@ -110,18 +117,17 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
110117
}
111118

112119
func BenchmarkAsyncMemoryQueue(b *testing.B) {
113-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: int64(10 * b.N)})
114120
consumed := &atomic.Int64{}
115-
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
116-
ac := newAsyncQueue(q, 1, func(_ context.Context, _ int64, done Done) {
121+
set := newSettings(request.SizerTypeItems, int64(10*b.N))
122+
ac := newAsyncQueue(newMemoryQueue[int64](set), 1, func(_ context.Context, _ int64, done Done) {
117123
consumed.Add(1)
118124
done.OnDone(nil)
119125
})
120126
require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost()))
121127
b.ResetTimer()
122128
b.ReportAllocs()
123129
for j := 0; j < b.N; j++ {
124-
require.NoError(b, q.Offer(context.Background(), 10))
130+
require.NoError(b, ac.Offer(context.Background(), 10))
125131
}
126132
require.NoError(b, ac.Shutdown(context.Background()))
127133
assert.EqualValues(b, b.N, consumed.Load())

exporter/exporterhelper/internal/queue/memory_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type memoryQueue[T any] struct {
4545
// capacity is the capacity of the queue.
4646
func newMemoryQueue[T any](set Settings[T]) readableQueue[T] {
4747
sq := &memoryQueue[T]{
48-
sizer: set.Sizer,
48+
sizer: set.activeSizer(),
4949
cap: set.Capacity,
5050
items: &linkedQueue[T]{},
5151
waitForResult: set.WaitForResult,

exporter/exporterhelper/internal/queue/memory_queue_test.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/stretchr/testify/require"
1616

1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
)
1920

2021
type sizerInt64 struct{}
@@ -24,7 +25,9 @@ func (s sizerInt64) Sizeof(el int64) int64 {
2425
}
2526

2627
func TestMemoryQueue(t *testing.T) {
27-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
28+
set := newSettings(request.SizerTypeItems, 7)
29+
q := newMemoryQueue[int64](set)
30+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
2831
require.NoError(t, q.Offer(context.Background(), 1))
2932
assert.EqualValues(t, 1, q.Size())
3033
assert.EqualValues(t, 7, q.Capacity())
@@ -50,10 +53,14 @@ func TestMemoryQueue(t *testing.T) {
5053

5154
require.NoError(t, q.Shutdown(context.Background()))
5255
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
56+
require.NoError(t, q.Shutdown(context.Background()))
5357
}
5458

5559
func TestMemoryQueueBlockingCancelled(t *testing.T) {
56-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 5, BlockOnOverflow: true})
60+
set := newSettings(request.SizerTypeItems, 5)
61+
set.BlockOnOverflow = true
62+
q := newMemoryQueue[int64](set)
63+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
5764
require.NoError(t, q.Offer(context.Background(), 3))
5865
ctx, cancel := context.WithCancel(context.Background())
5966
wg := sync.WaitGroup{}
@@ -73,7 +80,9 @@ func TestMemoryQueueBlockingCancelled(t *testing.T) {
7380
}
7481

7582
func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
76-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7})
83+
set := newSettings(request.SizerTypeItems, 7)
84+
q := newMemoryQueue[int64](set)
85+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
7786
require.NoError(t, q.Offer(context.Background(), 1))
7887
require.NoError(t, q.Offer(context.Background(), 3))
7988

@@ -90,28 +99,40 @@ func TestMemoryQueueDrainWhenShutdown(t *testing.T) {
9099
}))
91100
assert.EqualValues(t, 0, q.Size())
92101
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
102+
require.NoError(t, q.Shutdown(context.Background()))
93103
}
94104

95105
func TestMemoryQueueOfferInvalidSize(t *testing.T) {
96-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
106+
set := newSettings(request.SizerTypeItems, 1)
107+
q := newMemoryQueue[int64](set)
108+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
97109
require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize)
110+
require.NoError(t, q.Shutdown(context.Background()))
98111
}
99112

100113
func TestMemoryQueueRejectOverCapacityElements(t *testing.T) {
101-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 7, BlockOnOverflow: true})
114+
set := newSettings(request.SizerTypeItems, 1)
115+
set.BlockOnOverflow = true
116+
q := newMemoryQueue[int64](set)
117+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
102118
require.ErrorIs(t, q.Offer(context.Background(), 8), errSizeTooLarge)
119+
require.NoError(t, q.Shutdown(context.Background()))
103120
}
104121

105122
func TestMemoryQueueOfferZeroSize(t *testing.T) {
106-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
123+
set := newSettings(request.SizerTypeItems, 1)
124+
q := newMemoryQueue[int64](set)
125+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
107126
require.NoError(t, q.Offer(context.Background(), 0))
108127
require.NoError(t, q.Shutdown(context.Background()))
109128
// Because the size 0 is ignored, nothing to drain.
110129
assert.False(t, consume(q, func(context.Context, int64) error { t.FailNow(); return nil }))
111130
}
112131

113-
func TestMemoryQueueZeroCapacity(t *testing.T) {
114-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1})
132+
func TestMemoryQueueOverflow(t *testing.T) {
133+
set := newSettings(request.SizerTypeItems, 1)
134+
q := newMemoryQueue[int64](set)
135+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
115136
require.NoError(t, q.Offer(context.Background(), 1))
116137
require.ErrorIs(t, q.Offer(context.Background(), 1), ErrQueueIsFull)
117138
require.NoError(t, q.Shutdown(context.Background()))
@@ -120,7 +141,9 @@ func TestMemoryQueueZeroCapacity(t *testing.T) {
120141
func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
121142
wg := sync.WaitGroup{}
122143
myErr := errors.New("test error")
123-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
144+
set := newSettings(request.SizerTypeItems, 100)
145+
set.WaitForResult = true
146+
q := newMemoryQueue[int64](set)
124147
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
125148
wg.Add(1)
126149
go func() {
@@ -138,7 +161,9 @@ func TestMemoryQueueWaitForResultPassErrorBack(t *testing.T) {
138161
func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
139162
wg := sync.WaitGroup{}
140163
stop := make(chan struct{})
141-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
164+
set := newSettings(request.SizerTypeItems, 100)
165+
set.WaitForResult = true
166+
q := newMemoryQueue[int64](set)
142167
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
143168

144169
// Consume async new data.
@@ -167,7 +192,9 @@ func TestMemoryQueueWaitForResultCancelIncomingRequest(t *testing.T) {
167192
func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
168193
wg := sync.WaitGroup{}
169194
stop := make(chan struct{})
170-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 100, WaitForResult: true})
195+
set := newSettings(request.SizerTypeItems, 100)
196+
set.WaitForResult = true
197+
q := newMemoryQueue[int64](set)
171198
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
172199

173200
// Consume async new data.
@@ -197,7 +224,9 @@ func TestMemoryQueueWaitForResultSizeAndCapacity(t *testing.T) {
197224
func BenchmarkMemoryQueueWaitForResult(b *testing.B) {
198225
wg := sync.WaitGroup{}
199226
consumed := &atomic.Int64{}
200-
q := newMemoryQueue[int64](Settings[int64]{Sizer: sizerInt64{}, Capacity: 1000, WaitForResult: true})
227+
set := newSettings(request.SizerTypeItems, 100)
228+
set.WaitForResult = true
229+
q := newMemoryQueue[int64](set)
201230
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
202231

203232
// Consume async new data.

0 commit comments

Comments
 (0)