Skip to content

Commit 4542bbf

Browse files
mauri870carsonipChrsMark
authored
[elasticsearchexporter]: Add dynamic document id support for logs (#37065)
#### Description This PR adds a new config option `logs_dynamic_id` that when set to true reads the `elasticsearch.document_id` attribute from each log record and uses it as the final document id in Elasticsearch. This is only implemented for logs but I can open subsequent PRs supporting metrics and traces akin to the `*_dynamic_index` options. Fixes #36882 #### Testing Added tests to verify that the document ID attribute can be read from the log record and that the _id is properly forwarded to Elasticsearch. Also asserted that when there is no doc id attribute the current behavior is retained. #### Documentation Updated the readme to mention the new `logs_dynamic_id` config option. --------- Co-authored-by: Carson Ip <[email protected]> Co-authored-by: Christos Markou <[email protected]>
1 parent 39816b7 commit 4542bbf

File tree

11 files changed

+171
-13
lines changed

11 files changed

+171
-13
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add config `logs_dynamic_id` to dynamically set the document ID of log records using log record attribute `elasticsearch.document_id`
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36882]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ This can be customised through the following settings:
142142
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
143143
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
144144

145+
- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute.
146+
- `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document.
147+
145148
### Elasticsearch document mapping
146149

147150
The Elasticsearch exporter supports several document schemas and preprocessing

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type bulkIndexer interface {
3131

3232
type bulkIndexerSession interface {
3333
// Add adds a document to the bulk indexing session.
34-
Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error
34+
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error
3535

3636
// End must be called on the session object once it is no longer
3737
// needed, in order to release any associated resources.
@@ -126,8 +126,9 @@ type syncBulkIndexerSession struct {
126126
}
127127

128128
// Add adds an item to the sync bulk indexer session.
129-
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
130-
err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
129+
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
130+
doc := docappender.BulkIndexerItem{Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates}
131+
err := s.bi.Add(doc)
131132
if err != nil {
132133
return err
133134
}
@@ -248,10 +249,11 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
248249
// Add adds an item to the async bulk indexer session.
249250
//
250251
// Adding an item after a call to Close() will panic.
251-
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
252+
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
252253
item := docappender.BulkIndexerItem{
253254
Index: index,
254255
Body: document,
256+
DocumentID: docID,
255257
DynamicTemplates: dynamicTemplates,
256258
}
257259
select {

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
102102
session, err := bulkIndexer.StartSession(context.Background())
103103
require.NoError(t, err)
104104

105-
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
105+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
106106
// should flush
107107
time.Sleep(100 * time.Millisecond)
108108
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
229229
session, err := bulkIndexer.StartSession(context.Background())
230230
require.NoError(t, err)
231231

232-
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
232+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
233233
// should flush
234234
time.Sleep(100 * time.Millisecond)
235235
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
@@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie
312312
session, err := bulkIndexer.StartSession(context.Background())
313313
require.NoError(t, err)
314314

315-
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
315+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
316316
assert.NoError(t, bulkIndexer.Close(context.Background()))
317317

318318
return bulkIndexer
@@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
338338
session, err := bi.StartSession(context.Background())
339339
require.NoError(t, err)
340340

341-
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
341+
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
342342
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
343343
assert.NoError(t, bi.Close(context.Background()))
344344
}

exporter/elasticsearchexporter/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ type Config struct {
5353
// fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
5454
TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"`
5555

56+
// LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES.
57+
LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"`
58+
5659
// Pipeline configures the ingest node pipeline name that should be used to process the
5760
// events.
5861
//
@@ -112,6 +115,10 @@ type DynamicIndexSetting struct {
112115
Enabled bool `mapstructure:"enabled"`
113116
}
114117

