Skip to content

Commit b9b6e90

Browse files
authored
[chore][pkg/fileconsumer] - Move archive into separate package (#39353)
#### Description Move the archive into a separate package to make the code more testable and less complicated overall. This also renames `instantiateTracker` to `instantiateTrackerAndArchive` and changes it slightly so that it initializes the archive and passes it to the tracker for writing older offsets. This PR also drops some edge cases to reduce complexity; I'll address them in a follow-up. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Relates to #38056
1 parent 521d9e5 commit b9b6e90

File tree

6 files changed

+307
-436
lines changed

6 files changed

+307
-436
lines changed

pkg/stanza/fileconsumer/file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (m *Manager) poll(ctx context.Context) {
150150
}
151151
}
152152
// rotate at end of every poll()
153-
m.tracker.EndPoll()
153+
m.tracker.EndPoll(ctx)
154154
}
155155

156156
func (m *Manager) consume(ctx context.Context, paths []string) {
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package archive // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
12+
"go.opentelemetry.io/collector/extension/xextension/storage"
13+
"go.uber.org/zap"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
20+
)
21+
22+
// Storage keys under which the archive persists its own bookkeeping state
// (as opposed to the per-poll filesets, which live under archiveKey(i)).
const (
	// archiveIndexKey stores the ring-buffer index of the most recent write.
	archiveIndexKey = "knownFilesArchiveIndex"
	// archivePollsToArchiveKey is not referenced in this file.
	// NOTE(review): the value contains a typo ("knonw"); renaming it would
	// orphan any previously persisted entry — confirm before fixing.
	archivePollsToArchiveKey = "knonwFilesPollsToArchive"
)
26+
27+
// Archive persists reader metadata from older polls so that offsets of files
// which reappear later can be recovered instead of being re-read from scratch.
type Archive interface {
	// FindFiles returns one element per input fingerprint: the archived
	// metadata matching it, or nil when no match exists.
	FindFiles(context.Context, []*fingerprint.Fingerprint) []*reader.Metadata
	// WriteFiles stores the given fileset as the most recent archive entry.
	WriteFiles(context.Context, *fileset.Fileset[*reader.Metadata])
}
31+
32+
func New(ctx context.Context, logger *zap.Logger, pollsToArchive int, persister operator.Persister) Archive {
33+
if pollsToArchive <= 0 || persister == nil {
34+
logger.Debug("archiving is disabled. enable pollsToArchive and storage settings to save offsets on disk.")
35+
return &nopArchive{}
36+
}
37+
38+
// restore last known archive index
39+
archiveIndex, err := getArchiveIndex(ctx, persister)
40+
switch {
41+
case err != nil:
42+
logger.Error("failed to read archive index. Resetting it to 0", zap.Error(err))
43+
archiveIndex = 0
44+
case archiveIndex >= pollsToArchive:
45+
logger.Warn("archiveIndex is out of bounds, likely due to change in pollsToArchive. Resetting it to 0") // Try to craft log to explain in user facing terms?
46+
archiveIndex = 0
47+
default:
48+
// archiveIndex should point to index for the next write, hence increment it from last known value.
49+
archiveIndex = (archiveIndex + 1) % pollsToArchive
50+
}
51+
return &archive{
52+
pollsToArchive: pollsToArchive,
53+
persister: persister,
54+
archiveIndex: archiveIndex,
55+
logger: logger,
56+
}
57+
}
58+
59+
// archive is the storage-backed Archive implementation. It keeps the last
// pollsToArchive filesets in a ring buffer of persister keys (see archiveKey).
type archive struct {
	// persister is the storage client used to read and write archived filesets.
	persister operator.Persister

	// pollsToArchive is the fixed size of the on-disk ring buffer.
	pollsToArchive int

	// archiveIndex points to the index for the next write.
	archiveIndex int
	logger       *zap.Logger
}
68+
69+
// FindFiles goes through the archive one fileset at a time, newest first, and
// matches every fingerprint against each loaded set. It returns a slice with
// one element per input fingerprint: the matched metadata, or nil if no match
// was found. To minimize disk access, each archived fileset is read at most
// once per call and written back at most once (only when a match was taken
// from it). The scan exits early once all fingerprints are matched.
func (a *archive) FindFiles(ctx context.Context, fps []*fingerprint.Fingerprint) []*reader.Metadata {
	// Track number of matched fingerprints so we can exit if all matched.
	var numMatched int

	// Determine the index for reading the archive, starting from the most
	// recent entry and moving towards the oldest.
	nextIndex := a.archiveIndex
	matchedMetadata := make([]*reader.Metadata, len(fps))

	// Continue until either all fingerprints are matched or every archived
	// set has been processed.
	for i := 0; i < a.pollsToArchive; i++ {
		// Step one slot backwards in the ring buffer, wrapping below zero.
		nextIndex = (nextIndex - 1 + a.pollsToArchive) % a.pollsToArchive

		data, err := a.readArchive(ctx, nextIndex) // we load one fileset at most once per poll
		if err != nil {
			a.logger.Error("failed to read archive", zap.Error(err))
			continue
		}
		archiveModified := false
		for j, fp := range fps {
			if matchedMetadata[j] != nil {
				// we've already found a match for this index, continue
				continue
			}
			if md := data.Match(fp, fileset.StartsWith); md != nil {
				// Update the matched metadata for this index.
				// NOTE(review): the write-back below suggests Match removes
				// the entry from the set — confirm against fileset.Match.
				matchedMetadata[j] = md
				archiveModified = true
				numMatched++
			}
		}
		if !archiveModified {
			continue
		}
		// we save one fileset at most once per poll
		if err := a.writeArchive(ctx, nextIndex, data); err != nil {
			a.logger.Error("failed to write archive", zap.Error(err))
		}
		// Check if all metadata have been found.
		if numMatched == len(fps) {
			return matchedMetadata
		}
	}
	return matchedMetadata
}
118+
119+
// WriteFiles stores the fileset at the current ring-buffer slot (overwriting
// whatever was there), persists the updated slot index in the same batch, and
// advances the index for the next poll.
func (a *archive) WriteFiles(ctx context.Context, metadata *fileset.Fileset[*reader.Metadata]) {
	// We make use of a ring buffer, where each set of files is stored under a specific index.
	// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
	// Separate storage keys knownFilesArchive0, knownFilesArchive1, ..., knownFilesArchiveN, roll over back to knownFilesArchive0

	// Archiving:  ┌─────────────────────on-disk archive─────────────────────────┐
	//             |    ┌───┐   ┌───┐         ┌──────────────────┐               |
	// index     ▶ |    │ 0 │ ▶ │ 1 │ ▶ ... ▶ │ polls_to_archive │               |
	//             |    └───┘   └───┘         └──────────────────┘               |
	//             |      ▲                            ▼                         |
	//             |      └─ roll over, overwriting older offsets, if any ◀──────┘
	//             └──────│────────────────────────────────────────────────────────
	//                    │
	//                  start
	//                  index

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(a.archiveIndex); err != nil {
		// NOTE(review): on encode failure the index operation below is still
		// issued with an empty payload — confirm whether skipping indexOp
		// would be safer here.
		a.logger.Error("failed to encode archive index", zap.Error(err))
	}
	indexOp := storage.SetOperation(archiveIndexKey, buf.Bytes()) // batch the updated index with metadata
	if err := a.writeArchive(ctx, a.archiveIndex, metadata, indexOp); err != nil {
		a.logger.Error("failed to write archive", zap.Error(err))
	}
	// Advance to the slot for the next poll, wrapping at pollsToArchive.
	a.archiveIndex = (a.archiveIndex + 1) % a.pollsToArchive
}
147+
148+
func (a *archive) readArchive(ctx context.Context, index int) (*fileset.Fileset[*reader.Metadata], error) {
149+
// readArchive loads data from the archive for a given index and returns a fileset.Filset.
150+
metadata, err := checkpoint.LoadKey(ctx, a.persister, archiveKey(index))
151+
if err != nil {
152+
return nil, err
153+
}
154+
f := fileset.New[*reader.Metadata](len(metadata))
155+
f.Add(metadata...)
156+
return f, nil
157+
}
158+
159+
// writeArchive saves the fileset under the archive key for the given index.
// Any extra storage operations (e.g. the index update) are batched into the
// same write. Returns an error if the save fails.
func (a *archive) writeArchive(ctx context.Context, index int, rmds *fileset.Fileset[*reader.Metadata], ops ...*storage.Operation) error {
	return checkpoint.SaveKey(ctx, a.persister, rmds.Get(), archiveKey(index), ops...)
}
163+
164+
func getArchiveIndex(ctx context.Context, persister operator.Persister) (int, error) {
165+
byteIndex, err := persister.Get(ctx, archiveIndexKey)
166+
if err != nil {
167+
return 0, err
168+
}
169+
var archiveIndex int
170+
if err := json.NewDecoder(bytes.NewReader(byteIndex)).Decode(&archiveIndex); err != nil {
171+
return 0, err
172+
}
173+
return archiveIndex, nil
174+
}
175+
176+
// archiveKey returns the storage key for the archive slot at index i,
// e.g. "knownFiles0".
func archiveKey(i int) string {
	return fmt.Sprint("knownFiles", i)
}
179+
180+
type nopArchive struct{}
181+
182+
func (*nopArchive) FindFiles(_ context.Context, fps []*fingerprint.Fingerprint) []*reader.Metadata {
183+
// we return an array of "nil"s, indicating 0 matches are found in archive
184+
return make([]*reader.Metadata, len(fps))
185+
}
186+
187+
func (*nopArchive) WriteFiles(context.Context, *fileset.Fileset[*reader.Metadata]) {
188+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package archive_test // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive_test"
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
"go.uber.org/zap"
12+
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
18+
)
19+
20+
func TestArchiveNoRollover(t *testing.T) {
21+
persister := testutil.NewUnscopedMockPersister()
22+
a := archive.New(context.Background(), zap.L(), 3, persister)
23+
24+
fp1 := fingerprint.New([]byte("fp1"))
25+
fp2 := fingerprint.New([]byte("fp2"))
26+
fp3 := fingerprint.New([]byte("fp3"))
27+
28+
// Simulate three consecutive poll cycles
29+
a.WriteFiles(context.Background(), getFileset(fp1))
30+
a.WriteFiles(context.Background(), getFileset(fp2))
31+
a.WriteFiles(context.Background(), getFileset(fp3))
32+
33+
// All three fingerprints should still be present in the archive
34+
fp3Modified := fingerprint.New([]byte("fp3...."))
35+
foundMetadata := a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp3Modified})
36+
require.True(t, fp3.Equal(foundMetadata[0].GetFingerprint()), "Expected fp3 to match")
37+
38+
fp2Modified := fingerprint.New([]byte("fp2...."))
39+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp2Modified})
40+
require.True(t, fp2.Equal(foundMetadata[0].GetFingerprint()), "Expected fp2 to match")
41+
42+
fp1Modified := fingerprint.New([]byte("fp1...."))
43+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp1Modified})
44+
require.True(t, fp1.Equal(foundMetadata[0].GetFingerprint()), "Expected fp1 to match")
45+
}
46+
47+
func TestArchiveRollOver(t *testing.T) {
48+
persister := testutil.NewUnscopedMockPersister()
49+
a := archive.New(context.Background(), zap.L(), 3, persister)
50+
51+
fp1 := fingerprint.New([]byte("fp1"))
52+
fp2 := fingerprint.New([]byte("fp2"))
53+
fp3 := fingerprint.New([]byte("fp3"))
54+
fp4 := fingerprint.New([]byte("fp4"))
55+
56+
// Simulate four consecutive poll cycles
57+
a.WriteFiles(context.Background(), getFileset(fp1))
58+
a.WriteFiles(context.Background(), getFileset(fp2))
59+
a.WriteFiles(context.Background(), getFileset(fp3))
60+
a.WriteFiles(context.Background(), getFileset(fp4)) // This should evice fp1
61+
62+
// The archive should now contain fp2, fp3, and fp4
63+
fp4Modified := fingerprint.New([]byte("fp4...."))
64+
foundMetadata := a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp4Modified})
65+
require.Len(t, foundMetadata, 1)
66+
require.True(t, fp4.Equal(foundMetadata[0].GetFingerprint()), "Expected fp4 to match")
67+
68+
fp3Modified := fingerprint.New([]byte("fp3...."))
69+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp3Modified})
70+
require.Len(t, foundMetadata, 1)
71+
require.True(t, fp3.Equal(foundMetadata[0].GetFingerprint()), "Expected fp3 to match")
72+
73+
fp2Modified := fingerprint.New([]byte("fp2...."))
74+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp2Modified})
75+
require.Len(t, foundMetadata, 1)
76+
require.True(t, fp2.Equal(foundMetadata[0].GetFingerprint()), "Expected fp2 to match")
77+
78+
// fp1 should have been evicted and thus not retrievable
79+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp1})
80+
require.Nil(t, foundMetadata[0], "Expected fp1 to be evicted from archive")
81+
}
82+
83+
func TestNopArchive(t *testing.T) {
84+
a := archive.New(context.Background(), zap.L(), 3, nil)
85+
86+
fp1 := fingerprint.New([]byte("fp1"))
87+
fp2 := fingerprint.New([]byte("fp2"))
88+
fp3 := fingerprint.New([]byte("fp3"))
89+
90+
// Simulate three consecutive poll cycles
91+
a.WriteFiles(context.Background(), getFileset(fp1))
92+
a.WriteFiles(context.Background(), getFileset(fp2))
93+
a.WriteFiles(context.Background(), getFileset(fp3))
94+
95+
// All three fingerprints should not be present in the archive
96+
foundMetadata := a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp3})
97+
require.Nil(t, foundMetadata[0], "fingerprint should not be in nopArchive")
98+
99+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp2})
100+
require.Nil(t, foundMetadata[0], "fingerprint should not be in nopArchive")
101+
102+
foundMetadata = a.FindFiles(context.Background(), []*fingerprint.Fingerprint{fp1})
103+
require.Nil(t, foundMetadata[0], "fingerprint should not be in nopArchive")
104+
}
105+
106+
func getFileset(fp *fingerprint.Fingerprint) *fileset.Fileset[*reader.Metadata] {
107+
set := fileset.New[*reader.Metadata](0)
108+
set.Add(&reader.Metadata{Fingerprint: fp})
109+
return set
110+
}

0 commit comments

Comments
 (0)