Skip to content

[exporter/elasticsearch] Feed the symbolization queues #38577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 19, 2025
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-profiles-symbolization-queues.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable native frame symbolization for Universal Profiling via the symbolization queue indices.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38577]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ func (e *elasticsearchExporter) pushProfileRecord(
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
case otelserializer.ExecutablesIndex:
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
case otelserializer.ExecutablesSymQueueIndex, otelserializer.LeafFramesSymQueueIndex:
// These regular indices have a low write-frequency and can share the executablesSession.
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
default:
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
StackTraceIndex = "profiling-stacktraces"
StackFrameIndex = "profiling-stackframes"
ExecutablesIndex = "profiling-executables"

ExecutablesSymQueueIndex = "profiling-sq-executables"
LeafFramesSymQueueIndex = "profiling-sq-leafframes"
)

// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
Expand Down Expand Up @@ -64,6 +67,18 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
return err
}
}

for _, frame := range payload.UnsymbolizedLeafFrames {
if err = pushDataAsJSON(frame, frame.DocID, LeafFramesSymQueueIndex); err != nil {
return err
}
}

for _, executable := range payload.UnsymbolizedExecutables {
if err = pushDataAsJSON(executable, executable.DocID, ExecutablesSymQueueIndex); err != nil {
return err
}
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ func TestSerializeProfile(t *testing.T) {
"scripted_upsert": true,
"upsert": map[string]any{},
},
{
"Stacktrace.frame.id": []any{"YA3K_koRAADyvzjEk_X7kgAAAAAAAABv"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": "",
"Time.created": "",
"ecs.version": serializeprofiles.EcsVersionString,
},
{
"Executable.file.id": []any{"YA3K_koRAADyvzjEk_X7kg"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": "",
"Time.created": "",
"ecs.version": serializeprofiles.EcsVersionString,
},
{
"@timestamp": "1970-01-01T00:00:00Z",
"Stacktrace.count": json.Number("1"),
Expand Down Expand Up @@ -114,13 +128,26 @@ func TestSerializeProfile(t *testing.T) {
var d map[string]any
decoder := json.NewDecoder(v)
decoder.UseNumber()
err := decoder.Decode(&d)
require.NoError(t, decoder.Decode(&d))

require.NoError(t, err)
// Remove timestamps to allow comparing test results with expected values.
for k, v := range d {
switch k {
case "Symbolization.time.next", "Time.created":
tm, err := time.Parse(time.RFC3339Nano, v.(string))
require.NoError(t, err)
assert.True(t, isWithinLastSecond(tm))
d[k] = ""
}
}
results = append(results, d)
}

assert.Equal(t, tt.expected, results)
})
}
}

func isWithinLastSecond(t time.Time) bool {
return time.Since(t) < time.Second
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
"go.opentelemetry.io/ebpf-profiler/libpf"
"time"
)

// EcsVersionString is the value for the `ecs.version` metrics field.
Expand All @@ -27,7 +27,8 @@ type StackPayload struct {
StackFrames []StackFrame
Executables []ExeMetadata

UnsymbolizedLeafFrames []libpf.FrameID
UnsymbolizedLeafFrames []UnsymbolizedLeafFrame
UnsymbolizedExecutables []UnsymbolizedExecutable
}

// StackTraceEvent represents a stacktrace event serializable into ES.
Expand Down Expand Up @@ -130,3 +131,25 @@ func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) Exe
},
}
}

// UnsymbolizedExecutable represents an array of executable FileIDs written into the
// executable symbolization queue index.
type UnsymbolizedExecutable struct {
EcsVersion
DocID string `json:"-"`
FileID []string `json:"Executable.file.id"`
Created time.Time `json:"Time.created"`
Next time.Time `json:"Symbolization.time.next"`
Retries int `json:"Symbolization.retries"`
}

// UnsymbolizedLeafFrame represents an array of frame IDs written into the
// leaf frame symbolization queue index.
type UnsymbolizedLeafFrame struct {
EcsVersion
DocID string `json:"-"`
FrameID []string `json:"Stacktrace.frame.id"`
Created time.Time `json:"Time.created"`
Next time.Time `json:"Symbolization.time.next"`
Retries int `json:"Symbolization.retries"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func checkProfileType(profile pprofile.Profile) error {
// stackPayloads creates a slice of StackPayloads from the given ResourceProfiles,
// ScopeProfiles, and ProfileContainer.
func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len())
unsymbolizedLeafFramesSet := make(map[libpf.FrameID]struct{}, profile.Sample().Len())
unsymbolizedExecutablesSet := make(map[libpf.FileID]struct{})
stackPayload := make([]StackPayload, 0, profile.Sample().Len())

hostMetadata := newHostMetadata(resource, scope, profile)
Expand Down Expand Up @@ -108,7 +109,23 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
})

if !isFrameSymbolized(frames[len(frames)-1]) && leafFrame != nil {
unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame)
unsymbolizedLeafFramesSet[*leafFrame] = struct{}{}
}

for j := range frames {
if frameTypes[j].IsError() {
// Artificial error frames can't be symbolized.
continue
}
if isFrameSymbolized(frames[j]) {
// Skip interpreted frames and already symbolized native frames (kernel, Golang is planned).
continue
}
frameID, err := libpf.NewFrameIDFromString(frames[j].DocID)
if err != nil {
return nil, fmt.Errorf("stackPayloads: %w", err)
}
unsymbolizedExecutablesSet[frameID.FileID()] = struct{}{}
}

// Add one event per timestamp and its count value.
Expand Down Expand Up @@ -138,12 +155,45 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope

stackPayload[0].Executables = exeMetadata
}
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames(unsymbolizedLeafFramesSet)
stackPayload[0].UnsymbolizedExecutables = unsymbolizedExecutables(unsymbolizedExecutablesSet)
}

return stackPayload, nil
}

func unsymbolizedExecutables(executables map[libpf.FileID]struct{}) []UnsymbolizedExecutable {
now := time.Now()
unsymbolized := make([]UnsymbolizedExecutable, 0, len(executables))
for fileID := range executables {
unsymbolized = append(unsymbolized, UnsymbolizedExecutable{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: fileID.Base64(),
FileID: []string{fileID.Base64()},
Created: now,
Next: now,
Retries: 0,
})
}
return unsymbolized
}

func unsymbolizedLeafFrames(frameIDs map[libpf.FrameID]struct{}) []UnsymbolizedLeafFrame {
now := time.Now()
unsymbolized := make([]UnsymbolizedLeafFrame, 0, len(frameIDs))
for frameID := range frameIDs {
unsymbolized = append(unsymbolized, UnsymbolizedLeafFrame{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: frameID.String(),
FrameID: []string{frameID.String()},
Created: now,
Next: now,
Retries: 0,
})
}
return unsymbolized
}

// symbolizedFrames returns a slice of StackFrames that have symbols.
func symbolizedFrames(frames []StackFrame) []StackFrame {
framesWithSymbols := make([]StackFrame, 0, len(frames))
Expand Down
Loading
Loading