Skip to content

Commit 17f4a02

Browse files
committed
Implement sync disabled queue
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 27714cb commit 17f4a02

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"sync/atomic"
10+
11+
"go.opentelemetry.io/collector/component"
12+
)
13+
14+
var chanPool = sync.Pool{
15+
New: func() interface{} {
16+
return make(chan struct{}, 1)
17+
},
18+
}
19+
20+
func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
21+
return &disabledQueue[T]{
22+
consumeFunc: consumeFunc,
23+
size: &atomic.Int64{},
24+
}
25+
}
26+
27+
type disabledQueue[T any] struct {
28+
component.StartFunc
29+
component.ShutdownFunc
30+
consumeFunc ConsumeFunc[T]
31+
size *atomic.Int64
32+
}
33+
34+
func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
35+
ch := chanPool.Get().(chan struct{})
36+
defer chanPool.Put(ch)
37+
done := func(err error) {
38+
ch <- struct{}{}
39+
}
40+
d.size.Add(1)
41+
d.consumeFunc(ctx, req, done)
42+
<-ch
43+
d.size.Add(-1)
44+
return nil
45+
}
46+
47+
// Size returns the current number of blocked requests waiting to be processed.
48+
func (d *disabledQueue[T]) Size() int64 {
49+
return d.size.Load()
50+
}
51+
52+
// Capacity returns the capacity of this queue, which is 0 that means no bounds.
53+
func (d *disabledQueue[T]) Capacity() int64 {
54+
return 0
55+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue
5+
6+
import (
7+
"context"
8+
"sync"
9+
"sync/atomic"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"go.opentelemetry.io/collector/component/componenttest"
17+
)
18+
19+
const flushNum = 5
20+
21+
type buffer struct {
22+
ch chan DoneCallback
23+
nr *atomic.Int64
24+
wg sync.WaitGroup
25+
dones []DoneCallback
26+
}
27+
28+
func newBuffer() *buffer {
29+
buf := &buffer{
30+
ch: make(chan DoneCallback, 10),
31+
nr: &atomic.Int64{},
32+
dones: make([]DoneCallback, 0, flushNum),
33+
}
34+
return buf
35+
}
36+
37+
func (buf *buffer) consume(_ context.Context, _ int64, done DoneCallback) {
38+
buf.ch <- done
39+
}
40+
41+
func (buf *buffer) start() {
42+
buf.wg.Add(1)
43+
go func() {
44+
defer buf.wg.Done()
45+
buf.dones = make([]DoneCallback, 0, flushNum)
46+
for {
47+
select {
48+
case done, ok := <-buf.ch:
49+
if !ok {
50+
return
51+
}
52+
buf.dones = append(buf.dones, done)
53+
if len(buf.dones) == flushNum {
54+
buf.flush()
55+
}
56+
case <-time.After(10 * time.Millisecond):
57+
buf.flush()
58+
}
59+
}
60+
}()
61+
}
62+
63+
func (buf *buffer) shutdown() {
64+
close(buf.ch)
65+
buf.wg.Wait()
66+
}
67+
68+
func (buf *buffer) flush() {
69+
if len(buf.dones) == 0 {
70+
return
71+
}
72+
buf.nr.Add(int64(len(buf.dones)))
73+
for _, done := range buf.dones {
74+
done(nil)
75+
}
76+
buf.dones = buf.dones[:0]
77+
}
78+
79+
func (buf *buffer) consumed() int64 {
80+
return buf.nr.Load()
81+
}
82+
83+
func TestDisabledQueueMultiThread(t *testing.T) {
84+
buf := newBuffer()
85+
buf.start()
86+
q := newDisabledQueue[int64](buf.consume)
87+
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
88+
wg := sync.WaitGroup{}
89+
for i := 0; i < 10; i++ {
90+
wg.Add(1)
91+
go func() {
92+
defer wg.Done()
93+
for j := 0; j < 10_000; j++ {
94+
require.NoError(t, q.Offer(context.Background(), int64(j)))
95+
}
96+
}()
97+
}
98+
wg.Wait()
99+
require.NoError(t, q.Shutdown(context.Background()))
100+
buf.shutdown()
101+
assert.Equal(t, int64(10*10_000), buf.consumed())
102+
}
103+
104+
func BenchmarkDisabledQueueOffer(b *testing.B) {
105+
consumed := &atomic.Int64{}
106+
q := newDisabledQueue[int64](func(_ context.Context, _ int64, done DoneCallback) {
107+
consumed.Add(1)
108+
done(nil)
109+
})
110+
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
111+
b.ResetTimer()
112+
b.ReportAllocs()
113+
for i := 0; i < b.N; i++ {
114+
require.NoError(b, q.Offer(context.Background(), int64(i)))
115+
}
116+
require.NoError(b, q.Shutdown(context.Background()))
117+
assert.Equal(b, int64(b.N), consumed.Load())
118+
}

0 commit comments

Comments
 (0)