Skip to content

Commit 0285984

Browse files
authored
[exporter/doris] Optimize table schema (#38229)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description 1. add `service_instance_id` to the tables 2. use random bucket to avoid data skew 3. add materialized views to speed up queries 4. add a trace graph table and use a timed task to import data **The new version of exporter can still import data into the tables of the old version of schema**. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent d9008c1 commit 0285984

17 files changed

+210
-13
lines changed

.chloggen/doris-optimize-schema.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: dorisexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "optimize schema: add 'service_instance_id', add materialized views, add trace_graph table and trace_graph job."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38229]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/dorisexporter/exporter_logs.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ import (
2121
//go:embed sql/logs_ddl.sql
2222
var logsDDL string
2323

24+
//go:embed sql/logs_view.sql
25+
var logsView string
26+
2427
// dLog Log to Doris
2528
type dLog struct {
2629
ServiceName string `json:"service_name"`
2730
Timestamp string `json:"timestamp"`
31+
ServiceInstanceID string `json:"service_instance_id"`
2832
TraceID string `json:"trace_id"`
2933
SpanID string `json:"span_id"`
3034
SeverityNumber int32 `json:"severity_number"`
@@ -70,6 +74,12 @@ func (e *logsExporter) start(ctx context.Context, host component.Host) error {
7074
if err != nil {
7175
return err
7276
}
77+
78+
view := fmt.Sprintf(logsView, e.cfg.Table.Logs, e.cfg.Table.Logs)
79+
_, err = conn.ExecContext(ctx, view)
80+
if err != nil {
81+
e.logger.Warn("failed to create materialized view", zap.Error(err))
82+
}
7383
}
7484

7585
go e.reporter.report()
@@ -96,6 +106,11 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
96106
if ok {
97107
serviceName = v.AsString()
98108
}
109+
serviceInstance := ""
110+
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
111+
if ok {
112+
serviceInstance = v.AsString()
113+
}
99114

100115
for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
101116
scopeLogs := resourceLogs.ScopeLogs().At(j)
@@ -106,6 +121,7 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
106121
log := &dLog{
107122
ServiceName: serviceName,
108123
Timestamp: e.formatTime(logRecord.Timestamp().AsTime()),
124+
ServiceInstanceID: serviceInstance,
109125
TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()),
110126
SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()),
111127
SeverityNumber: int32(logRecord.SeverityNumber()),

exporter/dorisexporter/exporter_metrics.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect
55

66
import (
77
"context"
8+
_ "embed" // for SQL file embedding
89
"encoding/json"
910
"errors"
1011
"fmt"
@@ -25,6 +26,9 @@ var ddls = []string{
2526
metricsSummaryDDL,
2627
}
2728

29+
//go:embed sql/metrics_view.sql
30+
var metricsView string
31+
2832
type metricsExporter struct {
2933
*commonExporter
3034
}
@@ -61,6 +65,23 @@ func (e *metricsExporter) start(ctx context.Context, host component.Host) error
6165
return err
6266
}
6367
}
68+
69+
models := []metricModel{
70+
&metricModelGauge{},
71+
&metricModelSum{},
72+
&metricModelHistogram{},
73+
&metricModelExponentialHistogram{},
74+
&metricModelSummary{},
75+
}
76+
77+
for _, model := range models {
78+
table := e.cfg.Table.Metrics + model.tableSuffix()
79+
view := fmt.Sprintf(metricsView, table, table)
80+
_, err = conn.ExecContext(ctx, view)
81+
if err != nil {
82+
e.logger.Warn("failed to create materialized view", zap.Error(err))
83+
}
84+
}
6485
}
6586

