[exporter/clickhouseexporter] Support JSON type for logs and traces #40547

Merged: 53 commits on Jun 27, 2025.

Commits
ff7a946 refactor checkpoint (SpencerTorres, May 29, 2025)
cd1a250 use fully qualified names for table creation/insertion (SpencerTorres, May 30, 2025)
068cfe3 remove comment (SpencerTorres, May 30, 2025)
53579ed update func call (SpencerTorres, May 30, 2025)
c453e43 move connection init to Start function, fix some unit tests (SpencerTorres, May 30, 2025)
200fe05 use new batch.Close func (SpencerTorres, Jun 6, 2025)
b83af98 createSchema todo (SpencerTorres, Jun 6, 2025)
44c14d7 fix summary table ddl (SpencerTorres, Jun 6, 2025)
99d3edc convert exporter tests to proper integration tests (SpencerTorres, Jun 6, 2025)
27cb389 remove duplicate tests, add integration build flags, goleak check (SpencerTorres, Jun 6, 2025)
61540ce fix lint (SpencerTorres, Jun 6, 2025)
c4ff013 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 6, 2025)
3b1a708 remove default_ddl, use sqltemplates folder (SpencerTorres, Jun 6, 2025)
8d81900 rename file to original generated (SpencerTorres, Jun 6, 2025)
63278cd remove test build tag (SpencerTorres, Jun 6, 2025)
26a044c package test, rename context (SpencerTorres, Jun 6, 2025)
339c0c3 add tests (SpencerTorres, Jun 7, 2025)
64079aa run goporto (SpencerTorres, Jun 7, 2025)
d77e451 run generate (SpencerTorres, Jun 7, 2025)
d592be3 revert lifecylce tests (SpencerTorres, Jun 7, 2025)
452578c remove TestMain dependency for integration tests (SpencerTorres, Jun 8, 2025)
5b36389 make fmt (SpencerTorres, Jun 8, 2025)
48d8428 re-use existing scope variable (SpencerTorres, Jun 8, 2025)
8d4a4e5 collapse if (SpencerTorres, Jun 8, 2025)
57edec2 export json logs (SpencerTorres, Jun 8, 2025)
c1640ea traces json (SpencerTorres, Jun 9, 2025)
2295904 json integration tests (SpencerTorres, Jun 9, 2025)
91106bd add tests, changelog, fmt/lint (SpencerTorres, Jun 9, 2025)
a702d61 readme (SpencerTorres, Jun 9, 2025)
099c236 changelog (SpencerTorres, Jun 9, 2025)
a72fa02 run goporto (SpencerTorres, Jun 9, 2025)
33003d8 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 9, 2025)
268a4b1 go tidy (SpencerTorres, Jun 9, 2025)
b629b1c Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 10, 2025)
9188e11 go mod (SpencerTorres, Jun 10, 2025)
4a79876 err is always nil (SpencerTorres, Jun 10, 2025)
eb39b0f Merge branch 'clickhouse_full_reorganize' of github.com:SpencerTorres… (SpencerTorres, Jun 10, 2025)
a9a8569 go mod (SpencerTorres, Jun 10, 2025)
3759309 Update clickhouse-go to enable HTTP, add HTTP integration tests (SpencerTorres, Jun 16, 2025)
8830b46 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 16, 2025)
2c5e009 go mod (SpencerTorres, Jun 16, 2025)
a908280 Merge branch 'clickhouse_full_reorganize' of github.com:SpencerTorres… (SpencerTorres, Jun 16, 2025)
f2365e4 Update JSON integration tests with HTTP+Native (SpencerTorres, Jun 16, 2025)
04297e3 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 16, 2025)
4f6ed55 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 17, 2025)
74e21c3 update integration tests to be concurrent, update driver for critical… (SpencerTorres, Jun 17, 2025)
cc3f4ac Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 17, 2025)
5ed29bf Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 19, 2025)
89237a8 compression for JSON column DDL (SpencerTorres, Jun 19, 2025)
178de0a remove custom encoder, use std json.Marshal (SpencerTorres, Jun 22, 2025)
6a88339 Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 22, 2025)
49c779f Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec… (SpencerTorres, Jun 25, 2025)
5cdcfd0 make gotidy (SpencerTorres, Jun 25, 2025)

32 changes: 32 additions & 0 deletions .chloggen/clickhouse-export-json-type.yaml
@@ -0,0 +1,32 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'clickhouseexporter'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support JSON type for logs and traces

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [40547]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Added a feature gate to enable a JSON pipeline for logs and traces.
  The feature gate ID is `clickhouse.json`; when enabled, the exporter
  automatically uses the new DDL and column type on supported server versions.
  You may also need to add `enable_json_type=1` to your connection
  settings, depending on the server version.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
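
The gate described in this changelog entry is normally enabled with the collector's `--feature-gates=clickhouse.json` flag. For tests it can also be flipped programmatically; below is a minimal sketch using the collector's `featuregate` API (the helper name is illustrative, and it assumes the gate has already been registered by importing this exporter):

```go
package clickhouseexporter_test

import (
	"testing"

	"go.opentelemetry.io/collector/featuregate"
)

// enableJSONGate turns the clickhouse.json gate on for one test and
// restores it afterwards.
func enableJSONGate(t *testing.T) {
	if err := featuregate.GlobalRegistry().Set("clickhouse.json", true); err != nil {
		t.Fatalf("enable clickhouse.json gate: %v", err)
	}
	t.Cleanup(func() {
		_ = featuregate.GlobalRegistry().Set("clickhouse.json", false)
	})
}
```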
33 changes: 21 additions & 12 deletions exporter/clickhouseexporter/README.md
@@ -48,7 +48,7 @@ If the official plugin doesn't meet your needs, you can try the [Altinity plugin
- Get log severity count time series.

```sql
-SELECT toDateTime(toStartOfInterval(TimestampTime, INTERVAL 60 second)) as time, SeverityText, count() as count
+SELECT toDateTime(toStartOfInterval(Timestamp, INTERVAL 60 second)) as time, SeverityText, count() as count
FROM otel_logs
WHERE time >= NOW() - INTERVAL 1 HOUR
GROUP BY SeverityText, time
@@ -60,7 +60,7 @@ ORDER BY time;
```sql
SELECT Timestamp as log_time, Body
FROM otel_logs
-WHERE TimestampTime >= NOW() - INTERVAL 1 HOUR
+WHERE Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -70,7 +70,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE ServiceName = 'clickhouse-exporter'
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -80,7 +80,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE LogAttributes['container_name'] = '/example_flog_1'
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -90,7 +90,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE hasToken(Body, 'http')
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -100,7 +100,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE Body like '%http%'
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -110,7 +110,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE match(Body, 'http')
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -120,7 +120,7 @@ Limit 100;
SELECT Timestamp as log_time, Body
FROM otel_logs
WHERE JSONExtractFloat(Body, 'bytes') > 1000
-AND TimestampTime >= NOW() - INTERVAL 1 HOUR
+AND Timestamp >= NOW() - INTERVAL 1 HOUR
Limit 100;
```

@@ -347,12 +347,12 @@ use the `https` scheme.

## Schema management

-By default the exporter will create the database and tables under the names defined in the config. This is fine for simple deployments, but for production workloads, it is recommended that you manage your own schema by setting `create_schema` to `false` in the config.
+By default, the exporter will create the database and tables under the names defined in the config. This is fine for simple deployments, but for production workloads, it is recommended that you manage your own schema by setting `create_schema` to `false` in the config.
This prevents each exporter process from racing to create the database and tables, and makes it easier to upgrade the exporter in the future.

In this mode, the only SQL sent to your server will be for `INSERT` statements.

-The default DDL used by the exporter can be found in `example/default_ddl`.
+The default DDL used by the exporter can be found in `internal/sqltemplates`.
Be sure to customize the indexes, TTL, and partitioning to fit your deployment.
Column names and types must be the same to preserve compatibility with the exporter's `INSERT` statements.
As long as the column names/types match the `INSERT` statement, you can create whatever kind of table you want.
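
When managing the schema yourself, it can be worth sanity-checking that your table still exposes the columns the exporter's `INSERT` expects. A rough sketch using clickhouse-go, the driver this exporter is built on; the `otel.otel_logs` names and the helper itself are illustrative assumptions, not taken from this PR:

```go
// listColumns fetches the column names/types of a self-managed table so
// they can be compared against the exporter's INSERT statement.
func listColumns(ctx context.Context, conn driver.Conn) (map[string]string, error) {
	rows, err := conn.Query(ctx,
		"SELECT name, type FROM system.columns WHERE database = ? AND table = ?",
		"otel", "otel_logs")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	cols := make(map[string]string)
	for rows.Next() {
		var name, typ string
		if err := rows.Scan(&name, &typ); err != nil {
			return nil, err
		}
		cols[name] = typ
	}
	return cols, rows.Err()
}
```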
@@ -410,14 +410,23 @@ service:
exporters: [ clickhouse ]
```
## Experimental JSON support
A feature gate is available for testing the experimental JSON pipeline.
Enable the `clickhouse.json` feature gate by following the [feature gate documentation](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md).
You may also need to add `enable_json_type=1` to your `endpoint` query string parameters.
The DDL has been updated for the JSON pipeline and can be found in the `internal/sqltemplates` package; feel free to tune the schema as needed.
All `Map` columns have been replaced with `JSON`.
ClickHouse v25+ is recommended for reliable JSON support.
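
Outside the collector, the same setting can be exercised directly with clickhouse-go to confirm that a server accepts it. A minimal sketch, assuming a local server; it mirrors the `enable_json_type=1` endpoint parameter rather than reproducing the exporter's own connection code:

```go
import (
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// openJSONConn opens a connection with the JSON type enabled server-side.
func openJSONConn() driver.Conn {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr:     []string{"127.0.0.1:9000"}, // assumed address
		Settings: clickhouse.Settings{"enable_json_type": 1},
	})
	if err != nil {
		log.Fatal(err)
	}
	return conn
}
```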

## Contributing

Before contributing, review the contribution guidelines in [CONTRIBUTING.md](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md).

#### Integration tests

-Integration tests can be run with the following command:
+Integration tests can be run with the following command (includes unit tests):
```sh
-go test -tags integration -run=TestIntegration
+go test -tags integration
```
*Note: Make sure integration tests pass after making changes to SQL.*
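
The `integration` tag works through Go build constraints: test files carrying the tag compile only when it is passed. A minimal sketch of the pattern (file contents are illustrative, not from this PR):

```go
//go:build integration

package clickhouseexporter

import "testing"

// Compiled and run only with: go test -tags integration
func TestIntegrationExample(t *testing.T) {
	t.Log("integration build tag is set")
}
```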
12 changes: 3 additions & 9 deletions exporter/clickhouseexporter/exporter_logs_integration_test.go
@@ -32,10 +32,9 @@ func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsExporter {
}

func verifyExportLogs(t *testing.T, exporter *logsExporter) {
-	// 3 pushes
-	mustPushLogsData(t, exporter, simpleLogs(5000))
-	mustPushLogsData(t, exporter, simpleLogs(5000))
-	mustPushLogsData(t, exporter, simpleLogs(5000))
+	pushConcurrentlyNoError(t, func() error {
+		return exporter.pushLogsData(context.Background(), simpleLogs(5000))
+	})

	type log struct {
		Timestamp time.Time `ch:"Timestamp"`
Expand Down Expand Up @@ -90,11 +89,6 @@ func verifyExportLogs(t *testing.T, exporter *logsExporter) {
	require.Equal(t, expectedLog, actualLog)
}

-func mustPushLogsData(t *testing.T, exporter *logsExporter, ld plog.Logs) {
-	err := exporter.pushLogsData(context.Background(), ld)
-	require.NoError(t, err)
-}

func simpleLogs(count int) plog.Logs {
	logs := plog.NewLogs()
	rl := logs.ResourceLogs().AppendEmpty()
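
The `pushConcurrentlyNoError` helper that replaces the three serial pushes above is defined elsewhere in the package and not shown in this diff. A plausible sketch of such a helper, with the concurrency level and details as assumptions:

```go
import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

// pushConcurrentlyNoError runs push several times in parallel and requires
// that every call succeeds. Hypothetical; the real helper may differ.
func pushConcurrentlyNoError(t *testing.T, push func() error) {
	const workers = 5 // assumed concurrency level
	errs := make(chan error, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs <- push()
		}()
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		require.NoError(t, err)
	}
}
```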
186 changes: 186 additions & 0 deletions exporter/clickhouseexporter/exporter_logs_json.go
@@ -0,0 +1,186 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/sqltemplates"
)

type logsJSONExporter struct {
	cfg       *Config
	logger    *zap.Logger
	db        driver.Conn
	insertSQL string
}

func newLogsJSONExporter(logger *zap.Logger, cfg *Config) *logsJSONExporter {
	return &logsJSONExporter{
		cfg:       cfg,
		logger:    logger,
		insertSQL: renderInsertLogsJSONSQL(cfg),
	}
}

func (e *logsJSONExporter) start(ctx context.Context, _ component.Host) error {
	dsn, err := e.cfg.buildDSN()
	if err != nil {
		return err
	}

	e.db, err = internal.NewClickhouseClient(dsn)
	if err != nil {
		return err
	}

	if e.cfg.shouldCreateSchema() {
		if err := internal.CreateDatabase(ctx, e.db, e.cfg.database(), e.cfg.clusterString()); err != nil {
			return err
		}

		if err := createLogsJSONTable(ctx, e.cfg, e.db); err != nil {
			return err
		}
	}

	return nil
}

func (e *logsJSONExporter) shutdown(_ context.Context) error {
	if e.db != nil {
		if err := e.db.Close(); err != nil {
			e.logger.Warn("failed to close json logs db connection", zap.Error(err))
		}
	}

	return nil
}

func (e *logsJSONExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
	batch, err := e.db.PrepareBatch(ctx, e.insertSQL)
	if err != nil {
		return err
	}
	defer func(batch driver.Batch) {
		closeErr := batch.Close()
		if closeErr != nil {
			e.logger.Warn("failed to close json logs batch", zap.Error(closeErr))
		}
	}(batch)

	processStart := time.Now()

	var logCount int
	rsLogs := ld.ResourceLogs()
	rsLen := rsLogs.Len()
	// Walk the resource -> scope -> log record hierarchy, appending one row per record.
	for i := 0; i < rsLen; i++ {
		logs := rsLogs.At(i)
		res := logs.Resource()
		resURL := logs.SchemaUrl()
		resAttr := res.Attributes()
		serviceName := internal.GetServiceName(resAttr)
		// Attribute maps are serialized to JSON bytes for the JSON columns.
		resAttrBytes, resAttrErr := json.Marshal(resAttr.AsRaw())
		if resAttrErr != nil {
			return fmt.Errorf("failed to marshal json log resource attributes: %w", resAttrErr)
		}

		slLen := logs.ScopeLogs().Len()
		for j := 0; j < slLen; j++ {
			scopeLog := logs.ScopeLogs().At(j)
			scopeURL := scopeLog.SchemaUrl()
			scopeLogScope := scopeLog.Scope()
			scopeName := scopeLogScope.Name()
			scopeVersion := scopeLogScope.Version()
			scopeLogRecords := scopeLog.LogRecords()
			scopeAttrBytes, scopeAttrErr := json.Marshal(scopeLogScope.Attributes().AsRaw())
			if scopeAttrErr != nil {
				return fmt.Errorf("failed to marshal json log scope attributes: %w", scopeAttrErr)
			}

			slrLen := scopeLogRecords.Len()
			for k := 0; k < slrLen; k++ {
				r := scopeLogRecords.At(k)
				logAttrBytes, logAttrErr := json.Marshal(r.Attributes().AsRaw())
				if logAttrErr != nil {
					return fmt.Errorf("failed to marshal json log attributes: %w", logAttrErr)
				}

				// Fall back to the observed timestamp when the record has no event timestamp.
				timestamp := r.Timestamp()
				if timestamp == 0 {
					timestamp = r.ObservedTimestamp()
				}

				// Argument order must match the column order of the insert template.
				appendErr := batch.Append(
					timestamp.AsTime(),
					r.TraceID().String(),
					r.SpanID().String(),
					uint8(r.Flags()),
					r.SeverityText(),
					uint8(r.SeverityNumber()),
					serviceName,
					r.Body().Str(),
					resURL,
					resAttrBytes,
					scopeURL,
					scopeName,
					scopeVersion,
					scopeAttrBytes,
					logAttrBytes,
				)
				if appendErr != nil {
					return fmt.Errorf("failed to append json log row: %w", appendErr)
				}

				logCount++
			}
		}
	}

	processDuration := time.Since(processStart)
	networkStart := time.Now()
	if sendErr := batch.Send(); sendErr != nil {
		return fmt.Errorf("logs json insert failed: %w", sendErr)
	}

	networkDuration := time.Since(networkStart)
	totalDuration := time.Since(processStart)
	e.logger.Debug("insert json logs",
		zap.Int("records", logCount),
		zap.String("process_cost", processDuration.String()),
		zap.String("network_cost", networkDuration.String()),
		zap.String("total_cost", totalDuration.String()))

	return nil
}

func renderInsertLogsJSONSQL(cfg *Config) string {
	return fmt.Sprintf(sqltemplates.LogsJSONInsert, cfg.database(), cfg.LogsTableName)
}

func renderCreateLogsJSONTableSQL(cfg *Config) string {
	ttlExpr := internal.GenerateTTLExpr(cfg.TTL, "Timestamp")
	return fmt.Sprintf(sqltemplates.LogsJSONCreateTable,
		cfg.database(), cfg.LogsTableName, cfg.clusterString(),
		cfg.tableEngineString(),
		ttlExpr,
	)
}

func createLogsJSONTable(ctx context.Context, cfg *Config, db driver.Conn) error {
	if err := db.Exec(ctx, renderCreateLogsJSONTableSQL(cfg)); err != nil {
		return fmt.Errorf("exec create logs json table sql: %w", err)
	}

	return nil
}
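
The `sqltemplates.LogsJSONInsert` and `LogsJSONCreateTable` constants are not part of this excerpt. Judging purely from the `batch.Append` argument order above, the insert template presumably names columns along these lines; a hedged reconstruction, not the actual file:

```go
// Hypothetical shape of sqltemplates.LogsJSONInsert, inferred from the
// Append call above; the real template lives in internal/sqltemplates.
const logsJSONInsertSketch = `INSERT INTO %s.%s (
	Timestamp, TraceId, SpanId, TraceFlags, SeverityText, SeverityNumber,
	ServiceName, Body, ResourceSchemaUrl, ResourceAttributes, ScopeSchemaUrl,
	ScopeName, ScopeVersion, ScopeAttributes, LogAttributes
)`
```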