Skip to content

Commit 230ed5c

Browse files
jmichalek132ArthurSensdashpole
authored
feat: added prometheus rw2 translation for gauges (#35734)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Follow up from #35703. Draft starting the work on adding support for remote write 2.0 in the translation package. Adding support for translating gauges. This is first iteration and to keep the PR small * we don't handle duplicate metrics * only support gauges * don't handle other labels than metric name * don't handle exemplars * don't handle metadata <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue #33661 Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Juraj Michalek <[email protected]> Co-authored-by: Arthur Silva Sens <[email protected]> Co-authored-by: David Ashpole <[email protected]>
1 parent 686721c commit 230ed5c

File tree

7 files changed

+405
-10
lines changed

7 files changed

+405
-10
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/translator/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: add FromMetricsV2
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33661]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: The public function is partially implemented and not ready for use
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

pkg/translator/prometheusremotewrite/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.22.0
44

55
require (
66
github.com/cespare/xxhash/v2 v2.3.0
7+
github.com/google/go-cmp v0.6.0
78
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
89
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.111.0
910
github.com/prometheus/common v0.60.0

pkg/translator/prometheusremotewrite/metrics_to_prw_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func BenchmarkFromMetrics(b *testing.B) {
3434
b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) {
3535
for _, exemplarsPerSeries := range []int{0, 5, 10} {
3636
b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) {
37-
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries)
37+
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano())))
3838

3939
for i := 0; i < b.N; i++ {
4040
tsMap, err := FromMetrics(payload.Metrics(), Settings{})
@@ -71,7 +71,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
7171
b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) {
7272
for _, exemplarsPerSeries := range []int{0, 5, 10} {
7373
b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) {
74-
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries)
74+
payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano())))
7575

7676
for i := 0; i < b.N; i++ {
7777
converter := newPrometheusConverter()
@@ -90,22 +90,21 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) {
9090
}
9191
}
9292

93-
func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest {
93+
func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int, timestamp pcommon.Timestamp) pmetricotlp.ExportRequest {
9494
request := pmetricotlp.NewExportRequest()
9595

9696
rm := request.Metrics().ResourceMetrics().AppendEmpty()
9797
generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount)
9898

9999
metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
100-
ts := pcommon.NewTimestampFromTime(time.Now())
101100

