Skip to content

Commit 8bcaf19

Browse files
authored
Merge branch 'main' into becky/multi-bearer-support
2 parents b9a0382 + 8d2052d commit 8bcaf19

29 files changed

+435
-221
lines changed

.chloggen/doris-add-config.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: dorisexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "add new config: log_response, label_prefix, headers, log_progress_interval"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38162]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/dorisexporter/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
e2e*

exporter/dorisexporter/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ The following configuration options are supported:
3030
* `history_days` (default = 0) Data older than these days will be deleted; ignored if `create_schema` is false. If set to 0, historical data will not be deleted.
3131
* `create_history_days` (default = 0) The number of days in the history partition that was created when the table was created; ignored if `create_schema` is false. If `history_days` is not 0, `create_history_days` needs to be less than or equal to `history_days`.
3232
* `replication_num` (default = 1) The number of replicas of the table; ignored if `create_schema` is false.
33-
* `timezone` (default is UTC) The time zone of doris.
33+
* `timezone` (default is the time zone of the OpenTelemetry Collector if the IANA Time Zone Database is found, otherwise UTC) The time zone of Doris, e.g. `Asia/Shanghai`.
34+
* `log_response` (default = false) Whether to log the response of doris stream load.
35+
* `label_prefix` (default = open_telemetry) The prefix of the label in doris stream load. The final generated label is {label_prefix}_{db}_{table}_{yyyyMMddHHmmss}_{uuid}.
36+
* `headers` (default is empty map) The headers of doris stream load. Details: [header parameters](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual#load-configuration-parameters) and [group commit](https://doris.apache.org/docs/data-operate/import/group-commit-manual#stream-load).
37+
* `log_progress_interval` (default = 10) The interval, in seconds, between statistical logs. When it is less than or equal to 0, the statistical log is not printed.
3438
* `sending_queue` [details here](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration)
3539
* `enabled` (default = true)
3640
* `num_consumers` (default = 10) Number of consumers that dequeue batches; ignored if `enabled` is false.

exporter/dorisexporter/config.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
)
1717

