Skip to content

Commit e71b432

Browse files
committed
elasticsearchexporter: mapping mode from scope
Add support for specifying the mapping mode as a scope attribute, "elastic.mapping.mode". If specified, this overrides X-Elastic-Mapping-Mode. Simplify StartSession by removing the error result, since the only time it can fail is if the BulkIndexerConfig is invalid, and we ensure that cannot happen. We now also pre-create arrays of encoders, routers, and data point hashers. This simplifies and speeds up acquisition of these during execution.
1 parent 4930224 commit e71b432

File tree

10 files changed

+360
-202
lines changed

10 files changed

+360
-202
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for extracting mapping mode from a scope attribute.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39110]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
9090
> [!WARNING]
9191
> The `batcher` config is experimental and may change without notice.
9292

93-
The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterbatcher/config.go).
93+
The Elasticsearch exporter supports the [common `batcher` settings](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.123.0/exporter/exporterbatcher/config.go).
9494

9595
- `batcher`:
9696
- `enabled` (default=unset): Enable batching of requests into 1 or more bulk requests. On a batcher flush, it is possible for a batched request to be translated to more than 1 bulk request due to `flush::bytes`.
@@ -189,8 +189,10 @@ The mapping mode can also be controlled via the client metadata key `X-Elastic-M
189189
e.g. via HTTP headers, gRPC metadata. This will override the configured `mapping::mode`.
190190
It is possible to restrict which mapping modes may be requested by configuring
191191
`mapping::allowed_modes`, which defaults to all mapping modes. Keep in mind that not all
192-
processors or exporter configurations will maintain client
193-
metadata.
192+
processors or exporter configurations will maintain client metadata.
193+
194+
Finally, the mapping mode can be controlled via the scope attribute `elastic.mapping.mode`.
195+
If specified, this takes precedence over the `X-Elastic-Mapping-Mode` client metadata.
194196

195197
See below for a description of each mapping mode.
196198

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
type bulkIndexer interface {
2626
// StartSession starts a new bulk indexing session.
27-
StartSession(context.Context) (bulkIndexerSession, error)
27+
StartSession(context.Context) bulkIndexerSession
2828

2929
// Close closes the bulk indexer, ending any in-progress
3030
// sessions and stopping any background processing.
@@ -106,15 +106,16 @@ type syncBulkIndexer struct {
106106

107107
// StartSession creates a new docappender.BulkIndexer, and wraps
108108
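// it with a syncBulkIndexerSession. If the indexer cannot be created, it
// returns an errBulkIndexerSession whose Add and Flush report the error.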
109-
func (s *syncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) {
109+
func (s *syncBulkIndexer) StartSession(context.Context) bulkIndexerSession {
110110
bi, err := docappender.NewBulkIndexer(s.config)
111111
if err != nil {
112-
return nil, err
112+
// This should never happen in practice:
113+
// NewBulkIndexer should only fail if the
114+
// config is invalid, and we expect it to
115+
// always be valid at this point.
116+
return errBulkIndexerSession{err: err}
113117
}
114-
return &syncBulkIndexerSession{
115-
s: s,
116-
bi: bi,
117-
}, nil
118+
return &syncBulkIndexerSession{s: s, bi: bi}
118119
}
119120

120121
// Close is a no-op.
@@ -235,8 +236,8 @@ type asyncBulkIndexerSession struct {
235236
}
236237

237238
// StartSession returns a new asyncBulkIndexerSession.
238-
func (a *asyncBulkIndexer) StartSession(context.Context) (bulkIndexerSession, error) {
239-
return asyncBulkIndexerSession{a}, nil
239+
func (a *asyncBulkIndexer) StartSession(context.Context) bulkIndexerSession {
240+
return asyncBulkIndexerSession{a}
240241
}
241242

242243
// Close closes the asyncBulkIndexer and any active sessions.
@@ -488,14 +489,10 @@ type wgTrackingBulkIndexer struct {
488489
wg *sync.WaitGroup
489490
}
490491

491-
func (w *wgTrackingBulkIndexer) StartSession(ctx context.Context) (bulkIndexerSession, error) {
492+
func (w *wgTrackingBulkIndexer) StartSession(ctx context.Context) bulkIndexerSession {
492493
w.wg.Add(1)
493-
session, err := w.bulkIndexer.StartSession(ctx)
494-
if err != nil {
495-
w.wg.Done()
496-
return nil, err
497-
}
498-
return &wgTrackingBulkIndexerSession{bulkIndexerSession: session, wg: w.wg}, nil
494+
session := w.bulkIndexer.StartSession(ctx)
495+
return &wgTrackingBulkIndexerSession{bulkIndexerSession: session, wg: w.wg}
499496
}
500497

501498
type wgTrackingBulkIndexerSession struct {
@@ -507,3 +504,17 @@ func (w *wgTrackingBulkIndexerSession) End() {
507504
defer w.wg.Done()
508505
w.bulkIndexerSession.End()
509506
}
507+
508+
type errBulkIndexerSession struct {
509+
err error
510+
}
511+
512+
func (s errBulkIndexerSession) Add(context.Context, string, string, string, io.WriterTo, map[string]string, string) error {
513+
return fmt.Errorf("creating bulk indexer session failed, cannot add item: %w", s.err)
514+
}
515+
516+
func (s errBulkIndexerSession) End() {}
517+
518+
func (s errBulkIndexerSession) Flush(context.Context) error {
519+
return fmt.Errorf("creating bulk indexer session failed, cannot flush: %w", s.err)
520+
}

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,14 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
101101

102102
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config, false)
103103
require.NoError(t, err)
104-
session, err := bulkIndexer.StartSession(context.Background())
105-
require.NoError(t, err)
106104

105+
session := bulkIndexer.StartSession(context.Background())
107106
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
108107
// should flush
109108
time.Sleep(100 * time.Millisecond)
110109
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
110+
assert.NoError(t, session.Flush(context.Background()))
111+
session.End()
111112
assert.NoError(t, bulkIndexer.Close(context.Background()))
112113
})
113114
}
@@ -178,9 +179,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
178179
require.NoError(t, err)
179180
defer bulkIndexer.Close(context.Background())
180181

181-
session, err := bulkIndexer.StartSession(context.Background())
182-
require.NoError(t, err)
183-
182+
session := bulkIndexer.StartSession(context.Background())
184183
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
185184
// should flush
186185
time.Sleep(100 * time.Millisecond)
@@ -190,6 +189,8 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
190189
for _, wantField := range tt.wantFields {
191190
assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All())
192191
}
192+
assert.NoError(t, session.Flush(context.Background()))
193+
session.End()
193194
})
194195
}
195196
}
@@ -265,10 +266,11 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
265266
func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
266267
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config, false)
267268
require.NoError(t, err)
268-
session, err := bulkIndexer.StartSession(context.Background())
269-
require.NoError(t, err)
270269

270+
session := bulkIndexer.StartSession(context.Background())
271271
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
272+
assert.NoError(t, session.Flush(context.Background()))
273+
session.End()
272274
assert.NoError(t, bulkIndexer.Close(context.Background()))
273275

274276
return bulkIndexer
@@ -292,10 +294,11 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
292294
require.NoError(t, err)
293295

294296
bi := newSyncBulkIndexer(zap.NewNop(), client, &cfg, false)
295-
session, err := bi.StartSession(context.Background())
296-
require.NoError(t, err)
297297

298+
session := bi.StartSession(context.Background())
298299
assert.NoError(t, session.Add(context.Background(), "foo", "", "", strings.NewReader(`{"foo": "bar"}`), nil, docappender.ActionCreate))
299300
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
301+
assert.NoError(t, session.Flush(context.Background()))
302+
session.End()
300303
assert.NoError(t, bi.Close(context.Background()))
301304
}

exporter/elasticsearchexporter/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,12 @@ type RetrySettings struct {
205205
type MappingsSettings struct {
206206
// Mode configures the default document mapping mode.
207207
//
208-
// The mode may be overridden by the client metadata key
209-
// X-Elastic-Mapping-Mode, if specified.
208+
// The mode may be overridden in two ways:
209+
// - by the client metadata key X-Elastic-Mapping-Mode, if specified
210+
// - by the scope attribute elastic.mapping.mode, if specified
211+
//
212+
// The order of precedence is:
213+
// scope attribute > client metadata > default mode.
210214
Mode string `mapstructure:"mode"`
211215

212216
// AllowedModes controls the allowed document mapping modes

0 commit comments

Comments
 (0)