118+
type DynamicIDSettings struct {
119+
Enabled bool `mapstructure:"enabled"`
120+
}
121+
115122
// AuthenticationSettings defines user authentication related settings.
116123
type AuthenticationSettings struct {
117124
// User is used to configure HTTP Basic Authentication.

exporter/elasticsearchexporter/config_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ func TestConfig(t *testing.T) {
7373
TracesDynamicIndex: DynamicIndexSetting{
7474
Enabled: false,
7575
},
76+
LogsDynamicID: DynamicIDSettings{
77+
Enabled: false,
78+
},
7679
Pipeline: "mypipeline",
7780
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
7881
cfg.Timeout = 2 * time.Minute
@@ -144,6 +147,9 @@ func TestConfig(t *testing.T) {
144147
TracesDynamicIndex: DynamicIndexSetting{
145148
Enabled: false,
146149
},
150+
LogsDynamicID: DynamicIDSettings{
151+
Enabled: false,
152+
},
147153
Pipeline: "mypipeline",
148154
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
149155
cfg.Timeout = 2 * time.Minute
@@ -215,6 +221,9 @@ func TestConfig(t *testing.T) {
215221
TracesDynamicIndex: DynamicIndexSetting{
216222
Enabled: false,
217223
},
224+
LogsDynamicID: DynamicIDSettings{
225+
Enabled: false,
226+
},
218227
Pipeline: "mypipeline",
219228
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
220229
cfg.Timeout = 2 * time.Minute

exporter/elasticsearchexporter/exporter.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ import (
2222
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
2323
)
2424

25+
const (
26+
// documentIDAttributeName is the attribute name used to specify the document ID.
27+
documentIDAttributeName = "elasticsearch.document_id"
28+
)
29+
2530
type elasticsearchExporter struct {
2631
component.TelemetrySettings
2732
userAgent string
@@ -176,13 +181,15 @@ func (e *elasticsearchExporter) pushLogRecord(
176181
}
177182

178183
buf := e.bufferPool.NewPooledBuffer()
184+
docID := e.extractDocumentIDAttribute(record.Attributes())
179185
err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, fIndex, buf.Buffer)
180186
if err != nil {
181187
buf.Recycle()
182188
return fmt.Errorf("failed to encode log event: %w", err)
183189
}
190+
184191
// not recycling after Add returns an error as we don't know if it's already recycled
185-
return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil)
192+
return bulkIndexerSession.Add(ctx, fIndex.Index, docID, buf, nil)
186193
}
187194

188195
func (e *elasticsearchExporter) pushMetricsData(
@@ -299,7 +306,7 @@ func (e *elasticsearchExporter) pushMetricsData(
299306
errs = append(errs, err)
300307
continue
301308
}
302-
if err := session.Add(ctx, fIndex.Index, buf, dynamicTemplates); err != nil {
309+
if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates); err != nil {
303310
// not recycling after Add returns an error as we don't know if it's already recycled
304311
if cerr := ctx.Err(); cerr != nil {
305312
return cerr
@@ -422,7 +429,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
422429
return fmt.Errorf("failed to encode trace record: %w", err)
423430
}
424431
// not recycling after Add returns an error as we don't know if it's already recycled
425-
return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil)
432+
return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil)
426433
}
427434

428435
func (e *elasticsearchExporter) pushSpanEvent(
@@ -454,5 +461,17 @@ func (e *elasticsearchExporter) pushSpanEvent(
454461
return nil
455462
}
456463
// not recycling after Add returns an error as we don't know if it's already recycled
457-
return bulkIndexerSession.Add(ctx, fIndex.Index, buf, nil)
464+
return bulkIndexerSession.Add(ctx, fIndex.Index, "", buf, nil)
465+
}
466+
467+
func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
468+
if !e.config.LogsDynamicID.Enabled {
469+
return ""
470+
}
471+
472+
v, ok := m.Get(documentIDAttributeName)
473+
if !ok {
474+
return ""
475+
}
476+
return v.AsString()
458477
}

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,82 @@ func TestExporterLogs(t *testing.T) {
736736
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
737737
})
738738

