Skip to content

Commit 3fc1445

Browse files
move clean_up from client to extension level
Each extension instance can create multiple clients. Moving the logic to extension start is better aligned with the extension lifecycle and prevent potential collisions, all clients will share the same compaction folder already cleaned up.
1 parent 9a4f1e8 commit 3fc1445

File tree

4 files changed

+39
-63
lines changed

4 files changed

+39
-63
lines changed

extension/storage/filestorage/client.go

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"errors"
99
"fmt"
1010
"os"
11-
"path/filepath"
1211
"sync"
1312
"syscall"
1413
"time"
@@ -21,12 +20,13 @@ import (
2120
var defaultBucket = []byte(`default`)
2221

2322
const (
23+
TempDbPrefix = "tempdb"
24+
2425
elapsedKey = "elapsed"
2526
directoryKey = "directory"
2627
tempDirectoryKey = "tempDirectory"
2728

28-
tempDbPrefix = "tempdb"
29-
oneMiB = 1048576
29+
oneMiB = 1048576
3030
)
3131

3232
type fileStorageClient struct {
@@ -65,9 +65,6 @@ func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compa
6565
}
6666

6767
client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout}
68-
if compactionCfg.CleanupOnStart {
69-
client.cleanup(compactionCfg.Directory)
70-
}
7168
if compactionCfg.OnRebound {
7269
client.startCompactionLoop(context.Background())
7370
}
@@ -157,7 +154,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
157154
var compactedDb *bbolt.DB
158155

159156
// create temporary file in compactionDirectory
160-
file, err = os.CreateTemp(compactionDirectory, tempDbPrefix)
157+
file, err = os.CreateTemp(compactionDirectory, TempDbPrefix)
161158
if err != nil {
162159
return err
163160
}
@@ -347,30 +344,3 @@ func moveFileWithFallback(src string, dest string) error {
347344
err = os.Remove(src)
348345
return err
349346
}
350-
351-
// cleanup left compaction temporary files from previous killed process
352-
func (c *fileStorageClient) cleanup(compactionDirectory string) error {
353-
pattern := filepath.Join(compactionDirectory, fmt.Sprintf("%s*", tempDbPrefix))
354-
contents, err := filepath.Glob(pattern)
355-
if err != nil {
356-
c.logger.Info("cleanup error listing temporary files",
357-
zap.Error(err))
358-
return err
359-
}
360-
361-
var errs []error
362-
for _, item := range contents {
363-
err = os.Remove(item)
364-
if err == nil {
365-
c.logger.Debug("cleanup",
366-
zap.String("deletedFile", item))
367-
} else {
368-
errs = append(errs, err)
369-
}
370-
}
371-
if errs != nil {
372-
c.logger.Info("cleanup errors",
373-
zap.Error(errors.Join(errs...)))
374-
}
375-
return nil
376-
}

extension/storage/filestorage/client_test.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -404,34 +404,6 @@ func TestClientConcurrentCompaction(t *testing.T) {
404404
}
405405
}
406406

407-
func TestClientCleanupOnStart(t *testing.T) {
408-
tempDir := t.TempDir()
409-
dbFile := filepath.Join(tempDir, "my_db")
410-
temp, _ := os.CreateTemp(tempDir, tempDbPrefix)
411-
// simulate ongoing compaction in another instance
412-
tempLocked, _ := os.CreateTemp(tempDir, tempDbPrefix)
413-
temp.Close()
414-
415-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{
416-
Directory: tempDir,
417-
CleanupOnStart: true,
418-
}, false)
419-
require.NoError(t, err)
420-
421-
t.Cleanup(func() {
422-
require.NoError(t, client.Close(context.TODO()))
423-
tempLocked.Close()
424-
})
425-
426-
// check if cleanup removed the unlocked file and left db and locked file
427-
files, err := os.ReadDir(tempDir)
428-
require.NoError(t, err)
429-
require.Equal(t, 2, len(files))
430-
require.Equal(t, "my_db", files[0].Name())
431-
_, f := filepath.Split(tempLocked.Name())
432-
require.Equal(t, f, files[1].Name())
433-
}
434-
435407
func BenchmarkClientGet(b *testing.B) {
436408
tempDir := b.TempDir()
437409
dbFile := filepath.Join(tempDir, "my_db")

extension/storage/filestorage/extension.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ package filestorage // import "github.com/open-telemetry/opentelemetry-collector
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
10+
"os"
911
"path/filepath"
1012
"strings"
1113

@@ -42,6 +44,9 @@ func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extensio
4244

4345
// Start does nothing
4446
func (lfs *localFileStorage) Start(context.Context, component.Host) error {
47+
if lfs.cfg.Compaction.CleanupOnStart {
48+
lfs.cleanup(lfs.cfg.Compaction.Directory)
49+
}
4550
return nil
4651
}
4752

@@ -135,3 +140,30 @@ func isSafe(character rune) bool {
135140
}
136141
return false
137142
}
143+
144+
// cleanup left compaction temporary files from previous killed process
145+
func (c *localFileStorage) cleanup(compactionDirectory string) error {
146+
pattern := filepath.Join(compactionDirectory, fmt.Sprintf("%s*", TempDbPrefix))
147+
contents, err := filepath.Glob(pattern)
148+
if err != nil {
149+
c.logger.Info("cleanup error listing temporary files",
150+
zap.Error(err))
151+
return err
152+
}
153+
154+
var errs []error
155+
for _, item := range contents {
156+
err = os.Remove(item)
157+
if err == nil {
158+
c.logger.Debug("cleanup",
159+
zap.String("deletedFile", item))
160+
} else {
161+
errs = append(errs, err)
162+
}
163+
}
164+
if errs != nil {
165+
c.logger.Info("cleanup errors",
166+
zap.Error(errors.Join(errs...)))
167+
}
168+
return nil
169+
}

extension/storage/filestorage/extension_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"go.opentelemetry.io/collector/component"
17+
"go.opentelemetry.io/collector/component/componenttest"
1718
"go.opentelemetry.io/collector/extension/experimental/storage"
1819
"go.opentelemetry.io/collector/extension/extensiontest"
1920
)
@@ -454,7 +455,7 @@ func TestCleanupOnStart(t *testing.T) {
454455

455456
tempDir := t.TempDir()
456457
// simulate left temporary compaction file from killed process
457-
temp, _ := os.CreateTemp(tempDir, tempDbPrefix)
458+
temp, _ := os.CreateTemp(tempDir, TempDbPrefix)
458459
temp.Close()
459460

460461
f := NewFactory()
@@ -467,6 +468,7 @@ func TestCleanupOnStart(t *testing.T) {
467468

468469
se, ok := extension.(storage.Extension)
469470
require.True(t, ok)
471+
require.NoError(t, se.Start(ctx, componenttest.NewNopHost()))
470472

471473
client, err := se.GetClient(
472474
ctx,

0 commit comments

Comments
 (0)