Skip to content

Commit 32f17c0

Browse files
carsonipsbylica-splunk
authored andcommitted
[exporter/elasticsearch] Make OTel mapping mode send to data streams only (open-telemetry#35839)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Make OTel mapping mode use RequireDataStream in docappender, which means it will only send to data streams. This prevents auto creating regular indices in OTel mapping mode due to a race condition in Elasticsearch where otel-data index templates are not ready. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent eb1b495 commit 32f17c0

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Make OTel mapping mode send to data streams only
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35839]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This prevents auto creating regular indices in OTel mapping mode due to a race condition in Elasticsearch where otel-data index templates are not ready.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/bulkindexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
7373
MaxDocumentRetries: maxDocRetries,
7474
Pipeline: config.Pipeline,
7575
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
76+
RequireDataStream: config.MappingMode() == MappingOTel,
7677
}
7778
}
7879

exporter/elasticsearchexporter/bulkindexer_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,61 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
115115
}
116116
}
117117

118+
func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
119+
tests := []struct {
120+
name string
121+
config Config
122+
wantRequireDataStream bool
123+
}{
124+
{
125+
name: "ecs",
126+
config: Config{
127+
NumWorkers: 1,
128+
Mapping: MappingsSettings{Mode: MappingECS.String()},
129+
},
130+
wantRequireDataStream: false,
131+
},
132+
{
133+
name: "otel",
134+
config: Config{
135+
NumWorkers: 1,
136+
Mapping: MappingsSettings{Mode: MappingOTel.String()},
137+
},
138+
wantRequireDataStream: true,
139+
},
140+
}
141+
142+
for _, tt := range tests {
143+
tt := tt
144+
t.Run(tt.name, func(t *testing.T) {
145+
t.Parallel()
146+
requireDataStreamCh := make(chan bool, 1)
147+
client, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
148+
RoundTripFunc: func(r *http.Request) (*http.Response, error) {
149+
if r.URL.Path == "/_bulk" {
150+
requireDataStreamCh <- r.URL.Query().Get("require_data_stream") == "true"
151+
}
152+
return &http.Response{
153+
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
154+
Body: io.NopCloser(strings.NewReader(successResp)),
155+
}, nil
156+
},
157+
}})
158+
require.NoError(t, err)
159+
160+
bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
161+
require.NoError(t, err)
162+
session, err := bulkIndexer.StartSession(context.Background())
163+
require.NoError(t, err)
164+
165+
assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
166+
assert.NoError(t, bulkIndexer.Close(context.Background()))
167+
168+
assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
169+
})
170+
}
171+
}
172+
118173
func TestAsyncBulkIndexer_flush_error(t *testing.T) {
119174
tests := []struct {
120175
name string

0 commit comments

Comments
 (0)