Skip to content

Commit c6cd1ae

Browse files
authored
[chore] Remove unused size backup logic for items sizer (#13043)
Updates #12890. This change is safe and simplifies the migration to the new proto metadata, because we only allow the request sizer for the persistent queue, so the size can always be determined from the read/write indexes. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent f1ec370 commit c6cd1ae

File tree

2 files changed

+11
-69
lines changed

2 files changed

+11
-69
lines changed

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ const (
2828
readIndexKey = "ri"
2929
writeIndexKey = "wi"
3030
currentlyDispatchedItemsKey = "di"
31-
queueSizeKey = "si"
3231

3332
// queueMetadataKey is the new single key for all queue metadata.
3433
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
@@ -88,9 +87,6 @@ type persistentQueue[T any] struct {
8887
logger *zap.Logger
8988
client storage.Client
9089

91-
// isRequestSized indicates whether the queue is sized by the number of requests.
92-
isRequestSized bool
93-
9490
// mu guards everything declared below.
9591
mu sync.Mutex
9692
hasMoreElements *sync.Cond
@@ -102,11 +98,9 @@ type persistentQueue[T any] struct {
10298

10399
// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
104100
func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
105-
_, isRequestSized := set.sizer.(request.RequestsSizer[T])
106101
pq := &persistentQueue[T]{
107-
set: set,
108-
logger: set.telemetry.Logger,
109-
isRequestSized: isRequestSized,
102+
set: set,
103+
logger: set.telemetry.Logger,
110104
}
111105
pq.hasMoreElements = sync.NewCond(&pq.mu)
112106
pq.hasMoreSpace = newCond(&pq.mu)
@@ -165,33 +159,8 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
165159
pq.metadata.WriteIndex = 0
166160
}
167161

168-
queueSize := pq.metadata.WriteIndex - pq.metadata.ReadIndex
169-
170-
// If the queue is sized by the number of requests, no need to read the queue size from storage.
171-
if queueSize > 0 && !pq.isRequestSized {
172-
if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil {
173-
queueSize = restoredQueueSize
174-
}
175-
}
176162
//nolint:gosec
177-
pq.metadata.QueueSize = int64(queueSize)
178-
}
179-
180-
// restoreQueueSizeFromStorage restores the queue size from storage.
181-
func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) {
182-
val, err := pq.client.Get(ctx, queueSizeKey)
183-
if err != nil {
184-
if errors.Is(err, errValueNotSet) {
185-
pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
186-
"The reported queue size will be inaccurate until the initial queue is drained. "+
187-
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
188-
} else {
189-
pq.logger.Error("Failed to read the queue size snapshot from storage. "+
190-
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
191-
}
192-
return 0, err
193-
}
194-
return bytesToItemIndex(val)
163+
pq.metadata.QueueSize = int64(pq.metadata.WriteIndex - pq.metadata.ReadIndex)
195164
}
196165

197166
func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
@@ -202,24 +171,10 @@ func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
202171

203172
pq.mu.Lock()
204173
defer pq.mu.Unlock()
205-
backupErr := pq.backupQueueSize(ctx)
206174
// Mark this queue as stopped, so consumer don't start any more work.
207175
pq.stopped = true
208176
pq.hasMoreElements.Broadcast()
209-
return errors.Join(backupErr, pq.unrefClient(ctx))
210-
}
211-
212-
// backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
213-
// in case if the collector is killed.
214-
func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
215-
// No need to write the queue size if the queue is sized by the number of requests.
216-
// That information is already stored as difference between read and write indexes.
217-
if pq.isRequestSized {
218-
return nil
219-
}
220-
221-
//nolint:gosec
222-
return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.metadata.QueueSize)))
177+
return pq.unrefClient(ctx)
223178
}
224179

225180
// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
@@ -271,14 +226,6 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
271226
pq.metadata.QueueSize += reqSize
272227
pq.hasMoreElements.Signal()
273228

274-
// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
275-
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
276-
if (pq.metadata.WriteIndex % 10) == 5 {
277-
if err := pq.backupQueueSize(ctx); err != nil {
278-
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
279-
}
280-
}
281-
282229
return nil
283230
}
284231

@@ -298,13 +245,14 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
298245
// Ensure the used size and the channel size are in sync.
299246
if pq.metadata.ReadIndex == pq.metadata.WriteIndex {
300247
pq.metadata.QueueSize = 0
301-
pq.hasMoreSpace.Signal()
302248
}
303249
if consumed {
304250
id := indexDonePool.Get().(*indexDone)
305251
id.reset(index, pq.set.sizer.Sizeof(req), pq)
306252
return reqCtx, req, id, true
307253
}
254+
// More space available, data was dropped.
255+
pq.hasMoreSpace.Signal()
308256
}
309257

310258
// TODO: Need to change the Queue interface to return an error to allow distinguish between shutdown and context canceled.
@@ -369,7 +317,6 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro
369317
if pq.metadata.QueueSize < 0 {
370318
pq.metadata.QueueSize = 0
371319
}
372-
pq.hasMoreSpace.Signal()
373320

374321
if experr.IsShutdownErr(consumeErr) {
375322
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
@@ -381,13 +328,8 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro
381328
pq.logger.Error("Error deleting item from queue", zap.Error(err))
382329
}
383330

384-
// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
385-
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
386-
if (pq.metadata.ReadIndex % 10) == 0 {
387-
if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
388-
pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
389-
}
390-
}
331+
// More space available after data are removed from the storage.
332+
pq.hasMoreSpace.Signal()
391333
}
392334

393335
// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage

exporter/exporterhelper/internal/queue/persistent_queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,7 @@ func createTestPersistentQueueWithItemsCapacity(tb testing.TB, ext storage.Exten
260260
return createTestPersistentQueueWithCapacityLimiter(tb, ext, &itemsSizer{}, capacity)
261261
}
262262

263-
func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer request.Sizer[uint64],
264-
capacity int64,
265-
) *persistentQueue[uint64] {
263+
func createTestPersistentQueueWithCapacityLimiter(tb testing.TB, ext storage.Extension, sizer request.Sizer[uint64], capacity int64) *persistentQueue[uint64] {
266264
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
267265
sizer: sizer,
268266
capacity: capacity,
@@ -1003,6 +1001,7 @@ func TestPersistentQueue_ItemDispatchingFinish_ErrorHandling(t *testing.T) {
10031001
}
10041002

10051003
func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) {
1004+
t.Skip("Restore when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done")
10061005
ext := storagetest.NewMockStorageExtension(nil)
10071006
pq := createTestPersistentQueueWithItemsCapacity(t, ext, 100)
10081007

@@ -1164,6 +1163,7 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
11641163
// This test covers the case when the persistent storage is recovered from a snapshot which has
11651164
// bigger value for the used size than the size of the actual items in the storage.
11661165
func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) {
1166+
t.Skip("Restore when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done")
11671167
ext := storagetest.NewMockStorageExtension(nil)
11681168
pq := createTestPersistentQueueWithItemsCapacity(t, ext, 1000)
11691169

0 commit comments

Comments (0)