Skip to content

[processor/memory_limiter] Update config validation #9059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/memory-limiter-update-config-validation-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor/memory_limiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update config validation errors

# One or more tracking issues or pull requests related to the change
issues: [9059]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Fix names of the config fields that are validated in the error messages
- Move the validation from start to the initialization phrase

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 15 additions & 0 deletions processor/memorylimiterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,20 @@ var _ component.Config = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 {
return errPercentageLimitOutOfRange
}
if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB {
return errMemSpikeLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage {
return errMemSpikePercentageLimitOutOfRange
}
return nil
}
68 changes: 68 additions & 0 deletions processor/memorylimiterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,71 @@ func TestUnmarshalConfig(t *testing.T) {
MemorySpikeLimitMiB: 500,
}, cfg)
}

func TestConfigValidate(t *testing.T) {
tests := []struct {
name string
cfg *Config
err error
}{
{
name: "valid",
cfg: func() *Config {
cfg := createDefaultConfig().(*Config)
cfg.MemoryLimitMiB = 5722
cfg.MemorySpikeLimitMiB = 1907
cfg.CheckInterval = 100 * time.Millisecond
return cfg
}(),
err: nil,
},
{
name: "zero check interval",
cfg: &Config{
CheckInterval: 0,
},
err: errCheckIntervalOutOfRange,
},
{
name: "unset memory limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 0,
MemoryLimitPercentage: 0,
},
err: errLimitOutOfRange,
},
{
name: "invalid memory spike limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 10,
MemorySpikeLimitMiB: 10,
},
err: errMemSpikeLimitOutOfRange,
},
{
name: "invalid memory percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 101,
},
err: errPercentageLimitOutOfRange,
},
{
name: "invalid memory spike percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 60,
},
err: errMemSpikePercentageLimitOutOfRange,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
assert.Equal(t, tt.err, err)
})
}
}
19 changes: 3 additions & 16 deletions processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,25 @@ func TestCreateProcessor(t *testing.T) {

cfg := factory.CreateDefaultConfig()

// This processor can't be created with the default config.
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, tp)
assert.Error(t, err, "created processor with invalid settings")

mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, mp)
assert.Error(t, err, "created processor with invalid settings")

lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, lp)
assert.Error(t, err, "created processor with invalid settings")

// Create processor with a valid config.
pCfg := cfg.(*Config)
pCfg.MemoryLimitMiB = 5722
pCfg.MemorySpikeLimitMiB = 1907
pCfg.CheckInterval = 100 * time.Millisecond

tp, err = factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, tp)
// test if we can shutdown a monitoring routine that has not started
assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted)
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))

mp, err = factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, mp)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

lp, err = factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, lp)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
38 changes: 12 additions & 26 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ var (

// Construction errors

errCheckIntervalOutOfRange = errors.New(
"checkInterval must be greater than zero")
errCheckIntervalOutOfRange = errors.New("check_interval must be greater than zero")

errLimitOutOfRange = errors.New(
"memAllocLimit or memoryLimitPercentage must be greater than zero")
errLimitOutOfRange = errors.New("limit_mib or limit_percentage must be greater than zero")

errMemSpikeLimitOutOfRange = errors.New(
"memSpikeLimit must be smaller than memAllocLimit")
errMemSpikeLimitOutOfRange = errors.New("spike_limit_mib must be smaller than limit_mib")

errMemSpikePercentageLimitOutOfRange = errors.New("spike_limit_percentage must be smaller than limit_percentage")

errPercentageLimitOutOfRange = errors.New(
"memoryLimitPercentage and memorySpikePercentage must be greater than zero and less than or equal to hundred",
)
"limit_percentage and spike_limit_percentage must be greater than zero and less than or equal to hundred")
Comment on lines -38 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move them to config.go?


errShutdownNotStarted = errors.New("no existing monitoring routine is running")
)
Expand Down Expand Up @@ -86,13 +84,6 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second

// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter, error) {
if cfg.CheckInterval <= 0 {
return nil, errCheckIntervalOutOfRange
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return nil, errLimitOutOfRange
}

logger := set.Logger
usageChecker, err := getMemUsageChecker(cfg, logger)
if err != nil {
Expand Down Expand Up @@ -129,7 +120,7 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
if cfg.MemoryLimitMiB != 0 {
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit)
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil
}
totalMemory, err := getMemoryFn()
if err != nil {
Expand All @@ -139,7 +130,8 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
zap.Uint64("total_memory_mib", totalMemory/mibBytes),
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage),
uint64(cfg.MemorySpikePercentage)), nil
}

func (ml *memoryLimiter) start(_ context.Context, host component.Host) error {
Expand Down Expand Up @@ -319,23 +311,17 @@ func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
return ms.Alloc >= d.memAllocLimit
}

func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) {
if memSpikeLimit >= memAllocLimit {
return nil, errMemSpikeLimitOutOfRange
}
func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker {
if memSpikeLimit == 0 {
// If spike limit is unspecified use 20% of mem limit.
memSpikeLimit = memAllocLimit / 5
}
return &memUsageChecker{
memAllocLimit: memAllocLimit,
memSpikeLimit: memSpikeLimit,
}, nil
}
}

func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) (*memUsageChecker, error) {
if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 {
return nil, errPercentageLimitOutOfRange
}
func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker {
return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100)
}
91 changes: 3 additions & 88 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/iruntime"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -29,71 +28,6 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

func TestNew(t *testing.T) {
type args struct {
nextConsumer consumer.Traces
checkInterval time.Duration
memoryLimitMiB uint32
memorySpikeLimitMiB uint32
}
sink := new(consumertest.TracesSink)
tests := []struct {
name string
args args
wantErr error
}{
{
name: "zero_checkInterval",
args: args{
nextConsumer: sink,
},
wantErr: errCheckIntervalOutOfRange,
},
{
name: "zero_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
},
wantErr: errLimitOutOfRange,
},
{
name: "memSpikeLimit_gt_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memoryLimitMiB: 1,
memorySpikeLimitMiB: 2,
},
wantErr: errMemSpikeLimitOutOfRange,
},
{
name: "success",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memoryLimitMiB: 1024,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.CheckInterval = tt.args.checkInterval
cfg.MemoryLimitMiB = tt.args.memoryLimitMiB
cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB
got, err := newMemoryLimiter(processortest.NewNopCreateSettings(), cfg)
if tt.wantErr != nil {
assert.ErrorIs(t, err, tt.wantErr)
return
}
assert.NoError(t, err)
assert.NoError(t, got.start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, got.shutdown(context.Background()))
})
}
}

// TestMetricsMemoryPressureResponse manipulates results from querying memory and
// check expected side effects.
func TestMetricsMemoryPressureResponse(t *testing.T) {
Expand Down Expand Up @@ -309,11 +243,6 @@ func TestGetDecision(t *testing.T) {
memSpikeLimit: 20 * mibBytes,
}, d)
})
t.Run("fixed_limit_error", func(t *testing.T) {
d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
})

t.Cleanup(func() {
getMemoryFn = iruntime.TotalMemory
Expand All @@ -329,26 +258,12 @@ func TestGetDecision(t *testing.T) {
memSpikeLimit: 10 * mibBytes,
}, d)
})
t.Run("percentage_limit_error", func(t *testing.T) {
d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
})
}

func TestRefuseDecision(t *testing.T) {
decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30)
require.NoError(t, err)
decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50)
require.NoError(t, err)
decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20)
require.NoError(t, err)
decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60)
require.Error(t, err)
assert.Nil(t, decison1000Limit40Spike60)
decison1000Limit30Spike30 := newPercentageMemUsageChecker(1000, 60, 30)
decison1000Limit60Spike50 := newPercentageMemUsageChecker(1000, 60, 50)
decison1000Limit40Spike20 := newPercentageMemUsageChecker(1000, 40, 20)

tests := []struct {
name string
Expand Down