102101
for i := 1; i <= histogramCount; i++ {
103102
m := metrics.AppendEmpty()
104103
m.SetEmptyHistogram()
105104
m.SetName(fmt.Sprintf("histogram-%v", i))
106105
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
107106
h := m.Histogram().DataPoints().AppendEmpty()
108-
h.SetTimestamp(ts)
107+
h.SetTimestamp(timestamp)
109108

110109
// Set 50 samples, 10 each with values 0.5, 1, 2, 4, and 8
111110
h.SetCount(50)
@@ -114,7 +113,7 @@ func createExportRequest(resourceAttributeCount int, histogramCount int, nonHist
114113
h.ExplicitBounds().FromRaw([]float64{.5, 1, 2, 4, 8, 16}) // Bucket boundaries include the upper limit (ie. each sample is on the upper limit of its bucket)
115114

116115
generateAttributes(h.Attributes(), "series", labelsPerMetric)
117-
generateExemplars(h.Exemplars(), exemplarsPerSeries, ts)
116+
generateExemplars(h.Exemplars(), exemplarsPerSeries, timestamp)
118117
}
119118

120119
for i := 1; i <= nonHistogramCount; i++ {
@@ -123,21 +122,21 @@ func createExportRequest(resourceAttributeCount int, histogramCount int, nonHist
123122
m.SetName(fmt.Sprintf("sum-%v", i))
124123
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
125124
point := m.Sum().DataPoints().AppendEmpty()
126-
point.SetTimestamp(ts)
125+
point.SetTimestamp(timestamp)
127126
point.SetDoubleValue(1.23)
128127
generateAttributes(point.Attributes(), "series", labelsPerMetric)
129-
generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
128+
generateExemplars(point.Exemplars(), exemplarsPerSeries, timestamp)
130129
}
131130

132131
for i := 1; i <= nonHistogramCount; i++ {
133132
m := metrics.AppendEmpty()
134133
m.SetEmptyGauge()
135134
m.SetName(fmt.Sprintf("gauge-%v", i))
136135
point := m.Gauge().DataPoints().AppendEmpty()
137-
point.SetTimestamp(ts)
136+
point.SetTimestamp(timestamp)
138137
point.SetDoubleValue(1.23)
139138
generateAttributes(point.Attributes(), "series", labelsPerMetric)
140-
generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
139+
generateExemplars(point.Exemplars(), exemplarsPerSeries, timestamp)
141140
}
142141

143142
return request
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"strconv"
10+
11+
"github.com/prometheus/prometheus/model/labels"
12+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
15+
"go.uber.org/multierr"
16+
17+
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
18+
)
19+
20+
// FromMetricsV2 converts pmetric.Metrics to Prometheus remote write format 2.0.
21+
func FromMetricsV2(md pmetric.Metrics, settings Settings) (map[string]*writev2.TimeSeries, writev2.SymbolsTable, error) {
22+
c := newPrometheusConverterV2()
23+
errs := c.fromMetrics(md, settings)
24+
tss := c.timeSeries()
25+
out := make(map[string]*writev2.TimeSeries, len(tss))
26+
for i := range tss {
27+
out[strconv.Itoa(i)] = &tss[i]
28+
}
29+
30+
return out, c.symbolTable, errs
31+
}
32+
33+
// prometheusConverterV2 converts from OTLP to Prometheus write 2.0 format.
34+
type prometheusConverterV2 struct {
35+
// TODO handle conflicts
36+
unique map[uint64]*writev2.TimeSeries
37+
symbolTable writev2.SymbolsTable
38+
}
39+
40+
func newPrometheusConverterV2() *prometheusConverterV2 {
41+
return &prometheusConverterV2{
42+
unique: map[uint64]*writev2.TimeSeries{},
43+
symbolTable: writev2.NewSymbolTable(),
44+
}
45+
}
46+
47+
// fromMetrics converts pmetric.Metrics to Prometheus remote write format.
48+
func (c *prometheusConverterV2) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
49+
resourceMetricsSlice := md.ResourceMetrics()
50+
for i := 0; i < resourceMetricsSlice.Len(); i++ {
51+
resourceMetrics := resourceMetricsSlice.At(i)
52+
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
53+
// keep track of the most recent timestamp in the ResourceMetrics for
54+
// use with the "target" info metric
55+
var mostRecentTimestamp pcommon.Timestamp
56+
for j := 0; j < scopeMetricsSlice.Len(); j++ {
57+
metricSlice := scopeMetricsSlice.At(j).Metrics()
58+
59+
// TODO: decide if instrumentation library information should be exported as labels
60+
for k := 0; k < metricSlice.Len(); k++ {
61+
metric := metricSlice.At(k)
62+
mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
63+
64+
if !isValidAggregationTemporality(metric) {
65+
errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
66+
continue
67+
}
68+
69+
promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes)
70+
71+
// handle individual metrics based on type
72+
//exhaustive:enforce
73+
switch metric.Type() {
74+
case pmetric.MetricTypeGauge:
75+
dataPoints := metric.Gauge().DataPoints()
76+
if dataPoints.Len() == 0 {
77+
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
78+
break
79+
}
80+
c.addGaugeNumberDataPoints(dataPoints, promName)
81+
case pmetric.MetricTypeSum:
82+
// TODO implement
83+
case pmetric.MetricTypeHistogram:
84+
// TODO implement
85+
case pmetric.MetricTypeExponentialHistogram:
86+
// TODO implement
87+
case pmetric.MetricTypeSummary:
88+
// TODO implement
89+
default:
90+
errs = multierr.Append(errs, errors.New("unsupported metric type"))
91+
}
92+
}
93+
}
94+
// TODO implement
95+
// addResourceTargetInfov2(resource, settings, mostRecentTimestamp, c)
96+
}
97+
98+
return
99+
}
100+
101+
// timeSeries returns a slice of the writev2.TimeSeries that were converted from OTel format.
102+
func (c *prometheusConverterV2) timeSeries() []writev2.TimeSeries {
103+
allTS := make([]writev2.TimeSeries, 0, len(c.unique))
104+
for _, ts := range c.unique {
105+
allTS = append(allTS, *ts)
106+
}
107+
return allTS
108+
}
109+
110+
func (c *prometheusConverterV2) addSample(sample *writev2.Sample, lbls labels.Labels) *writev2.TimeSeries {
111+
if sample == nil || len(lbls) == 0 {
112+
// This shouldn't happen
113+
return nil
114+
}
115+
ts := &writev2.TimeSeries{}
116+
ts.LabelsRefs = c.symbolTable.SymbolizeLabels(lbls, ts.LabelsRefs)
117+
ts.Samples = append(ts.Samples, *sample)
118+
119+
c.unique[lbls.Hash()] = ts
120+
121+
return ts
122+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package prometheusremotewrite
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
)
14+
15+
func TestFromMetricsV2(t *testing.T) {
16+
settings := Settings{
17+
Namespace: "",
18+
ExternalLabels: nil,
19+
DisableTargetInfo: false,
20+
ExportCreatedMetric: false,
21+
AddMetricSuffixes: false,
22+
SendMetadata: false,
23+
}
24+
25+
ts := uint64(time.Now().UnixNano())
26+
payload := createExportRequest(5, 0, 1, 3, 0, pcommon.Timestamp(ts))
27+
want := func() map[string]*writev2.TimeSeries {
28+
return map[string]*writev2.TimeSeries{
29+
"0": {
30+
LabelsRefs: []uint32{1, 2},
31+
Samples: []writev2.Sample{
32+
{Timestamp: convertTimeStamp(pcommon.Timestamp(ts)), Value: 1.23},
33+
},
34+
},
35+
}
36+
}
37+
tsMap, symbolsTable, err := FromMetricsV2(payload.Metrics(), settings)
38+
wanted := want()
39+
require.NoError(t, err)
40+
require.NotNil(t, tsMap)
41+
require.Equal(t, wanted, tsMap)
42+
require.NotNil(t, symbolsTable)
43+
44+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
5+
6+
import (
7+
"math"
8+
9+
"github.com/prometheus/common/model"
10+
"github.com/prometheus/prometheus/model/labels"
11+
"github.com/prometheus/prometheus/model/value"
12+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
13+
"go.opentelemetry.io/collector/pdata/pmetric"
14+
)
15+
16+
func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, name string) {
17+
for x := 0; x < dataPoints.Len(); x++ {
18+
pt := dataPoints.At(x)
19+
// TODO implement support for labels
20+
21+
labels := labels.Labels{
22+
labels.Label{
23+
Name: model.MetricNameLabel,
24+
Value: name,
25+
},
26+
}
27+
28+
sample := &writev2.Sample{
29+
// convert ns to ms
30+
Timestamp: convertTimeStamp(pt.Timestamp()),
31+
}
32+
switch pt.ValueType() {
33+
case pmetric.NumberDataPointValueTypeInt:
34+
sample.Value = float64(pt.IntValue())
35+
case pmetric.NumberDataPointValueTypeDouble:
36+
sample.Value = pt.DoubleValue()
37+
}
38+
if pt.Flags().NoRecordedValue() {
39+
sample.Value = math.Float64frombits(value.StaleNaN)
40+
}
41+
c.addSample(sample, labels)
42+
}
43+
}

0 commit comments

Comments
 (0)