Skip to content

[processor/batchprocessor] Improve batch processor edge case performance #13272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions processor/batchprocessor/splitlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

// splitLogs removes logrecords from the input data and returns a new data of the specified size.
func splitLogs(size int, src plog.Logs) plog.Logs {
if src.LogRecordCount() <= size {
if !hasAtLeastNRecords(src, size+1) {
return src
}
totalCopiedLogRecords := 0
Expand All @@ -22,8 +22,8 @@
}

// If it fully fits
srcRlLRC := resourceLRC(srcRl)
if (totalCopiedLogRecords + srcRlLRC) <= size {
wontFit, srcRlLRC := resourceHasAtLeastNRecords(srcRl, size-totalCopiedLogRecords+1)
if !wontFit {
totalCopiedLogRecords += srcRlLRC
srcRl.MoveTo(dest.ResourceLogs().AppendEmpty())
return true
Expand Down Expand Up @@ -64,10 +64,33 @@
return dest
}

// resourceLRC calculates the total number of log records in the plog.ResourceLogs.
func resourceLRC(rs plog.ResourceLogs) (count int) {
// resourceHasAtLeastNRecords reports whether rs holds at least n log records,
// adding up LogRecords().Len() scope by scope and stopping at the first scope
// where the running total reaches n.
//
// It returns (true, 0) when the threshold is reached (the count is not
// reported in that case) and (false, count) with the full total otherwise.
func resourceHasAtLeastNRecords(rs plog.ResourceLogs, n int) (bool, int) {
count := 0
for k := 0; k < rs.ScopeLogs().Len(); k++ {
count += rs.ScopeLogs().At(k).LogRecords().Len()
// Early exit: scopes after this one are never inspected.
if count >= n {
return true, 0
}
}
// This can only be true when the loop body never ran (no scopes) and n <= 0;
// otherwise the loop already made the same comparison on the final count.
if count >= n {
return true, 0
}

Check warning on line 77 in processor/batchprocessor/splitlogs.go

View check run for this annotation

Codecov / codecov/patch

processor/batchprocessor/splitlogs.go#L76-L77

Added lines #L76 - L77 were not covered by tests
return false, count
}

// hasAtLeastNRecords reports whether ms contains at least n log records. It
// walks every resource and scope, adding each scope's record count, and
// returns true as soon as the running total reaches n.
func hasAtLeastNRecords(ms plog.Logs, n int) bool {
logCount := 0
rss := ms.ResourceLogs()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
ill := rs.ScopeLogs()
// NOTE(review): this index shadows the outer i. The outer i is not read inside
// this loop, so the result is unaffected, but a distinct name (e.g. j) would be clearer.
for i := 0; i < ill.Len(); i++ {
logs := ill.At(i)
logCount += logs.LogRecords().Len()
if logCount >= n {
return true
}
}
}
// NOTE(review): the bare return below does not fit a bool result; it looks like
// a leftover from the replaced resourceLRC helper — confirm and drop.
return
// Reached when no scope pushed the total to n; true here only if no scope
// was visited at all and n <= 0.
return logCount >= n
}
68 changes: 55 additions & 13 deletions processor/batchprocessor/splitmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

// splitMetrics removes metrics from the input data and returns a new data of the specified size.
func splitMetrics(size int, src pmetric.Metrics) pmetric.Metrics {
dataPoints := src.DataPointCount()
if dataPoints <= size {
if !hasAtLeastNDataPoints(&src, size+1) {
return src
}
totalCopiedDataPoints := 0
Expand All @@ -23,8 +22,8 @@
}

// If it fully fits
srcRsDataPointCount := resourceMetricsDPC(srcRs)
if (totalCopiedDataPoints + srcRsDataPointCount) <= size {
wontFit, srcRsDataPointCount := resourceMetricsHasAtLeastNDataPoints(srcRs, size-totalCopiedDataPoints+1)
if !wontFit {
totalCopiedDataPoints += srcRsDataPointCount
srcRs.MoveTo(dest.ResourceMetrics().AppendEmpty())
return true
Expand All @@ -39,8 +38,8 @@
}

// If possible to move all metrics do that.
srcIlmDataPointCount := scopeMetricsDPC(srcIlm)
if srcIlmDataPointCount+totalCopiedDataPoints <= size {
wontFit, srcIlmDataPointCount := scopeMetricsHasAtLeastNDataPoints(srcIlm, size-totalCopiedDataPoints+1)
if !wontFit {
totalCopiedDataPoints += srcIlmDataPointCount
srcIlm.MoveTo(destRs.ScopeMetrics().AppendEmpty())
return true
Expand Down Expand Up @@ -75,24 +74,35 @@
return dest
}

// resourceMetricsDPC calculates the total number of data points in the pmetric.ResourceMetrics.
func resourceMetricsDPC(rs pmetric.ResourceMetrics) int {
// resourceMetricsHasAtLeastNDataPoints reports whether rs holds at least n
// data points across its scopes, delegating the per-scope counting to
// scopeMetricsHasAtLeastNDataPoints.
//
// It returns (true, 0) when the threshold is reached (the count is not
// reported in that case) and (false, dataPointCount) with the total otherwise.
func resourceMetricsHasAtLeastNDataPoints(rs pmetric.ResourceMetrics, n int) (bool, int) {
dataPointCount := 0
ilms := rs.ScopeMetrics()
for k := 0; k < ilms.Len(); k++ {
// NOTE(review): the next statement adds the scope's count via scopeMetricsDPC, and
// the count is added again below; it looks like the superseded line — confirm.
dataPointCount += scopeMetricsDPC(ilms.At(k))
// NOTE(review): only n-dataPointCount more points are needed, but the threshold
// passed is one higher. A scope that lands exactly on n is therefore reported by
// the check after the loop instead of here — confirm whether the +1 is intended.
hasAtLeastN, scopeMetricDPC := scopeMetricsHasAtLeastNDataPoints(ilms.At(k), n-dataPointCount+1)
if hasAtLeastN {
return true, 0
}
dataPointCount += scopeMetricDPC
}
// Presumably catches a total that lands exactly on n, which the per-scope
// threshold above lets through.
if dataPointCount >= n {
return true, 0
}
// NOTE(review): a single-value return does not match the (bool, int) result; it
// looks like the superseded statement — confirm.
return dataPointCount
return false, dataPointCount
}

// scopeMetricsDPC calculates the total number of data points in the pmetric.ScopeMetrics.
func scopeMetricsDPC(ilm pmetric.ScopeMetrics) int {
// scopeMetricsHasAtLeastNDataPoints reports whether ilm holds at least n data
// points, adding metricDPC of each metric and stopping at the first metric
// where the running total reaches n.
//
// It returns (true, 0) when the threshold is reached (the count is not
// reported in that case) and (false, dataPointCount) with the total otherwise.
func scopeMetricsHasAtLeastNDataPoints(ilm pmetric.ScopeMetrics, n int) (bool, int) {
dataPointCount := 0
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
dataPointCount += metricDPC(ms.At(k))
// Early exit: metrics after this one are never inspected.
if dataPointCount >= n {
return true, 0
}
}
// This can only be true when the loop body never ran (no metrics) and n <= 0;
// otherwise the loop already made the same comparison on the final count.
if dataPointCount >= n {
return true, 0

Check warning on line 103 in processor/batchprocessor/splitmetrics.go

View check run for this annotation

Codecov / codecov/patch

processor/batchprocessor/splitmetrics.go#L103

Added line #L103 was not covered by tests
}
// NOTE(review): a single-value return does not match the (bool, int) result; it
// looks like the superseded statement — confirm.
return dataPointCount
return false, dataPointCount
}

// metricDPC calculates the total number of data points in the pmetric.Metric.
Expand Down Expand Up @@ -196,3 +206,35 @@
})
return size, false
}

// hasAtLeastNDataPoints reports whether ms contains at least n data points.
// It walks resources, scopes and metrics in order, adds the data point count
// of each gauge, sum, histogram, exponential histogram or summary metric, and
// returns true as soon as the running total reaches n. Metrics of any other
// type contribute nothing.
func hasAtLeastNDataPoints(ms *pmetric.Metrics, n int) bool {
	total := 0
	resources := ms.ResourceMetrics()
	for ri := 0; ri < resources.Len(); ri++ {
		scopes := resources.At(ri).ScopeMetrics()
		for si := 0; si < scopes.Len(); si++ {
			metrics := scopes.At(si).Metrics()
			for mi := 0; mi < metrics.Len(); mi++ {
				metric := metrics.At(mi)
				points := 0
				switch metric.Type() {
				case pmetric.MetricTypeGauge:
					points = metric.Gauge().DataPoints().Len()
				case pmetric.MetricTypeSum:
					points = metric.Sum().DataPoints().Len()
				case pmetric.MetricTypeHistogram:
					points = metric.Histogram().DataPoints().Len()
				case pmetric.MetricTypeExponentialHistogram:
					points = metric.ExponentialHistogram().DataPoints().Len()
				case pmetric.MetricTypeSummary:
					points = metric.Summary().DataPoints().Len()
				}
				total += points
				// Stop counting once the threshold is met.
				if total >= n {
					return true
				}
			}
		}
	}
	// Reached when no metric pushed the total to n; true here only if no
	// metric was visited at all and n <= 0.
	return total >= n
}
34 changes: 34 additions & 0 deletions processor/batchprocessor/splitmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,37 @@ func TestSplitMetricsMultipleILM(t *testing.T) {
assert.Equal(t, "test-metric-int-0-0", split.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name())
assert.Equal(t, "test-metric-int-0-4", split.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Name())
}

// BenchmarkSplitMetrics1k runs benchmarkHelper against a 1000-data-point payload.
func BenchmarkSplitMetrics1k(b *testing.B) {
	const dataPoints = 1000
	benchmarkHelper(b, dataPoints)
}

// BenchmarkSplitMetrics16k runs benchmarkHelper against a 16000-data-point payload.
func BenchmarkSplitMetrics16k(b *testing.B) {
	const dataPoints = 16000
	benchmarkHelper(b, dataPoints)
}

// BenchmarkSplitMetrics64k runs benchmarkHelper against a 64000-data-point payload.
func BenchmarkSplitMetrics64k(b *testing.B) {
	const dataPoints = 64000
	benchmarkHelper(b, dataPoints)
}

// benchmarkHelper times splitMetrics on a generated payload of metricCount
// data points. For each of the b.N iterations it clones the payload and then
// calls splitMetrics with a split size of 1 once per data point. Only those
// splitMetrics calls are timed; payload generation and cloning happen while
// the timer is stopped.
func benchmarkHelper(b *testing.B, metricCount int) {
	b.StopTimer()

	md := testdata.GenerateMetrics(metricCount / 2)
	// Make sure GenerateMetrics put two data points in each metric.
	// assert.Equal takes (expected, actual); keeping that order makes a
	// failure message label the two values correctly.
	assert.Equal(b, metricCount, md.DataPointCount())

	for i := 0; i < b.N; i++ {
		// Work on a fresh copy so every iteration starts from the full payload.
		mdClone := pmetric.NewMetrics()
		md.CopyTo(mdClone)

		splitSize := 1
		remainingDataPoints := metricCount
		b.StartTimer()
		for remainingDataPoints > 0 {
			_ = splitMetrics(splitSize, mdClone)
			remainingDataPoints -= splitSize
		}
		b.StopTimer()
	}
}
34 changes: 28 additions & 6 deletions processor/batchprocessor/splittraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

// splitTraces removes spans from the input trace and returns a new trace of the specified size.
func splitTraces(size int, src ptrace.Traces) ptrace.Traces {
if src.SpanCount() <= size {
if !hasAtLeastNSpans(src, size+1) {
return src
}
totalCopiedSpans := 0
Expand All @@ -22,8 +22,8 @@
}

// If it fully fits
srcRsSC := resourceSC(srcRs)
if (totalCopiedSpans + srcRsSC) <= size {
wontFit, srcRsSC := resourceHasAtLeastNSpans(srcRs, size-totalCopiedSpans+1)
if !wontFit {
totalCopiedSpans += srcRsSC
srcRs.MoveTo(dest.ResourceSpans().AppendEmpty())
return true
Expand Down Expand Up @@ -64,10 +64,32 @@
return dest
}

// resourceSC calculates the total number of spans in the ptrace.ResourceSpans.
func resourceSC(rs ptrace.ResourceSpans) (count int) {
// resourceHasAtLeastNSpans reports whether rs holds at least n spans, adding
// up Spans().Len() scope by scope and stopping at the first scope where the
// running total reaches n.
//
// It returns (true, 0) when the threshold is reached (the count is not
// reported in that case) and (false, count) with the full total otherwise.
func resourceHasAtLeastNSpans(rs ptrace.ResourceSpans, n int) (bool, int) {
count := 0
for k := 0; k < rs.ScopeSpans().Len(); k++ {
count += rs.ScopeSpans().At(k).Spans().Len()
// Early exit: scopes after this one are never inspected.
if count >= n {
return true, 0
}
}
// This can only be true when the loop body never ran (no scopes) and n <= 0;
// otherwise the loop already made the same comparison on the final count.
if count >= n {
return true, 0
}

Check warning on line 77 in processor/batchprocessor/splittraces.go

View check run for this annotation

Codecov / codecov/patch

processor/batchprocessor/splittraces.go#L76-L77

Added lines #L76 - L77 were not covered by tests
return false, count
}

// hasAtLeastNSpans reports whether ms contains at least n spans. It walks
// every resource and scope, adding each scope's span count, and returns true
// as soon as the running total reaches n.
func hasAtLeastNSpans(ms ptrace.Traces, n int) bool {
spanCount := 0
rss := ms.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
spanCount += ilss.At(j).Spans().Len()
// Stop counting once the threshold is met.
if spanCount >= n {
return true
}
}
}
// NOTE(review): the bare return below does not fit a bool result; it looks like
// a leftover from the replaced resourceSC helper — confirm and drop.
return
// Reached when no scope pushed the total to n; true here only if no scope
// was visited at all and n <= 0.
return spanCount >= n
}
Loading