Skip to content

Commit 7c4a971

Browse files
committed
remove archive restoration for now
1 parent da07df3 commit 7c4a971

File tree

2 files changed

+43
-80
lines changed

2 files changed

+43
-80
lines changed

otel.yml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
exporters:
2+
elasticsearch:
3+
endpoint: "https://1.2.3.4234"
4+
timeout: 10s
5+
sending_queue:
6+
enabled: true
7+
num_consumers: 2
8+
queue_size: 10
9+
batcher:
10+
enabled: true
11+
max_size_items: 1600
12+
# min_size_items: 0
13+
# receivers:
14+
15+
16+
17+
receivers:
18+
memcached:
19+
endpoint: "localhost:11211"
20+
collection_interval: 10s
21+
# transport: tcp
22+
otlp:
23+
protocols:
24+
http:
25+
endpoint: "localhost:4318"
26+
grpc:
27+
endpoint: "localhost:4317"
28+
29+
service:
30+
pipelines:
31+
metrics:
32+
receivers: [memcached]
33+
exporters: [elasticsearch]

pkg/stanza/fileconsumer/internal/archive/archive.go

Lines changed: 10 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -145,77 +145,20 @@ func (a *archive) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata
145145
return checkpoint.SaveKey(context.Background(), a.persister, rmds.Get(), archiveKey(index), ops...)
146146
}
147147

148-
func (a *archive) archiveEnabled() bool {
149-
return a.pollsToArchive > 0 && a.persister != nil
150-
}
151-
152148
func (a *archive) restoreArchiveIndex(ctx context.Context) {
153-
// remove extra "keys" once archive restoration is done
149+
var err error
150+
// remove extra "keys" in case `pollsToArchive` has changed between collector restarts
154151
defer a.removeExtraKeys(ctx)
155-
defer func() {
156-
// store current pollsToArchive
157-
if err := a.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(a.pollsToArchive)); err != nil {
158-
a.logger.Error("Error storing polls_to_archive", zap.Error(err))
159-
}
160-
}()
161-
162-
previousPollsToArchive, err := a.getPreviousPollsToArchive(ctx)
163-
if err != nil {
164-
// if there's an error reading previousPollsToArchive, default to current value
165-
previousPollsToArchive = a.pollsToArchive
166-
}
167152

168153
a.archiveIndex, err = a.getArchiveIndex(ctx)
169154
if err != nil {
170-
a.logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
171-
return
172-
}
173-
174-
if previousPollsToArchive < a.pollsToArchive {
175-
// if archive size has increased, we just increment the index until we enconter a nil value
176-
for a.archiveIndex < a.pollsToArchive && a.isSet(ctx, a.archiveIndex) {
177-
a.archiveIndex++
178-
}
179-
} else if previousPollsToArchive > a.pollsToArchive {
180-
// we will only attempt to rewrite archive if the archive size has shrunk
181-
a.logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
182-
a.rewriteArchive(ctx, previousPollsToArchive)
183-
}
184-
}
185-
186-
func (a *archive) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
187-
// helper to rewrite data from oldIndex to newIndex
188-
rewrite := func(newIdx, oldIdex int) error {
189-
oldVal, err := a.persister.Get(ctx, archiveKey(oldIdex))
190-
if err != nil {
191-
return err
192-
}
193-
return a.persister.Set(ctx, archiveKey(newIdx), oldVal)
155+
a.logger.Error("error while fetching archive index", zap.Error(err))
156+
a.archiveIndex = 0
194157
}
195-
// Calculate the least recent index, w.r.t. new archive size
196-
197-
leastRecentIndex := mod(a.archiveIndex-a.pollsToArchive, previousPollsToArchive)
198-
199-
// Refer archive.md for the detailed design
200-
if mod(a.archiveIndex-1, previousPollsToArchive) > a.pollsToArchive {
201-
for i := 0; i < a.pollsToArchive; i++ {
202-
if err := rewrite(i, leastRecentIndex); err != nil {
203-
a.logger.Error("error while swapping archive", zap.Error(err))
204-
}
205-
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
206-
}
158+
if a.archiveIndex >= a.pollsToArchive || a.archiveIndex < 0 {
159+
// archiveIndex is out of bounds. This most likely happened if `pollsToArchive` changed between collector restarts
160+
// we just set archiveIndex to 0 in this case i.e. to reboot the archive index
207161
a.archiveIndex = 0
208-
} else {
209-
if !a.isSet(ctx, a.archiveIndex) {
210-
// If the current index points at an unset key, no need to do anything
211-
return
212-
}
213-
for i := 0; i < a.pollsToArchive-a.archiveIndex; i++ {
214-
if err := rewrite(a.archiveIndex+i, leastRecentIndex); err != nil {
215-
a.logger.Warn("error while swapping archive", zap.Error(err))
216-
}
217-
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
218-
}
219162
}
220163
}
221164

@@ -227,20 +170,6 @@ func (a *archive) removeExtraKeys(ctx context.Context) {
227170
}
228171
}
229172

230-
func (a *archive) getPreviousPollsToArchive(ctx context.Context) (int, error) {
231-
byteIndex, err := a.persister.Get(ctx, archivePollsToArchiveKey)
232-
if err != nil {
233-
a.logger.Error("error while reading the archiveIndexKey", zap.Error(err))
234-
return 0, err
235-
}
236-
previousPollsToArchive, err := decodeIndex(byteIndex)
237-
if err != nil {
238-
a.logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
239-
return 0, err
240-
}
241-
return previousPollsToArchive, nil
242-
}
243-
244173
func (a *archive) getArchiveIndex(ctx context.Context) (int, error) {
245174
byteIndex, err := a.persister.Get(ctx, archiveIndexKey)
246175
if err != nil {
@@ -253,8 +182,9 @@ func (a *archive) getArchiveIndex(ctx context.Context) (int, error) {
253182
return archiveIndex, nil
254183
}
255184

256-
func (a *archive) isSet(ctx context.Context, index int) bool {
257-
val, err := a.persister.Get(ctx, archiveKey(index))
185+
// isSet returns true of index `i` is set in the archive ring buffer
186+
func (a *archive) isSet(ctx context.Context, i int) bool {
187+
val, err := a.persister.Get(ctx, archiveKey(i))
258188
return val != nil && err == nil
259189
}
260190

0 commit comments

Comments
 (0)