Skip to content

Commit 4bfdbfb

Browse files
authored
[exporter/elasticsearch] Feed the symbolization queues (#38577)
Start writing data into the symbolization queues, so that symbolization of frames can be done by the Elastic profiling symbolizer. Symbolization is the process that turns addresses into symbols (mostly function names). Without symbols, native frames mostly stay unreadable and thus unusable. There are two queues, one for leaf frames and one for executables.
1 parent e3a0a53 commit 4bfdbfb

File tree

7 files changed

+262
-13
lines changed

7 files changed

+262
-13
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: elasticsearchexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Enable native frame symbolization for Universal Profiling via the symbolization queue indices.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38577]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/elasticsearchexporter/exporter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,9 @@ func (e *elasticsearchExporter) pushProfileRecord(
583583
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
584584
case otelserializer.ExecutablesIndex:
585585
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
586+
case otelserializer.ExecutablesSymQueueIndex, otelserializer.LeafFramesSymQueueIndex:
587+
// These regular indices have a low write-frequency and can share the executablesSession.
588+
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
586589
default:
587590
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
588591
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ const (
1818
StackTraceIndex = "profiling-stacktraces"
1919
StackFrameIndex = "profiling-stackframes"
2020
ExecutablesIndex = "profiling-executables"
21+
22+
ExecutablesSymQueueIndex = "profiling-sq-executables"
23+
LeafFramesSymQueueIndex = "profiling-sq-leafframes"
2124
)
2225

2326
// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
@@ -64,6 +67,18 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
6467
return err
6568
}
6669
}
70+
71+
for _, frame := range payload.UnsymbolizedLeafFrames {
72+
if err = pushDataAsJSON(frame, frame.DocID, LeafFramesSymQueueIndex); err != nil {
73+
return err
74+
}
75+
}
76+
77+
for _, executable := range payload.UnsymbolizedExecutables {
78+
if err = pushDataAsJSON(executable, executable.DocID, ExecutablesSymQueueIndex); err != nil {
79+
return err
80+
}
81+
}
6782
}
6883
return nil
6984
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/profile_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,20 @@ func TestSerializeProfile(t *testing.T) {
8080
"scripted_upsert": true,
8181
"upsert": map[string]any{},
8282
},
83+
{
84+
"Stacktrace.frame.id": []any{"YA3K_koRAADyvzjEk_X7kgAAAAAAAABv"},
85+
"Symbolization.retries": json.Number("0"),
86+
"Symbolization.time.next": "",
87+
"Time.created": "",
88+
"ecs.version": serializeprofiles.EcsVersionString,
89+
},
90+
{
91+
"Executable.file.id": []any{"YA3K_koRAADyvzjEk_X7kg"},
92+
"Symbolization.retries": json.Number("0"),
93+
"Symbolization.time.next": "",
94+
"Time.created": "",
95+
"ecs.version": serializeprofiles.EcsVersionString,
96+
},
8397
{
8498
"@timestamp": "1970-01-01T00:00:00Z",
8599
"Stacktrace.count": json.Number("1"),
@@ -114,13 +128,26 @@ func TestSerializeProfile(t *testing.T) {
114128
var d map[string]any
115129
decoder := json.NewDecoder(v)
116130
decoder.UseNumber()
117-
err := decoder.Decode(&d)
131+
require.NoError(t, decoder.Decode(&d))
118132

119-
require.NoError(t, err)
133+
// Remove timestamps to allow comparing test results with expected values.
134+
for k, v := range d {
135+
switch k {
136+
case "Symbolization.time.next", "Time.created":
137+
tm, err := time.Parse(time.RFC3339Nano, v.(string))
138+
require.NoError(t, err)
139+
assert.True(t, isWithinLastSecond(tm))
140+
d[k] = ""
141+
}
142+
}
120143
results = append(results, d)
121144
}
122145

123146
assert.Equal(t, tt.expected, results)
124147
})
125148
}
126149
}
150+
151+
func isWithinLastSecond(t time.Time) bool {
152+
return time.Since(t) < time.Second
153+
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/model.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"
55

66
import (
7-
"go.opentelemetry.io/ebpf-profiler/libpf"
7+
"time"
88
)
99

1010
// EcsVersionString is the value for the `ecs.version` metrics field.
@@ -27,7 +27,8 @@ type StackPayload struct {
2727
StackFrames []StackFrame
2828
Executables []ExeMetadata
2929

30-
UnsymbolizedLeafFrames []libpf.FrameID
30+
UnsymbolizedLeafFrames []UnsymbolizedLeafFrame
31+
UnsymbolizedExecutables []UnsymbolizedExecutable
3132
}
3233

3334
// StackTraceEvent represents a stacktrace event serializable into ES.
@@ -130,3 +131,25 @@ func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) Exe
130131
},
131132
}
132133
}
134+
135+
// UnsymbolizedExecutable represents an array of executable FileIDs written into the
136+
// executable symbolization queue index.
137+
type UnsymbolizedExecutable struct {
138+
EcsVersion
139+
DocID string `json:"-"`
140+
FileID []string `json:"Executable.file.id"`
141+
Created time.Time `json:"Time.created"`
142+
Next time.Time `json:"Symbolization.time.next"`
143+
Retries int `json:"Symbolization.retries"`
144+
}
145+
146+
// UnsymbolizedLeafFrame represents an array of frame IDs written into the
147+
// leaf frame symbolization queue index.
148+
type UnsymbolizedLeafFrame struct {
149+
EcsVersion
150+
DocID string `json:"-"`
151+
FrameID []string `json:"Stacktrace.frame.id"`
152+
Created time.Time `json:"Time.created"`
153+
Next time.Time `json:"Symbolization.time.next"`
154+
Retries int `json:"Symbolization.retries"`
155+
}

exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles/transform.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ func checkProfileType(profile pprofile.Profile) error {
7777
// stackPayloads creates a slice of StackPayloads from the given ResourceProfiles,
7878
// ScopeProfiles, and ProfileContainer.
7979
func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
80-
unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len())
80+
unsymbolizedLeafFramesSet := make(map[libpf.FrameID]struct{}, profile.Sample().Len())
81+
unsymbolizedExecutablesSet := make(map[libpf.FileID]struct{})
8182
stackPayload := make([]StackPayload, 0, profile.Sample().Len())
8283

8384
hostMetadata := newHostMetadata(resource, scope, profile)
@@ -108,7 +109,23 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
108109
})
109110

110111
if !isFrameSymbolized(frames[len(frames)-1]) && leafFrame != nil {
111-
unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame)
112+
unsymbolizedLeafFramesSet[*leafFrame] = struct{}{}
113+
}
114+
115+
for j := range frames {
116+
if frameTypes[j].IsError() {
117+
// Artificial error frames can't be symbolized.
118+
continue
119+
}
120+
if isFrameSymbolized(frames[j]) {
121+
// Skip interpreted frames and already symbolized native frames (kernel, Golang is planned).
122+
continue
123+
}
124+
frameID, err := libpf.NewFrameIDFromString(frames[j].DocID)
125+
if err != nil {
126+
return nil, fmt.Errorf("stackPayloads: %w", err)
127+
}
128+
unsymbolizedExecutablesSet[frameID.FileID()] = struct{}{}
112129
}
113130

114131
// Add one event per timestamp and its count value.
@@ -138,12 +155,45 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
138155

139156
stackPayload[0].Executables = exeMetadata
140157
}
141-
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames
158+
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames(unsymbolizedLeafFramesSet)
159+
stackPayload[0].UnsymbolizedExecutables = unsymbolizedExecutables(unsymbolizedExecutablesSet)
142160
}
143161

144162
return stackPayload, nil
145163
}
146164

165+
func unsymbolizedExecutables(executables map[libpf.FileID]struct{}) []UnsymbolizedExecutable {
166+
now := time.Now()
167+
unsymbolized := make([]UnsymbolizedExecutable, 0, len(executables))
168+
for fileID := range executables {
169+
unsymbolized = append(unsymbolized, UnsymbolizedExecutable{
170+
EcsVersion: EcsVersion{V: EcsVersionString},
171+
DocID: fileID.Base64(),
172+
FileID: []string{fileID.Base64()},
173+
Created: now,
174+
Next: now,
175+
Retries: 0,
176+
})
177+
}
178+
return unsymbolized
179+
}
180+
181+
func unsymbolizedLeafFrames(frameIDs map[libpf.FrameID]struct{}) []UnsymbolizedLeafFrame {
182+
now := time.Now()
183+
unsymbolized := make([]UnsymbolizedLeafFrame, 0, len(frameIDs))
184+
for frameID := range frameIDs {
185+
unsymbolized = append(unsymbolized, UnsymbolizedLeafFrame{
186+
EcsVersion: EcsVersion{V: EcsVersionString},
187+
DocID: frameID.String(),
188+
FrameID: []string{frameID.String()},
189+
Created: now,
190+
Next: now,
191+
Retries: 0,
192+
})
193+
}
194+
return unsymbolized
195+
}
196+
147197
// symbolizedFrames returns a slice of StackFrames that have symbols.
148198
func symbolizedFrames(frames []StackFrame) []StackFrame {
149199
framesWithSymbols := make([]StackFrame, 0, len(frames))

0 commit comments

Comments
 (0)