
Commit 51b99a0

songy23 authored and sbylica-splunk committed
Revert "[exporter/awsemfexporter]Split EMF log with larger than 100 buckets." (open-telemetry#36763)
Reverts open-telemetry#36336, which leads to test failures; see open-telemetry#36727.

1 parent 15e3d1f · commit 51b99a0

File tree

6 files changed: +47 −871 lines

.chloggen/split-emf-log-when-buckets-larger-than-100.yaml

Lines changed: 0 additions & 27 deletions
This file was deleted.

exporter/awsemfexporter/datapoint.go

Lines changed: 42 additions & 179 deletions
@@ -109,33 +109,6 @@ type summaryMetricEntry struct {
 	count uint64
 }
 
-// dataPointSplit is a structure used to manage segments of data points split from a histogram.
-// It is not safe for concurrent use.
-type dataPointSplit struct {
-	cWMetricHistogram *cWMetricHistogram
-	length            int
-	capacity          int
-}
-
-func (split *dataPointSplit) isFull() bool {
-	return split.length >= split.capacity
-}
-
-func (split *dataPointSplit) setMax(maxVal float64) {
-	split.cWMetricHistogram.Max = maxVal
-}
-
-func (split *dataPointSplit) setMin(minVal float64) {
-	split.cWMetricHistogram.Min = minVal
-}
-
-func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
-	split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
-	split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
-	split.length++
-	split.cWMetricHistogram.Count += count
-}
-
 // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
 func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
 	metric := dps.NumberDataPointSlice.At(i)
@@ -220,195 +193,85 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
 }
 
 // CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
-// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
-// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
-// Note that the number of values and counts in each split may not be less than splitThreshold as we are only adding non-zero bucket counts.
-//
-// For each split data point:
-// - Min and Max values are recalculated based on the bucket boundary within that specific split.
-// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
-// - Count is accumulated based on the bucket counts within each split.
 func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
 	metric := dps.ExponentialHistogramDataPointSlice.At(idx)
 
-	const splitThreshold = 100
-	currentBucketIndex := 0
-	currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
-	currentZeroIndex := 0
-	currentNegativeIndex := 0
-	var datapoints []dataPoint
-	totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
-	if metric.ZeroCount() > 0 {
-		totalBucketLen++
-	}
-
-	for currentBucketIndex < totalBucketLen {
-		// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
-		capacity := min(splitThreshold, totalBucketLen-currentBucketIndex)
-
-		sum := 0.0
-		// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
-		if currentBucketIndex == 0 {
-			sum = metric.Sum()
-		}
-
-		split := dataPointSplit{
-			cWMetricHistogram: &cWMetricHistogram{
-				Values: []float64{},
-				Counts: []float64{},
-				Max:    metric.Max(),
-				Min:    metric.Min(),
-				Count:  0,
-				Sum:    sum,
-			},
-			length:   0,
-			capacity: capacity,
-		}
-
-		// Set collect values from positive buckets and save into split.
-		currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
-		// Set collect values from zero buckets and save into split.
-		currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
-		// Set collect values from negative buckets and save into split.
-		currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)
-
-		if split.length > 0 {
-			// Add the current split to the datapoints list
-			datapoints = append(datapoints, dataPoint{
-				name:        dps.metricName,
-				value:       split.cWMetricHistogram,
-				labels:      createLabels(metric.Attributes(), instrumentationScopeName),
-				timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
-			})
-		}
-	}
-
-	if len(datapoints) == 0 {
-		return []dataPoint{{
-			name: dps.metricName,
-			value: &cWMetricHistogram{
-				Values: []float64{},
-				Counts: []float64{},
-				Count:  metric.Count(),
-				Sum:    metric.Sum(),
-				Max:    metric.Max(),
-				Min:    metric.Min(),
-			},
-			labels:      createLabels(metric.Attributes(), instrumentationScopeName),
-			timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
-		}}, true
-	}
-
-	// Override the min and max values of the first and last splits with the raw data of the metric.
-	datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
-	datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()
-
-	return datapoints, true
-}
-
-func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
-	if split.isFull() || currentPositiveIndex < 0 {
-		return currentBucketIndex, currentPositiveIndex
-	}
-
 	scale := metric.Scale()
 	base := math.Pow(2, math.Pow(2, float64(-scale)))
+	arrayValues := []float64{}
+	arrayCounts := []float64{}
+	var bucketBegin float64
+	var bucketEnd float64
+
+	// Set mid-point of positive buckets in values/counts array.
 	positiveBuckets := metric.Positive()
 	positiveOffset := positiveBuckets.Offset()
 	positiveBucketCounts := positiveBuckets.BucketCounts()
-	bucketBegin := 0.0
-	bucketEnd := 0.0
-
-	for !split.isFull() && currentPositiveIndex >= 0 {
-		index := currentPositiveIndex + int(positiveOffset)
-		if bucketEnd == 0 {
-			bucketEnd = math.Pow(base, float64(index+1))
+	bucketBegin = 0
+	bucketEnd = 0
+	for i := 0; i < positiveBucketCounts.Len(); i++ {
+		index := i + int(positiveOffset)
+		if bucketBegin == 0 {
+			bucketBegin = math.Pow(base, float64(index))
 		} else {
-			bucketEnd = bucketBegin
+			bucketBegin = bucketEnd
 		}
-		bucketBegin = math.Pow(base, float64(index))
+		bucketEnd = math.Pow(base, float64(index+1))
 		metricVal := (bucketBegin + bucketEnd) / 2
-		count := positiveBucketCounts.At(currentPositiveIndex)
+		count := positiveBucketCounts.At(i)
 		if count > 0 {
-			split.appendMetricData(metricVal, count)
-
-			// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
-			if split.length == 1 {
-				split.setMax(bucketEnd)
-			}
-			if split.isFull() {
-				split.setMin(bucketBegin)
-			}
+			arrayValues = append(arrayValues, metricVal)
+			arrayCounts = append(arrayCounts, float64(count))
 		}
-		currentBucketIndex++
-		currentPositiveIndex--
 	}
 
-	return currentBucketIndex, currentPositiveIndex
-}
-
-func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
-	if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 {
-		split.appendMetricData(0, metric.ZeroCount())
-
-		// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
-		if split.length == 1 {
-			split.setMax(0)
-		}
-		if split.isFull() {
-			split.setMin(0)
-		}
-		currentZeroIndex++
-		currentBucketIndex++
+	// Set count of zero bucket in values/counts array.
+	if metric.ZeroCount() > 0 {
+		arrayValues = append(arrayValues, 0)
+		arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
 	}
 
-	return currentBucketIndex, currentZeroIndex
-}
-
-func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
+	// Set mid-point of negative buckets in values/counts array.
 	// According to metrics spec, the value in histogram is expected to be non-negative.
 	// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
 	// However, the negative support is defined in metrics data model.
 	// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
 	// The negative is also supported but only verified with unit test.
-	if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
-		return currentBucketIndex, currentNegativeIndex
-	}
 
-	scale := metric.Scale()
-	base := math.Pow(2, math.Pow(2, float64(-scale)))
 	negativeBuckets := metric.Negative()
 	negativeOffset := negativeBuckets.Offset()
 	negativeBucketCounts := negativeBuckets.BucketCounts()
-	bucketBegin := 0.0
-	bucketEnd := 0.0
-
-	for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
-		index := currentNegativeIndex + int(negativeOffset)
+	bucketBegin = 0
+	bucketEnd = 0
+	for i := 0; i < negativeBucketCounts.Len(); i++ {
+		index := i + int(negativeOffset)
 		if bucketEnd == 0 {
 			bucketEnd = -math.Pow(base, float64(index))
 		} else {
 			bucketEnd = bucketBegin
 		}
 		bucketBegin = -math.Pow(base, float64(index+1))
 		metricVal := (bucketBegin + bucketEnd) / 2
-		count := negativeBucketCounts.At(currentNegativeIndex)
+		count := negativeBucketCounts.At(i)
 		if count > 0 {
-			split.appendMetricData(metricVal, count)
-
-			// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
-			if split.length == 1 {
-				split.setMax(bucketEnd)
-			}
-			if split.isFull() {
-				split.setMin(bucketBegin)
-			}
+			arrayValues = append(arrayValues, metricVal)
+			arrayCounts = append(arrayCounts, float64(count))
 		}
-		currentBucketIndex++
-		currentNegativeIndex++
 	}
 
-	return currentBucketIndex, currentNegativeIndex
+	return []dataPoint{{
+		name: dps.metricName,
+		value: &cWMetricHistogram{
+			Values: arrayValues,
+			Counts: arrayCounts,
+			Count:  metric.Count(),
+			Sum:    metric.Sum(),
+			Max:    metric.Max(),
+			Min:    metric.Min(),
+		},
+		labels:      createLabels(metric.Attributes(), instrumentationScopeName),
+		timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
	}}, true
 }
 
 func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
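
For context on the behavior this revert restores: the exporter represents each non-empty exponential-histogram bucket by the midpoint of its boundaries, where the base is 2^(2^-scale) and bucket index i spans (base^i, base^(i+1)]. The snippet below is a minimal, standalone sketch of that arithmetic, not code from the exporter; the helper name bucketMidpoint is hypothetical and used only for illustration.

package main

import (
	"fmt"
	"math"
)

// bucketMidpoint is an illustrative helper (not part of awsemfexporter).
// It mirrors the arithmetic in the restored code: base = 2^(2^-scale),
// bucket i spans (base^i, base^(i+1)], and the bucket's representative
// value is the midpoint of those two boundaries.
func bucketMidpoint(scale int32, index int) float64 {
	base := math.Pow(2, math.Pow(2, float64(-scale)))
	bucketBegin := math.Pow(base, float64(index))
	bucketEnd := math.Pow(base, float64(index+1))
	return (bucketBegin + bucketEnd) / 2
}

func main() {
	// At scale 0 the base is 2, so bucket index 3 spans (8, 16] and its midpoint is 12.
	fmt.Println(bucketMidpoint(0, 3)) // 12
}

At a negative scale such as -1 the base becomes 4, so buckets widen and the same midpoint rule applies over the coarser boundaries.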