739+
t.Run("publish logs with dynamic id", func(t *testing.T) {
740+
t.Parallel()
741+
exampleDocID := "abc123"
742+
tableTests := []struct {
743+
name string
744+
expectedDocID string // "" means the _id will not be set
745+
recordAttrs map[string]any
746+
}{
747+
{
748+
name: "missing document id attribute should not set _id",
749+
expectedDocID: "",
750+
},
751+
{
752+
name: "empty document id attribute should not set _id",
753+
expectedDocID: "",
754+
recordAttrs: map[string]any{
755+
documentIDAttributeName: "",
756+
},
757+
},
758+
{
759+
name: "record attributes",
760+
expectedDocID: exampleDocID,
761+
recordAttrs: map[string]any{
762+
documentIDAttributeName: exampleDocID,
763+
},
764+
},
765+
}
766+
767+
cfgs := map[string]func(*Config){
768+
"async": func(cfg *Config) {
769+
batcherEnabled := false
770+
cfg.Batcher.Enabled = &batcherEnabled
771+
},
772+
"sync": func(cfg *Config) {
773+
batcherEnabled := true
774+
cfg.Batcher.Enabled = &batcherEnabled
775+
cfg.Batcher.FlushTimeout = 10 * time.Millisecond
776+
},
777+
}
778+
for _, tt := range tableTests {
779+
for cfgName, cfgFn := range cfgs {
780+
t.Run(tt.name+"/"+cfgName, func(t *testing.T) {
781+
t.Parallel()
782+
rec := newBulkRecorder()
783+
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
784+
rec.Record(docs)
785+
786+
if tt.expectedDocID == "" {
787+
assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set")
788+
} else {
789+
assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set")
790+
}
791+
792+
// Ensure the document id attribute is removed from the final document.
793+
assert.NotContains(t, string(docs[0].Document), documentIDAttributeName, "expected document id attribute to be removed")
794+
return itemsAllOK(docs)
795+
})
796+
797+
exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
798+
cfg.Mapping.Mode = "otel"
799+
cfg.LogsDynamicID.Enabled = true
800+
cfgFn(cfg)
801+
})
802+
logs := newLogsWithAttributes(
803+
tt.recordAttrs,
804+
map[string]any{},
805+
map[string]any{},
806+
)
807+
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")
808+
mustSendLogs(t, exporter, logs)
809+
810+
rec.WaitItems(1)
811+
})
812+
}
813+
}
814+
})
739815
t.Run("otel mode attribute complex value", func(t *testing.T) {
740816
rec := newBulkRecorder()
741817
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
@@ -1943,3 +2019,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string {
19432019
require.NoError(t, err)
19442020
return action.Create.Index
19452021
}
2022+
2023+
func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string {
2024+
action := struct {
2025+
Create struct {
2026+
ID string `json:"_id"`
2027+
} `json:"create"`
2028+
}{}
2029+
err := json.Unmarshal(actionJSON, &action)
2030+
require.NoError(t, err)
2031+
return action.Create.ID
2032+
}

exporter/elasticsearchexporter/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ func createDefaultConfig() component.Config {
6262
TracesDynamicIndex: DynamicIndexSetting{
6363
Enabled: false,
6464
},
65+
LogsDynamicID: DynamicIDSettings{
66+
Enabled: false,
67+
},
6568
Retry: RetrySettings{
6669
Enabled: true,
6770
MaxRetries: 0, // default is set in exporter code

exporter/elasticsearchexporter/pdata_serializer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func writeAttributes(v *json.Visitor, attributes pcommon.Map, stringifyMapValues
298298
_ = v.OnObjectStart(-1, structform.AnyType)
299299
attributes.Range(func(k string, val pcommon.Value) bool {
300300
switch k {
301-
case dataStreamType, dataStreamDataset, dataStreamNamespace, elasticsearch.MappingHintsAttrKey:
301+
case dataStreamType, dataStreamDataset, dataStreamNamespace, elasticsearch.MappingHintsAttrKey, documentIDAttributeName:
302302
return true
303303
}
304304
if isGeoAttribute(k, val) {

exporter/elasticsearchexporter/pdata_serializer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func TestSerializeLog(t *testing.T) {
3131
record.Attributes().PutDouble("double", 42.0)
3232
record.Attributes().PutInt("int", 42)
3333
record.Attributes().PutEmptyBytes("bytes").Append(42)
34+
record.Attributes().PutStr(documentIDAttributeName, "my_id")
3435
_ = record.Attributes().PutEmptySlice("slice").FromRaw([]any{42, "foo"})
3536
record.Attributes().PutEmptySlice("map_slice").AppendEmpty().SetEmptyMap().PutStr("foo.bar", "baz")
3637
mapAttr := record.Attributes().PutEmptyMap("map")

0 commit comments

Comments
 (0)