Skip to content

[exporter/doris] Optimize table schema #38229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/doris-optimize-schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "optimize schema: add 'service_instance_id', add materialized views, add trace_graph table and trace_graph job."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38229]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
16 changes: 16 additions & 0 deletions exporter/dorisexporter/exporter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
//go:embed sql/logs_ddl.sql
var logsDDL string

//go:embed sql/logs_view.sql
var logsView string

// dLog Log to Doris
type dLog struct {
ServiceName string `json:"service_name"`
Timestamp string `json:"timestamp"`
ServiceInstanceID string `json:"service_instance_id"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
SeverityNumber int32 `json:"severity_number"`
Expand Down Expand Up @@ -70,6 +74,12 @@ func (e *logsExporter) start(ctx context.Context, host component.Host) error {
if err != nil {
return err
}

view := fmt.Sprintf(logsView, e.cfg.Table.Logs, e.cfg.Table.Logs)
_, err = conn.ExecContext(ctx, view)
if err != nil {
e.logger.Warn("failed to create materialized view", zap.Error(err))
}
}

go e.reporter.report()
Expand All @@ -96,6 +106,11 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
if ok {
serviceName = v.AsString()
}
serviceInstance := ""
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
if ok {
serviceInstance = v.AsString()
}

for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
scopeLogs := resourceLogs.ScopeLogs().At(j)
Expand All @@ -106,6 +121,7 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
log := &dLog{
ServiceName: serviceName,
Timestamp: e.formatTime(logRecord.Timestamp().AsTime()),
ServiceInstanceID: serviceInstance,
TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()),
SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()),
SeverityNumber: int32(logRecord.SeverityNumber()),
Expand Down
27 changes: 27 additions & 0 deletions exporter/dorisexporter/exporter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
_ "embed" // for SQL file embedding
"encoding/json"
"errors"
"fmt"
Expand All @@ -25,6 +26,9 @@ var ddls = []string{
metricsSummaryDDL,
}

//go:embed sql/metrics_view.sql
var metricsView string

type metricsExporter struct {
*commonExporter
}
Expand Down Expand Up @@ -61,6 +65,23 @@ func (e *metricsExporter) start(ctx context.Context, host component.Host) error
return err
}
}

models := []metricModel{
&metricModelGauge{},
&metricModelSum{},
&metricModelHistogram{},
&metricModelExponentialHistogram{},
&metricModelSummary{},
}

for _, model := range models {
table := e.cfg.Table.Metrics + model.tableSuffix()
view := fmt.Sprintf(metricsView, table, table)
_, err = conn.ExecContext(ctx, view)
if err != nil {
e.logger.Warn("failed to create materialized view", zap.Error(err))
}
}
}

go e.reporter.report()
Expand Down Expand Up @@ -158,6 +179,11 @@ func (e *metricsExporter) pushMetricData(ctx context.Context, md pmetric.Metrics
if ok {
serviceName = v.AsString()
}
serviceInstance := ""
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
if ok {
serviceInstance = v.AsString()
}

for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)
Expand All @@ -167,6 +193,7 @@ func (e *metricsExporter) pushMetricData(ctx context.Context, md pmetric.Metrics

dm := &dMetric{
ServiceName: serviceName,
ServiceInstanceID: serviceInstance,
MetricName: metric.Name(),
MetricDescription: metric.Description(),
MetricUnit: metric.Unit(),
Expand Down
61 changes: 60 additions & 1 deletion exporter/dorisexporter/exporter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,20 @@ import (
//go:embed sql/traces_ddl.sql
var tracesDDL string

//go:embed sql/traces_view.sql
var tracesView string

//go:embed sql/traces_graph_ddl.sql
var tracesGraphDDL string

//go:embed sql/traces_graph_job.sql
var tracesGraphJob string

// dTrace Trace to Doris
type dTrace struct {
ServiceName string `json:"service_name"`
Timestamp string `json:"timestamp"`
ServiceInstanceID string `json:"service_instance_id"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
TraceState string `json:"trace_state"`
Expand Down Expand Up @@ -75,7 +85,7 @@ func (e *tracesExporter) start(ctx context.Context, host component.Host) error {
}
e.client = client

if !e.cfg.CreateSchema {
if e.cfg.CreateSchema {
conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
Expand All @@ -92,6 +102,30 @@ func (e *tracesExporter) start(ctx context.Context, host component.Host) error {
if err != nil {
return err
}

view := fmt.Sprintf(tracesView, e.cfg.Table.Traces, e.cfg.Table.Traces)
_, err = conn.ExecContext(ctx, view)
if err != nil {
e.logger.Warn("failed to create materialized view", zap.Error(err))
}

ddl = fmt.Sprintf(tracesGraphDDL, e.cfg.Table.Traces, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
if err != nil {
return err
}

dropJob := e.formatDropTraceGraphJob()
_, err = conn.ExecContext(ctx, dropJob)
if err != nil {
e.logger.Warn("failed to drop job", zap.Error(err))
}

job := e.formatTraceGraphJob()
_, err = conn.ExecContext(ctx, job)
if err != nil {
e.logger.Warn("failed to create job", zap.Error(err))
}
}

go e.reporter.report()
Expand All @@ -118,6 +152,11 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
if ok {
serviceName = v.AsString()
}
serviceInstance := ""
v, ok = resourceAttributes.Get(semconv.AttributeServiceInstanceID)
if ok {
serviceInstance = v.AsString()
}

for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)
Expand Down Expand Up @@ -157,6 +196,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
trace := &dTrace{
ServiceName: serviceName,
Timestamp: e.formatTime(span.StartTimestamp().AsTime()),
ServiceInstanceID: serviceInstance,
TraceID: traceutil.TraceIDToHexOrEmptyString(span.TraceID()),
SpanID: traceutil.SpanIDToHexOrEmptyString(span.SpanID()),
TraceState: span.TraceState().AsRaw(),
Expand Down Expand Up @@ -229,3 +269,22 @@ func (e *tracesExporter) pushTraceDataInternal(ctx context.Context, traces []*dT

return fmt.Errorf("failed to push trace data, response:%s", string(body))
}

// formatDropTraceGraphJob builds the SQL statement that removes the
// scheduled trace-graph job for the configured database and traces table.
// The job name follows the "<database>:<traces_table>_graph_job" convention
// used when the job is created.
func (e *tracesExporter) formatDropTraceGraphJob() string {
	jobName := fmt.Sprintf("%s:%s_graph_job", e.cfg.Database, e.cfg.Table.Traces)
	return "DROP JOB where jobName = '" + jobName + "';"
}

// formatTraceGraphJob renders the embedded trace-graph job template
// (sql/traces_graph_job.sql) with the configured database name and the
// traces table name; the template references the table in four places.
func (e *tracesExporter) formatTraceGraphJob() string {
	table := e.cfg.Table.Traces
	return fmt.Sprintf(tracesGraphJob, e.cfg.Database, table, table, table, table)
}
1 change: 1 addition & 0 deletions exporter/dorisexporter/metrics_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (m *metricModelCommon[T]) label() string {
// dMetric Basic Metric
type dMetric struct {
ServiceName string `json:"service_name"`
ServiceInstanceID string `json:"service_instance_id"`
MetricName string `json:"metric_name"`
MetricDescription string `json:"metric_description"`
MetricUnit string `json:"metric_unit"`
Expand Down
10 changes: 6 additions & 4 deletions exporter/dorisexporter/sql/logs_ddl.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
CREATE TABLE IF NOT EXISTS %s
(
service_name VARCHAR(200),
timestamp DATETIME(6),
service_name VARCHAR(200),
service_instance_id VARCHAR(200),
trace_id VARCHAR(200),
span_id STRING,
severity_number INT,
Expand All @@ -13,6 +14,7 @@ CREATE TABLE IF NOT EXISTS %s
scope_version STRING,
INDEX idx_service_name(service_name) USING INVERTED,
INDEX idx_timestamp(timestamp) USING INVERTED,
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
INDEX idx_trace_id(trace_id) USING INVERTED,
INDEX idx_span_id(span_id) USING INVERTED,
INDEX idx_severity_number(severity_number) USING INVERTED,
Expand All @@ -24,7 +26,7 @@ CREATE TABLE IF NOT EXISTS %s
INDEX idx_scope_version(scope_version) USING INVERTED
)
ENGINE = OLAP
DUPLICATE KEY(service_name, timestamp)
DUPLICATE KEY(timestamp, service_name)
PARTITION BY RANGE(timestamp) ()
DISTRIBUTED BY HASH(trace_id) BUCKETS AUTO
%s;
DISTRIBUTED BY RANDOM BUCKETS AUTO
%s;
4 changes: 4 additions & 0 deletions exporter/dorisexporter/sql/logs_view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Materialized view of the distinct (service_name, service_instance_id)
-- pairs present in the logs table, kept up to date by Doris so service
-- discovery queries avoid a full scan of the base table.
-- NOTE: this file is a Go fmt.Sprintf template; both placeholders receive
-- the configured logs table name, so comments here must not contain the
-- percent character.
CREATE MATERIALIZED VIEW %s_services AS
SELECT service_name, service_instance_id
FROM %s
GROUP BY service_name, service_instance_id;
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
(
service_name VARCHAR(200),
timestamp DATETIME(6),
service_instance_id VARCHAR(200),
metric_name VARCHAR(200),
metric_description STRING,
metric_unit STRING,
Expand All @@ -25,6 +26,7 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
scope_version STRING,
INDEX idx_service_name(service_name) USING INVERTED,
INDEX idx_timestamp(timestamp) USING INVERTED,
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
INDEX idx_metric_name(metric_name) USING INVERTED,
INDEX idx_metric_description(metric_description) USING INVERTED,
INDEX idx_metric_unit(metric_unit) USING INVERTED,
Expand All @@ -43,5 +45,5 @@ CREATE TABLE IF NOT EXISTS %s_exponential_histogram
ENGINE = OLAP
DUPLICATE KEY(service_name, timestamp)
PARTITION BY RANGE(timestamp) ()
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
DISTRIBUTED BY RANDOM BUCKETS AUTO
%s;
4 changes: 3 additions & 1 deletion exporter/dorisexporter/sql/metrics_gauge_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_gauge
(
service_name VARCHAR(200),
timestamp DATETIME(6),
service_instance_id VARCHAR(200),
metric_name VARCHAR(200),
metric_description STRING,
metric_unit STRING,
Expand All @@ -14,6 +15,7 @@ CREATE TABLE IF NOT EXISTS %s_gauge
scope_version STRING,
INDEX idx_service_name(service_name) USING INVERTED,
INDEX idx_timestamp(timestamp) USING INVERTED,
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
INDEX idx_metric_name(metric_name) USING INVERTED,
INDEX idx_metric_description(metric_description) USING INVERTED,
INDEX idx_metric_unit(metric_unit) USING INVERTED,
Expand All @@ -26,5 +28,5 @@ CREATE TABLE IF NOT EXISTS %s_gauge
ENGINE = OLAP
DUPLICATE KEY(service_name, timestamp)
PARTITION BY RANGE(timestamp) ()
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
DISTRIBUTED BY RANDOM BUCKETS AUTO
%s;
4 changes: 3 additions & 1 deletion exporter/dorisexporter/sql/metrics_histogram_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_histogram
(
service_name VARCHAR(200),
timestamp DATETIME(6),
service_instance_id VARCHAR(200),
metric_name VARCHAR(200),
metric_description STRING,
metric_unit STRING,
Expand All @@ -20,6 +21,7 @@ CREATE TABLE IF NOT EXISTS %s_histogram
scope_version STRING,
INDEX idx_service_name(service_name) USING INVERTED,
INDEX idx_timestamp(timestamp) USING INVERTED,
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
INDEX idx_metric_name(metric_name) USING INVERTED,
INDEX idx_metric_description(metric_description) USING INVERTED,
INDEX idx_metric_unit(metric_unit) USING INVERTED,
Expand All @@ -34,5 +36,5 @@ CREATE TABLE IF NOT EXISTS %s_histogram
ENGINE = OLAP
DUPLICATE KEY(service_name, timestamp)
PARTITION BY RANGE(timestamp) ()
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
DISTRIBUTED BY RANDOM BUCKETS AUTO
%s;
4 changes: 3 additions & 1 deletion exporter/dorisexporter/sql/metrics_sum_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS %s_sum
(
service_name VARCHAR(200),
timestamp DATETIME(6),
service_instance_id VARCHAR(200),
metric_name VARCHAR(200),
metric_description STRING,
metric_unit STRING,
Expand All @@ -16,6 +17,7 @@ CREATE TABLE IF NOT EXISTS %s_sum
scope_version STRING,
INDEX idx_service_name(service_name) USING INVERTED,
INDEX idx_timestamp(timestamp) USING INVERTED,
INDEX idx_service_instance_id(service_instance_id) USING INVERTED,
INDEX idx_metric_name(metric_name) USING INVERTED,
INDEX idx_metric_description(metric_description) USING INVERTED,
INDEX idx_metric_unit(metric_unit) USING INVERTED,
Expand All @@ -29,5 +31,5 @@ CREATE TABLE IF NOT EXISTS %s_sum
ENGINE = OLAP
DUPLICATE KEY(service_name, timestamp)
PARTITION BY RANGE(timestamp) ()
DISTRIBUTED BY HASH(metric_name) BUCKETS AUTO
DISTRIBUTED BY RANDOM BUCKETS AUTO
%s;
Loading