Commit c79b58c
bugfix: Fix data race in batchTimeSeries state
Signed-off-by: Arthur Silva Sens <[email protected]>
1 parent 2052bd1 commit c79b58c

File tree: 4 files changed (+62, -31 lines)

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusremotewriteexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Fix data race in batch series state."
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36524]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: ""
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

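For context on the entry above: before this commit the sizing hints in batchTimeSeriesState were plain int fields that every export read and overwrote without synchronization, so concurrent PushMetrics calls could race on them. The fix makes each hint an atomic.Int64 and shares a single state behind a pointer. Below is a minimal, self-contained sketch of that pattern using hypothetical names (sizingHints, nextBufferSize), not the exporter's actual code; it stays clean under go run -race, whereas the same concurrent updates on a plain int field would be reported as a race.

// Sketch of the fixed pattern: one state shared by pointer, hints updated atomically.
package main

import (
    "fmt"
    "math"
    "sync"
    "sync/atomic"
)

type sizingHints struct {
    nextBufferSize atomic.Int64 // safe for concurrent Load/Store
}

func newSizingHints() *sizingHints {
    h := &sizingHints{}
    h.nextBufferSize.Store(math.MaxInt64) // "no hint yet" until the first batch is sent
    return h
}

func main() {
    hints := newSizingHints() // created once, shared by every worker
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(batchLen int) {
            defer wg.Done()
            // Size the next buffer from the shared hint, capped by the input length...
            _ = min(hints.nextBufferSize.Load(), int64(batchLen))
            // ...then record twice what was just sent as the hint for the next caller.
            hints.nextBufferSize.Store(int64(2 * batchLen))
        }(10 * (i + 1))
    }
    wg.Wait()
    fmt.Println("last hint:", hints.nextBufferSize.Load())
}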
exporter/prometheusremotewriteexporter/exporter.go

Lines changed: 3 additions & 3 deletions
@@ -84,7 +84,8 @@ type prwExporter struct {
     wal *prweWAL
     exporterSettings prometheusremotewrite.Settings
     telemetry prwTelemetry
-    batchTimeSeriesState batchTimeSeriesState
+
+    batchTimeSeriesState *batchTimeSeriesState
 }

 func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {

@@ -191,7 +192,6 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
     case <-prwe.closeChan:
         return errors.New("shutdown has been called")
     default:
-
         tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
         if err != nil {
             prwe.telemetry.recordTranslationFailure(ctx)

@@ -229,7 +229,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
     }

     // Calls the helper function to convert and batch the TsMap to the desired format
-    requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
+    requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, prwe.batchTimeSeriesState)
     if err != nil {
         return err
     }

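The struct-field change above is more than a type tweak: once batchTimeSeriesState holds sync/atomic values it should not be copied (go vet's copylocks check flags copies of the atomic types), so the exporter keeps a single *batchTimeSeriesState and passes that same pointer to batchTimeSeries on every export. A small self-contained illustration of the pass-by-pointer rule, with hypothetical names (hints, useShared) rather than the exporter's types:

package main

import "sync/atomic"

type hints struct {
    next atomic.Int64
}

// OK: callers share one instance through the pointer.
func useShared(h *hints) int64 { return h.next.Load() }

// Not OK: a value parameter such as func useCopied(h hints) int64 would copy the
// atomic, and go vet (copylocks) reports it as passing a lock by value.

func main() {
    h := &hints{}
    h.next.Store(42)
    _ = useShared(h)
}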
exporter/prometheusremotewriteexporter/helper.go

Lines changed: 21 additions & 16 deletions
@@ -7,24 +7,29 @@ import (
     "errors"
     "math"
     "sort"
+    "sync/atomic"

     "github.com/prometheus/prometheus/prompb"
 )

 type batchTimeSeriesState struct {
     // Track batch sizes sent to avoid over allocating huge buffers.
     // This helps in the case where large batches are sent to avoid allocating too much unused memory
-    nextTimeSeriesBufferSize int
-    nextMetricMetadataBufferSize int
-    nextRequestBufferSize int
+    nextTimeSeriesBufferSize atomic.Int64
+    nextMetricMetadataBufferSize atomic.Int64
+    nextRequestBufferSize atomic.Int64
 }

-func newBatchTimeSericesState() batchTimeSeriesState {
-    return batchTimeSeriesState{
-        nextTimeSeriesBufferSize: math.MaxInt,
-        nextMetricMetadataBufferSize: math.MaxInt,
-        nextRequestBufferSize: 0,
+func newBatchTimeSericesState() *batchTimeSeriesState {
+    state := &batchTimeSeriesState{
+        nextTimeSeriesBufferSize: atomic.Int64{},
+        nextMetricMetadataBufferSize: atomic.Int64{},
+        nextRequestBufferSize: atomic.Int64{},
     }
+    state.nextTimeSeriesBufferSize.Store(math.MaxInt64)
+    state.nextMetricMetadataBufferSize.Store(math.MaxInt64)
+    state.nextRequestBufferSize.Store(0)
+    return state
 }

 // batchTimeSeries splits series into multiple batch write requests.

@@ -34,22 +39,22 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
     }

     // Allocate a buffer size of at least 10, or twice the last # of requests we sent
-    requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))
+    requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize.Load()))

     // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
-    tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
+    tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize.Load(), int64(len(tsMap))))
     sizeOfCurrentBatch := 0

     i := 0
     for _, v := range tsMap {
         sizeOfSeries := v.Size()

         if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
-            state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
+            state.nextTimeSeriesBufferSize.Store(int64(max(10, 2*len(tsArray))))
             wrapped := convertTimeseriesToRequest(tsArray)
             requests = append(requests, wrapped)

-            tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
+            tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize.Load(), int64(len(tsMap)-i)))
             sizeOfCurrentBatch = 0
         }

@@ -64,18 +69,18 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
     }

     // Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
-    mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
+    mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize.Load(), int64(len(m))))
     sizeOfCurrentBatch = 0
     i = 0
     for _, v := range m {
         sizeOfM := v.Size()

         if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
-            state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
+            state.nextMetricMetadataBufferSize.Store(int64(max(10, 2*len(mArray))))
             wrapped := convertMetadataToRequest(mArray)
             requests = append(requests, wrapped)

-            mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
+            mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize.Load(), int64(len(m)-i)))
             sizeOfCurrentBatch = 0
         }

@@ -89,7 +94,7 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
         requests = append(requests, wrapped)
     }

-    state.nextRequestBufferSize = 2 * len(requests)
+    state.nextRequestBufferSize.Store(int64(2 * len(requests)))
     return requests, nil
 }

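The batching logic itself is unchanged by this commit; the hints are now read with Load and written with Store, and the int64 hint is clamped to the actual input size when allocating. As a worked example of that capacity math (numbers are hypothetical): if the previous flush wrote 150 series, the stored hint is 2*150 = 300, so a later call with 40 series allocates a capacity of min(300, 40) = 40, while a call with 1,000 series allocates min(300, 1000) = 300. A tiny runnable sketch of the same calculation:

package main

import "fmt"

// capacityHint mirrors the min(hint, len(input)) sizing used in batchTimeSeries.
func capacityHint(hint int64, inputLen int) int64 {
    return min(hint, int64(inputLen)) // never allocate more capacity than the input needs
}

func main() {
    fmt.Println(capacityHint(300, 40))   // 40
    fmt.Println(capacityHint(300, 1000)) // 300
}

On the very first call the hint is still math.MaxInt64, so the capacity simply falls back to the input length.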
exporter/prometheusremotewriteexporter/helper_test.go

Lines changed: 12 additions & 12 deletions
@@ -59,7 +59,7 @@ func Test_batchTimeSeries(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
             state := newBatchTimeSericesState()
-            requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state)
+            requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, state)
             if tt.returnErr {
                 assert.Error(t, err)
                 return

@@ -68,13 +68,13 @@ func Test_batchTimeSeries(t *testing.T) {
             assert.NoError(t, err)
             assert.Len(t, requests, tt.numExpectedRequests)
             if tt.numExpectedRequests <= 1 {
-                assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize)
-                assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-                assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
+                assert.Equal(t, int64(math.MaxInt64), state.nextTimeSeriesBufferSize.Load())
+                assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load())
+                assert.Equal(t, int64(2*len(requests)), state.nextRequestBufferSize.Load())
             } else {
-                assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize)
-                assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-                assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
+                assert.Equal(t, int64(max(10, len(requests[len(requests)-2].Timeseries)*2)), state.nextTimeSeriesBufferSize.Load())
+                assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load())
+                assert.Equal(t, int64(2*len(requests)), state.nextRequestBufferSize.Load())
             }
         })
     }

@@ -97,13 +97,13 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) {
     tsMap1 := getTimeseriesMap(tsArray)

     state := newBatchTimeSericesState()
-    requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
+    requests, err := batchTimeSeries(tsMap1, 1000000, nil, state)

     assert.NoError(t, err)
     assert.Len(t, requests, 18)
-    assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize)
-    assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-    assert.Equal(t, 36, state.nextRequestBufferSize)
+    assert.Equal(t, int64(len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize.Load())
+    assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load())
+    assert.Equal(t, int64(36), state.nextRequestBufferSize.Load())
 }

 // Benchmark_batchTimeSeries checks batchTimeSeries

@@ -132,7 +132,7 @@ func Benchmark_batchTimeSeries(b *testing.B) {
     state := newBatchTimeSericesState()
     // Run batchTimeSeries 100 times with a 1mb max request size
     for i := 0; i < b.N; i++ {
-        requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
+        requests, err := batchTimeSeries(tsMap1, 1000000, nil, state)
         assert.NoError(b, err)
         assert.Len(b, requests, 18)
     }

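The updated tests above still call batchTimeSeries from a single goroutine. A concurrency test is not part of this commit, but a rough sketch of how the shared state could be exercised under go test -race might look like the following; the test name and fixture are hypothetical, it assumes the sync package is imported, and it uses the standard prompb.Label and prompb.Sample types alongside the helpers visible in this diff.

func Test_batchTimeSeriesStateConcurrentUse(t *testing.T) {
    // Hypothetical test, not part of this commit.
    state := newBatchTimeSericesState()
    tsMap := map[string]*prompb.TimeSeries{
        "series_1": {
            Labels:  []prompb.Label{{Name: "__name__", Value: "series_1"}},
            Samples: []prompb.Sample{{Value: 1, Timestamp: 1}},
        },
    }

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Every goroutine shares the same *batchTimeSeriesState; with the
            // atomic fields this stays clean under the race detector.
            _, err := batchTimeSeries(tsMap, 1000000, nil, state)
            assert.NoError(t, err)
        }()
    }
    wg.Wait()
}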