6687
go e.reporter.report()
@@ -158,6 +179,11 @@ func (e *metricsExporter) pushMetricData(ctx context.Context, md pmetric.Metrics
158179
if ok {
159180
serviceName = v.AsString()
160181
}
182+
serviceInstance := ""
183+
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
184+
if ok {
185+
serviceInstance = v.AsString()
186+
}
161187

162188
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
163189
scopeMetric := resourceMetric.ScopeMetrics().At(j)
@@ -167,6 +193,7 @@ func (e *metricsExporter) pushMetricData(ctx context.Context, md pmetric.Metrics
167193

168194
dm := &dMetric{
169195
ServiceName: serviceName,
196+
ServiceInstanceID: serviceInstance,
170197
MetricName: metric.Name(),
171198
MetricDescription: metric.Description(),
172199
MetricUnit: metric.Unit(),

exporter/dorisexporter/exporter_traces.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,20 @@ import (
2121
//go:embed sql/traces_ddl.sql
2222
var tracesDDL string
2323

24+
//go:embed sql/traces_view.sql
25+
var tracesView string
26+
27+
//go:embed sql/traces_graph_ddl.sql
28+
var tracesGraphDDL string
29+
30+
//go:embed sql/traces_graph_job.sql
31+
var tracesGraphJob string
32+
2433
// dTrace Trace to Doris
2534
type dTrace struct {
2635
ServiceName string `json:"service_name"`
2736
Timestamp string `json:"timestamp"`
37+
ServiceInstanceID string `json:"service_instance_id"`
2838
TraceID string `json:"trace_id"`
2939
SpanID string `json:"span_id"`
3040
TraceState string `json:"trace_state"`
@@ -75,7 +85,7 @@ func (e *tracesExporter) start(ctx context.Context, host component.Host) error {
7585
}
7686
e.client = client
7787

78-
if !e.cfg.CreateSchema {
88+
if e.cfg.CreateSchema {
7989
conn, err := createDorisMySQLClient(e.cfg)
8090
if err != nil {
8191
return err
@@ -92,6 +102,30 @@ func (e *tracesExporter) start(ctx context.Context, host component.Host) error {
92102
if err != nil {
93103
return err
94104
}
105+
106+
view := fmt.Sprintf(tracesView, e.cfg.Table.Traces, e.cfg.Table.Traces)
107+
_, err = conn.ExecContext(ctx, view)
108+
if err != nil {
109+
e.logger.Warn("failed to create materialized view", zap.Error(err))
110+
}
111+
112+
ddl = fmt.Sprintf(tracesGraphDDL, e.cfg.Table.Traces, e.cfg.propertiesStr())
113+
_, err = conn.ExecContext(ctx, ddl)
114+
if err != nil {
115+
return err
116+
}
117+
118+
dropJob := e.formatDropTraceGraphJob()
119+
_, err = conn.ExecContext(ctx, dropJob)
120+
if err != nil {
121+
e.logger.Warn("failed to drop job", zap.Error(err))
122+
}
123+
124+
job := e.formatTraceGraphJob()
125+
_, err = conn.ExecContext(ctx, job)
126+
if err != nil {
127+
e.logger.Warn("failed to create job", zap.Error(err))
128+
}
95129
}
96130

97131
go e.reporter.report()
@@ -118,6 +152,11 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
118152
if ok {
119153
serviceName = v.AsString()
120154
}
155+
serviceInstance := ""
156+
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
157+
if ok {
158+
serviceInstance = v.AsString()
159+
}
121160

122161
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
123162
scopeSpan := resourceSpan.ScopeSpans().At(j)
@@ -157,6 +196,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
157196
trace := &dTrace{
158197
ServiceName: serviceName,
159198
Timestamp: e.formatTime(span.StartTimestamp().AsTime()),
199+
ServiceInstanceID: serviceInstance,
160200
TraceID: traceutil.TraceIDToHexOrEmptyString(span.TraceID()),
161201
SpanID: traceutil.SpanIDToHexOrEmptyString(span.SpanID()),
162202
TraceState: span.TraceState().AsRaw(),
@@ -229,3 +269,22 @@ func (e *tracesExporter) pushTraceDataInternal(ctx context.Context, traces []*dT
229269

230270
return fmt.Errorf("failed to push trace data, response:%s", string(body))
231271
}
272+
273+
func (e *tracesExporter) formatDropTraceGraphJob() string {
274+
return fmt.Sprintf(
275+
"DROP JOB where jobName = '%s:%s_graph_job';",
276+
e.cfg.Database,
277+
e.cfg.Table.Traces,
278+
)
279+
}
280+
281+
func (e *tracesExporter) formatTraceGraphJob() string {
282+
return fmt.Sprintf(
283+
tracesGraphJob,
284+
e.cfg.Database,
285+
e.cfg.Table.Traces,
286+
e.cfg.Table.Traces,
287+
e.cfg.Table.Traces,
288+
e.cfg.Table.Traces,
289+
)
290+
}

exporter/dorisexporter/metrics_model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func (m *metricModelCommon[T]) label() string {
3434
// dMetric Basic Metric
3535
type dMetric struct {
3636
ServiceName string `json:"service_name"`
37+
ServiceInstanceID string `json:"service_instance_id"`
3738
MetricName string `json:"metric_name"`
3839
MetricDescription string `json:"metric_description"`
3940
MetricUnit string `json:"metric_unit"`

exporter/dorisexporter/sql/logs_ddl.sql

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
CREATE TABLE IF NOT EXISTS %s
22
(
3-
service_name VARCHAR(200),
43
timestamp DATETIME(6),
4+
service_name VARCHAR(200),
5+
service_instance_id VARCHAR(200),
56
trace_id VARCHAR(200),
67
span_id STRING,
78
severity_number INT,
@@ -13,6 +14,7 @@ CREATE TABLE IF NOT EXISTS %s
1314
scope_version STRING,
1415
INDEX idx_service_name(service_name) USING INVERTED,
1516
INDEX idx_timestamp(timestamp) USING INVERTED,
17+
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
1618
INDEX idx_trace_id(trace_id) USING INVERTED,
1719
INDEX idx_span_id(span_id) USING INVERTED,
1820
INDEX idx_severity_number(severity_number) USING INVERTED,
@@ -24,7 +26,7 @@ CREATE TABLE IF NOT EXISTS %s
2426
INDEX idx_scope_version(scope_version) USING INVERTED
2527
)
2628
ENGINE = OLAP
27-
DUPLICATE KEY(service_name, timestamp)
29+
DUPLICATE KEY(timestamp, service_name)
2830
PARTITION BY RANGE(timestamp) ()
29-
DISTRIBUTED BY HASH(trace_id) BUCKETS AUTO
30-
%s;
31+
DISTRIBUTED BY RANDOM BUCKETS AUTO
32+
%s;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE MATERIALIZED VIEW %s_services AS
2+
SELECT service_name, service_instance_id
3+
FROM %s
4+
GROUP BY service_name, service_instance_id;

exporter/dorisexporter/sql/metrics_exponential_histogram_ddl.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
22
(
33
service_name VARCHAR(200),
44
timestamp DATETIME(6),
5+
service_instance_id VARCHAR(200),
56
metric_name VARCHAR(200),
67
metric_description STRING,
78
metric_unit STRING,
@@ -25,6 +26,7 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
2526
scope_version STRING,
2627
INDEX idx_service_name(service_name) USING INVERTED,
2728
INDEX idx_timestamp(timestamp) USING INVERTED,
29+
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
2830
INDEX idx_metric_name(metric_name) USING INVERTED,
2931
INDEX idx_metric_description(metric_description) USING INVERTED,
3032
INDEX idx_metric_unit(metric_unit) USING INVERTED,
@@ -43,5 +45,5 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
4345
ENGINE = OLAP
4446
DUPLICATE KEY(service_name, timestamp)
4547
PARTITION BY RANGE(timestamp) ()
46-
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
48+
DISTRIBUTED BY RANDOM BUCKETS AUTO
4749
%s;

exporter/dorisexporter/sql/metrics_gauge_ddl.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_gauge
22
(
33
service_name VARCHAR(200),
44
timestamp DATETIME(6),
5+
service_instance_id VARCHAR(200),
56
metric_name VARCHAR(200),
67
metric_description STRING,
78
metric_unit STRING,
@@ -14,6 +15,7 @@ CREATE TABLE IF NOT EXISTS %s_gauge
1415
scope_version STRING,
1516
INDEX idx_service_name(service_name) USING INVERTED,
1617
INDEX idx_timestamp(timestamp) USING INVERTED,
18+
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
1719
INDEX idx_metric_name(metric_name) USING INVERTED,
1820
INDEX idx_metric_description(metric_description) USING INVERTED,
1921
INDEX idx_metric_unit(metric_unit) USING INVERTED,
@@ -26,5 +28,5 @@ CREATE TABLE IF NOT EXISTS %s_gauge
2628
ENGINE = OLAP
2729
DUPLICATE KEY(service_name, timestamp)
2830
PARTITION BY RANGE(timestamp) ()
29-
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
31+
DISTRIBUTED BY RANDOM BUCKETS AUTO
3032
%s;

exporter/dorisexporter/sql/metrics_histogram_ddl.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_histogram
22
(
33
service_name VARCHAR(200),
44
timestamp DATETIME(6),
5+
service_instance_id VARCHAR(200),
56
metric_name VARCHAR(200),
67
metric_description STRING,
78
metric_unit STRING,
@@ -20,6 +21,7 @@ CREATE TABLE IF NOT EXISTS %s_histogram
2021
scope_version STRING,
2122
INDEX idx_service_name(service_name) USING INVERTED,
2223
INDEX idx_timestamp(timestamp) USING INVERTED,
24+
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
2325
INDEX idx_metric_name(metric_name) USING INVERTED,
2426
INDEX idx_metric_description(metric_description) USING INVERTED,
2527
INDEX idx_metric_unit(metric_unit) USING INVERTED,
@@ -34,5 +36,5 @@ CREATE TABLE IF NOT EXISTS %s_histogram
3436
ENGINE = OLAP
3537
DUPLICATE KEY(service_name, timestamp)
3638
PARTITION BY RANGE(timestamp) ()
37-
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
39+
DISTRIBUTED BY RANDOM BUCKETS AUTO
3840
%s;

exporter/dorisexporter/sql/metrics_sum_ddl.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_sum
22
(
33
service_name VARCHAR(200),
44
timestamp DATETIME(6),
5+
service_instance_id VARCHAR(200),
56
metric_name VARCHAR(200),
67
metric_description STRING,
78
metric_unit STRING,
@@ -16,6 +17,7 @@ CREATE TABLE IF NOT EXISTS %s_sum
1617
scope_version STRING,
1718
INDEX idx_service_name(service_name) USING INVERTED,
1819
INDEX idx_timestamp(timestamp) USING INVERTED,
20+
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
1921
INDEX idx_metric_name(metric_name) USING INVERTED,
2022
INDEX idx_metric_description(metric_description) USING INVERTED,
2123
INDEX idx_metric_unit(metric_unit) USING INVERTED,
@@ -29,5 +31,5 @@ CREATE TABLE IF NOT EXISTS %s_sum
2931
ENGINE = OLAP
3032
DUPLICATE KEY(service_name, timestamp)
3133
PARTITION BY RANGE(timestamp) ()
32-
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
34+
DISTRIBUTED BY RANDOM BUCKETS AUTO
3335
%s;

0 commit comments

Comments
 (0)