1818
type Config struct {
19+
// confighttp.ClientConfig.Headers are the headers of doris stream load.
1920
confighttp.ClientConfig `mapstructure:",squash"`
2021
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
2122
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
@@ -42,6 +43,15 @@ type Config struct {
4243
ReplicationNum int32 `mapstructure:"replication_num"`
4344
// TimeZone is the time zone of doris.
4445
TimeZone string `mapstructure:"timezone"`
46+
// LogResponse is whether to log the response of doris stream load.
47+
LogResponse bool `mapstructure:"log_response"`
48+
// LabelPrefix is the prefix of the label in doris stream load.
49+
LabelPrefix string `mapstructure:"label_prefix"`
50+
// LogProgressInterval is the interval, in seconds, between statistical logs; if it is less than or equal to 0, the statistical log is not printed.
51+
LogProgressInterval int `mapstructure:"log_progress_interval"`
52+
53+
// not in config file, will be set in Validate
54+
timeLocation *time.Location `mapstructure:"-"`
4555
}
4656

4757
type Table struct {
@@ -94,7 +104,8 @@ func (cfg *Config) Validate() (err error) {
94104
err = errors.Join(err, errors.New("metrics table name must be alphanumeric and underscore"))
95105
}
96106

97-
_, errT := cfg.timeZone()
107+
var errT error
108+
cfg.timeLocation, errT = time.LoadLocation(cfg.TimeZone)
98109
if errT != nil {
99110
err = errors.Join(err, errors.New("invalid timezone"))
100111
}
@@ -113,15 +124,10 @@ func (cfg *Config) startHistoryDays() int32 {
113124
return -cfg.HistoryDays
114125
}
115126

116-
func (cfg *Config) timeZone() (*time.Location, error) {
117-
return time.LoadLocation(cfg.TimeZone)
118-
}
119-
120127
const (
121128
properties = `
122129
PROPERTIES (
123130
"replication_num" = "%d",
124-
"enable_single_replica_compaction" = "true",
125131
"compaction_policy" = "time_series",
126132
"dynamic_partition.enable" = "true",
127133
"dynamic_partition.create_history_partition" = "true",

exporter/dorisexporter/config_test.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,53 @@ func TestLoadConfig(t *testing.T) {
3131
defaultCfg := createDefaultConfig()
3232
defaultCfg.(*Config).Endpoint = "http://localhost:8030"
3333
defaultCfg.(*Config).MySQLEndpoint = "localhost:9030"
34+
err = defaultCfg.(*Config).Validate()
35+
require.NoError(t, err)
3436

3537
httpClientConfig := confighttp.NewDefaultClientConfig()
3638
httpClientConfig.Timeout = 5 * time.Second
3739
httpClientConfig.Endpoint = "http://localhost:8030"
40+
httpClientConfig.Headers = map[string]configopaque.String{
41+
"max_filter_ratio": "0.1",
42+
"strict_mode": "true",
43+
"group_commit": "async_mode",
44+
}
45+
46+
fullCfg := &Config{
47+
ClientConfig: httpClientConfig,
48+
BackOffConfig: configretry.BackOffConfig{
49+
Enabled: true,
50+
InitialInterval: 5 * time.Second,
51+
MaxInterval: 30 * time.Second,
52+
MaxElapsedTime: 300 * time.Second,
53+
RandomizationFactor: backoff.DefaultRandomizationFactor,
54+
Multiplier: backoff.DefaultMultiplier,
55+
},
56+
QueueSettings: exporterhelper.QueueConfig{
57+
Enabled: true,
58+
NumConsumers: 10,
59+
QueueSize: 1000,
60+
},
61+
Table: Table{
62+
Logs: "otel_logs",
63+
Traces: "otel_traces",
64+
Metrics: "otel_metrics",
65+
},
66+
Database: "otel",
67+
Username: "admin",
68+
Password: configopaque.String("admin"),
69+
CreateSchema: true,
70+
MySQLEndpoint: "localhost:9030",
71+
HistoryDays: 0,
72+
CreateHistoryDays: 0,
73+
ReplicationNum: 2,
74+
TimeZone: "Asia/Shanghai",
75+
LogResponse: true,
76+
LabelPrefix: "otel",
77+
LogProgressInterval: 5,
78+
}
79+
err = fullCfg.Validate()
80+
require.NoError(t, err)
3881

3982
tests := []struct {
4083
id component.ID
@@ -45,37 +88,8 @@ func TestLoadConfig(t *testing.T) {
4588
expected: defaultCfg,
4689
},
4790
{
48-
id: component.NewIDWithName(metadata.Type, "full"),
49-
expected: &Config{
50-
ClientConfig: httpClientConfig,
51-
BackOffConfig: configretry.BackOffConfig{
52-
Enabled: true,
53-
InitialInterval: 5 * time.Second,
54-
MaxInterval: 30 * time.Second,
55-
MaxElapsedTime: 300 * time.Second,
56-
RandomizationFactor: backoff.DefaultRandomizationFactor,
57-
Multiplier: backoff.DefaultMultiplier,
58-
},
59-
QueueSettings: exporterhelper.QueueConfig{
60-
Enabled: true,
61-
NumConsumers: 10,
62-
QueueSize: 1000,
63-
},
64-
Table: Table{
65-
Logs: "otel_logs",
66-
Traces: "otel_traces",
67-
Metrics: "otel_metrics",
68-
},
69-
Database: "otel",
70-
Username: "admin",
71-
Password: configopaque.String("admin"),
72-
CreateSchema: true,
73-
MySQLEndpoint: "localhost:9030",
74-
HistoryDays: 0,
75-
CreateHistoryDays: 0,
76-
ReplicationNum: 2,
77-
TimeZone: "Asia/Shanghai",
78-
},
91+
id: component.NewIDWithName(metadata.Type, "full"),
92+
expected: fullCfg,
7993
},
8094
}
8195

exporter/dorisexporter/exporter_common.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
_ "github.com/go-sql-driver/mysql" // for register database driver
16+
"github.com/google/uuid"
1617
"go.opentelemetry.io/collector/component"
1718
"go.uber.org/zap"
1819
)
@@ -27,17 +28,16 @@ type commonExporter struct {
2728
logger *zap.Logger
2829
cfg *Config
2930
timeZone *time.Location
31+
reporter *progressReporter
3032
}
3133

32-
func newExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *commonExporter {
33-
// There won't be an error because it's already been validated in the Config.Validate method.
34-
timeZone, _ := cfg.timeZone()
35-
34+
func newExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings, reporterName string) *commonExporter {
3635
return &commonExporter{
3736
TelemetrySettings: set,
3837
logger: logger,
3938
cfg: cfg,
40-
timeZone: timeZone,
39+
timeZone: cfg.timeLocation,
40+
reporter: newProgressReporter(reporterName, cfg.LogProgressInterval, logger),
4141
}
4242
}
4343

@@ -66,14 +66,29 @@ type streamLoadResponse struct {
6666
}
6767

6868
func (r *streamLoadResponse) success() bool {
69-
return r.Status == "Success" || r.Status == "Publish Timeout"
69+
return r.Status == "Success" || r.Status == "Publish Timeout" || r.Status == "Label Already Exists"
70+
}
71+
72+
func (r *streamLoadResponse) duplication() bool {
73+
return r.Status == "Label Already Exists"
7074
}
7175

7276
func streamLoadURL(address string, db string, table string) string {
7377
return address + "/api/" + db + "/" + table + "/_stream_load"
7478
}
7579

76-
func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte) (*http.Request, error) {
80+
func generateLabel(cfg *Config, table string) string {
81+
return fmt.Sprintf(
82+
"%s_%s_%s_%s_%s",
83+
cfg.LabelPrefix,
84+
cfg.Database,
85+
table,
86+
time.Now().In(cfg.timeLocation).Format("20060102150405"),
87+
uuid.New().String(),
88+
)
89+
}
90+
91+
func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []byte, label string) (*http.Request, error) {
7792
url := streamLoadURL(cfg.Endpoint, cfg.Database, table)
7893
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(data))
7994
if err != nil {
@@ -83,6 +98,10 @@ func streamLoadRequest(ctx context.Context, cfg *Config, table string, data []by
8398
req.Header.Set("format", "json")
8499
req.Header.Set("Expect", "100-continue")
85100
req.Header.Set("read_json_by_line", "true")
101+
groupCommit := string(cfg.Headers["group_commit"])
102+
if groupCommit == "" || groupCommit == "off_mode" {
103+
req.Header.Set("label", label)
104+
}
86105
if cfg.ClientConfig.Timeout != 0 {
87106
req.Header.Set("timeout", fmt.Sprintf("%d", cfg.ClientConfig.Timeout/time.Second))
88107
}

exporter/dorisexporter/exporter_common_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@ import (
1515

1616
func TestNewCommonExporter(t *testing.T) {
1717
cfg := createDefaultConfig().(*Config)
18-
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings())
18+
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings(), "")
1919
require.NotNil(t, exporter)
2020
}
2121

2222
func TestCommonExporter_FormatTime(t *testing.T) {
2323
cfg := createDefaultConfig().(*Config)
24-
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings())
24+
cfg.Endpoint = "http://localhost:8030"
25+
cfg.CreateSchema = false
26+
err := cfg.Validate()
27+
require.NoError(t, err)
28+
29+
exporter := newExporter(nil, cfg, componenttest.NewNopTelemetrySettings(), "")
2530
require.NotNil(t, exporter)
2631

2732
now := time.Date(2024, 1, 1, 0, 0, 0, 1000, time.Local)
@@ -62,7 +67,7 @@ func findRandomPort() (int, error) {
6267
return port, nil
6368
}
6469

65-
func TestToJsonLines(t *testing.T) {
70+
func TestToJSONLines(t *testing.T) {
6671
logs, err := toJSONLines([]*dLog{
6772
{}, {},
6873
})

exporter/dorisexporter/exporter_logs.go

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type logsExporter struct {
4242

4343
func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter {
4444
return &logsExporter{
45-
commonExporter: newExporter(logger, cfg, set),
45+
commonExporter: newExporter(logger, cfg, set, "LOG"),
4646
}
4747
}
4848

@@ -53,24 +53,27 @@ func (e *logsExporter) start(ctx context.Context, host component.Host) error {
5353
}
5454
e.client = client
5555

56-
if !e.cfg.CreateSchema {
57-
return nil
58-
}
56+
if e.cfg.CreateSchema {
57+
conn, err := createDorisMySQLClient(e.cfg)
58+
if err != nil {
59+
return err
60+
}
61+
defer conn.Close()
5962

60-
conn, err := createDorisMySQLClient(e.cfg)
61-
if err != nil {
62-
return err
63-
}
64-
defer conn.Close()
63+
err = createAndUseDatabase(ctx, conn, e.cfg)
64+
if err != nil {
65+
return err
66+
}
6567

66-
err = createAndUseDatabase(ctx, conn, e.cfg)
67-
if err != nil {
68-
return err
68+
ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
69+
_, err = conn.ExecContext(ctx, ddl)
70+
if err != nil {
71+
return err
72+
}
6973
}
7074

71-
ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
72-
_, err = conn.ExecContext(ctx, ddl)
73-
return err
75+
go e.reporter.report()
76+
return nil
7477
}
7578

7679
func (e *logsExporter) shutdown(_ context.Context) error {
@@ -81,6 +84,7 @@ func (e *logsExporter) shutdown(_ context.Context) error {
8184
}
8285

8386
func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
87+
label := generateLabel(e.cfg, e.cfg.Table.Logs)
8488
logs := make([]*dLog, 0, ld.LogRecordCount())
8589

8690
for i := 0; i < ld.ResourceLogs().Len(); i++ {
@@ -118,16 +122,16 @@ func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
118122
}
119123
}
120124

121-
return e.pushLogDataInternal(ctx, logs)
125+
return e.pushLogDataInternal(ctx, logs, label)
122126
}
123127

124-
func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error {
128+
func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog, label string) error {
125129
marshal, err := toJSONLines(logs)
126130
if err != nil {
127131
return err
128132
}
129133

130-
req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal)
134+
req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal, label)
131135
if err != nil {
132136
return err
133137
}
@@ -149,9 +153,21 @@ func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) er
149153
return err
150154
}
151155

152-
if !response.success() {
153-
return fmt.Errorf("failed to push log data: %s", response.Message)
156+
if response.success() {
157+
e.reporter.incrTotalRows(int64(len(logs)))
158+
e.reporter.incrTotalBytes(int64(len(marshal)))
159+
160+
if response.duplication() {
161+
e.logger.Warn("label already exists", zap.String("label", label), zap.Int("skipped", len(logs)))
162+
}
163+
164+
if e.cfg.LogResponse {
165+
e.logger.Info("log response:\n" + string(body))
166+
} else {
167+
e.logger.Debug("log response:\n" + string(body))
168+
}
169+
return nil
154170
}
155171

156-
return nil
172+
return fmt.Errorf("failed to push log data, response:%s", string(body))
157173
}

0 commit comments

Comments
 (0)