Skip to content

[exporter/elasticsearch] Feed the symbolization queues #38577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 19, 2025
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ func (e *elasticsearchExporter) pushProfileRecord(
return eventsSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
case otelserializer.ExecutablesIndex:
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionUpdate)
case otelserializer.ExecutablesSymQueueIndex, otelserializer.LeafFramesSymQueueIndex:
// These regular indices have a low write-frequency and can share the executablesSession.
return executablesSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
default:
return defaultSession.Add(ctx, index, docID, "", buf, nil, docappender.ActionCreate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const (
StackTraceIndex = "profiling-stacktraces"
StackFrameIndex = "profiling-stackframes"
ExecutablesIndex = "profiling-executables"

ExecutablesSymQueueIndex = "profiling-sq-executables"
LeafFramesSymQueueIndex = "profiling-sq-leafframes"
)

// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
Expand Down Expand Up @@ -64,6 +67,18 @@ func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationSc
return err
}
}

for _, frame := range payload.UnsymbolizedLeafFrames {
if err = pushDataAsJSON(frame, frame.DocID, LeafFramesSymQueueIndex); err != nil {
return err
}
}

for _, executable := range payload.UnsymbolizedExecutables {
if err = pushDataAsJSON(executable, executable.DocID, ExecutablesSymQueueIndex); err != nil {
return err
}
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ func TestSerializeProfile(t *testing.T) {
"scripted_upsert": true,
"upsert": map[string]any{},
},
{
"Stacktrace.frame.id": []any{"YA3K_koRAADyvzjEk_X7kgAAAAAAAABv"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": "",
"Time.created": "",
"ecs.version": serializeprofiles.EcsVersionString,
},
{
"Executable.file.id": []any{"YA3K_koRAADyvzjEk_X7kg"},
"Symbolization.retries": json.Number("0"),
"Symbolization.time.next": "",
"Time.created": "",
"ecs.version": serializeprofiles.EcsVersionString,
},
{
"@timestamp": "1970-01-01T00:00:00Z",
"Stacktrace.count": json.Number("1"),
Expand Down Expand Up @@ -114,13 +128,26 @@ func TestSerializeProfile(t *testing.T) {
var d map[string]any
decoder := json.NewDecoder(v)
decoder.UseNumber()
err := decoder.Decode(&d)
require.NoError(t, decoder.Decode(&d))

require.NoError(t, err)
// Remove timestamps to allow comparing test results with expected values.
for k, v := range d {
switch k {
case "Symbolization.time.next", "Time.created":
tm, err := time.Parse(time.RFC3339Nano, v.(string))
require.NoError(t, err)
assert.True(t, isWithinLastSecond(tm))
d[k] = ""
}
}
results = append(results, d)
}

assert.Equal(t, tt.expected, results)
})
}
}

// isWithinLastSecond reports whether less than one second has elapsed
// between t and the moment of the call.
func isWithinLastSecond(t time.Time) bool {
	elapsed := time.Since(t)
	return elapsed < time.Second
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
"go.opentelemetry.io/ebpf-profiler/libpf"
"time"
)

// EcsVersionString is the value for the `ecs.version` metrics field.
Expand All @@ -27,7 +27,8 @@ type StackPayload struct {
StackFrames []StackFrame
Executables []ExeMetadata

UnsymbolizedLeafFrames []libpf.FrameID
UnsymbolizedLeafFrames []UnsymbolizedLeafFrame
UnsymbolizedExecutables []UnsymbolizedExecutable
}

// StackTraceEvent represents a stacktrace event serializable into ES.
Expand Down Expand Up @@ -130,3 +131,25 @@ func NewExeMetadata(docID string, lastSeen uint32, buildID, fileName string) Exe
},
}
}

// UnsymbolizedExecutable is the document written into the executable
// symbolization queue index for an executable that still needs symbols.
// The executable is identified by the FileIDs listed in FileID.
type UnsymbolizedExecutable struct {
// EcsVersion is embedded so its fields are part of the serialized document.
EcsVersion
// DocID is excluded from the JSON body; it is handed to the push callback
// as the document ID alongside the serialized struct.
DocID string `json:"-"`
// FileID is serialized as the "Executable.file.id" array.
FileID []string `json:"Executable.file.id"`
// Created is serialized as "Time.created".
Created time.Time `json:"Time.created"`
// Next is serialized as "Symbolization.time.next".
// NOTE(review): presumably the earliest time of the next symbolization attempt - confirm.
Next time.Time `json:"Symbolization.time.next"`
// Retries is serialized as "Symbolization.retries".
Retries int `json:"Symbolization.retries"`
}

// UnsymbolizedLeafFrame is the document written into the leaf frame
// symbolization queue index for a leaf frame that still needs symbols.
// The frame is identified by the frame IDs listed in FrameID.
type UnsymbolizedLeafFrame struct {
// EcsVersion is embedded so its fields are part of the serialized document.
EcsVersion
// DocID is excluded from the JSON body; it is handed to the push callback
// as the document ID alongside the serialized struct.
DocID string `json:"-"`
// FrameID is serialized as the "Stacktrace.frame.id" array.
FrameID []string `json:"Stacktrace.frame.id"`
// Created is serialized as "Time.created".
Created time.Time `json:"Time.created"`
// Next is serialized as "Symbolization.time.next".
// NOTE(review): presumably the earliest time of the next symbolization attempt - confirm.
Next time.Time `json:"Symbolization.time.next"`
// Retries is serialized as "Symbolization.retries".
Retries int `json:"Symbolization.retries"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func checkProfileType(profile pprofile.Profile) error {
// stackPayloads creates a slice of StackPayloads from the given resource,
// instrumentation scope, and profile.
func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile) ([]StackPayload, error) {
unsymbolizedLeafFrames := make([]libpf.FrameID, 0, profile.Sample().Len())
unsymbolizedLeafFramesSet := make(map[libpf.FrameID]struct{}, profile.Sample().Len())
unsymbolizedExecutablesSet := make(map[libpf.FileID]struct{})
stackPayload := make([]StackPayload, 0, profile.Sample().Len())

hostMetadata := newHostMetadata(resource, scope, profile)
Expand Down Expand Up @@ -108,7 +109,23 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope
})

if !isFrameSymbolized(frames[len(frames)-1]) && leafFrame != nil {
unsymbolizedLeafFrames = append(unsymbolizedLeafFrames, *leafFrame)
unsymbolizedLeafFramesSet[*leafFrame] = struct{}{}
}

for j := range frames {
if frameTypes[j].IsError() {
// Artificial error frames can't be symbolized.
continue
}
if isFrameSymbolized(frames[j]) {
// Skip interpreted frames and already symbolized native frames (kernel, Golang is planned).
continue
}
frameID, err := libpf.NewFrameIDFromString(frames[j].DocID)
if err != nil {
return nil, fmt.Errorf("stackPayloads: %w", err)
}
unsymbolizedExecutablesSet[frameID.FileID()] = struct{}{}
}

// Add one event per timestamp and its count value.
Expand Down Expand Up @@ -138,12 +155,45 @@ func stackPayloads(resource pcommon.Resource, scope pcommon.InstrumentationScope

stackPayload[0].Executables = exeMetadata
}
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames
stackPayload[0].UnsymbolizedLeafFrames = unsymbolizedLeafFrames(unsymbolizedLeafFramesSet)
stackPayload[0].UnsymbolizedExecutables = unsymbolizedExecutables(unsymbolizedExecutablesSet)
}

return stackPayload, nil
}

// unsymbolizedExecutables converts a set of executable file IDs into the
// documents destined for the executable symbolization queue.
//
// Every document shares one timestamp for Created and Next and starts with
// zero retries. The base64 form of the file ID is used both as the document
// ID and as the single entry of FileID. The result follows map iteration
// order and is therefore not sorted.
func unsymbolizedExecutables(executables map[libpf.FileID]struct{}) []UnsymbolizedExecutable {
	timestamp := time.Now()
	ecs := EcsVersion{V: EcsVersionString}

	docs := make([]UnsymbolizedExecutable, len(executables))
	pos := 0
	for id := range executables {
		encoded := id.Base64()
		docs[pos] = UnsymbolizedExecutable{
			EcsVersion: ecs,
			DocID:      encoded,
			FileID:     []string{encoded},
			Created:    timestamp,
			Next:       timestamp,
		}
		pos++
	}
	return docs
}

// unsymbolizedLeafFrames converts a set of leaf frame IDs into the documents
// destined for the leaf frame symbolization queue.
//
// Every document shares one timestamp for Created and Next and starts with
// zero retries. The string form of the frame ID is used both as the document
// ID and as the single entry of FrameID. The result follows map iteration
// order and is therefore not sorted.
func unsymbolizedLeafFrames(frameIDs map[libpf.FrameID]struct{}) []UnsymbolizedLeafFrame {
	timestamp := time.Now()
	ecs := EcsVersion{V: EcsVersionString}

	docs := make([]UnsymbolizedLeafFrame, len(frameIDs))
	pos := 0
	for id := range frameIDs {
		encoded := id.String()
		docs[pos] = UnsymbolizedLeafFrame{
			EcsVersion: ecs,
			DocID:      encoded,
			FrameID:    []string{encoded},
			Created:    timestamp,
			Next:       timestamp,
		}
		pos++
	}
	return docs
}

// symbolizedFrames returns a slice of StackFrames that have symbols.
func symbolizedFrames(frames []StackFrame) []StackFrame {
framesWithSymbols := make([]StackFrame, 0, len(frames))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package serializeprofiles
import (
"bytes"
"errors"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -213,8 +215,24 @@ func TestTransform(t *testing.T) {
"libc.so",
),
},
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
UnsymbolizedLeafFrames: []UnsymbolizedLeafFrame{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: frameIDBase64,
FrameID: []string{frameIDBase64},
},
},
UnsymbolizedExecutables: []UnsymbolizedExecutable{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildIDBase64,
FileID: []string{buildIDBase64},
},
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildID2Base64,
FileID: []string{buildID2Base64},
},
},
},
{
Expand All @@ -234,6 +252,9 @@ func TestTransform(t *testing.T) {
sp := rp.ScopeProfiles().At(0)

payload, err := Transform(rp.Resource(), sp.Scope(), sp.Profiles().At(0))
require.NoError(t, checkAndResetTimes(payload))
sortPayloads(payload)
sortPayloads(tt.wantPayload)
require.Equal(t, tt.wantErr, err)
assert.Equal(t, tt.wantPayload, payload)
})
Expand Down Expand Up @@ -325,8 +346,24 @@ func TestStackPayloads(t *testing.T) {
"libc.so",
),
},
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
UnsymbolizedLeafFrames: []UnsymbolizedLeafFrame{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: frameIDBase64,
FrameID: []string{frameIDBase64},
},
},
UnsymbolizedExecutables: []UnsymbolizedExecutable{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildIDBase64,
FileID: []string{buildIDBase64},
},
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildID2Base64,
FileID: []string{buildID2Base64},
},
},
},
{
Expand Down Expand Up @@ -410,8 +447,24 @@ func TestStackPayloads(t *testing.T) {
"libc.so",
),
},
UnsymbolizedLeafFrames: []libpf.FrameID{
libpf.NewFrameID(buildID, address),
UnsymbolizedLeafFrames: []UnsymbolizedLeafFrame{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: frameIDBase64,
FrameID: []string{frameIDBase64},
},
},
UnsymbolizedExecutables: []UnsymbolizedExecutable{
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildIDBase64,
FileID: []string{buildIDBase64},
},
{
EcsVersion: EcsVersion{V: EcsVersionString},
DocID: buildID2Base64,
FileID: []string{buildID2Base64},
},
},
},
{
Expand All @@ -430,6 +483,9 @@ func TestStackPayloads(t *testing.T) {
sp := rp.ScopeProfiles().At(0)

payloads, err := stackPayloads(rp.Resource(), sp.Scope(), sp.Profiles().At(0))
require.NoError(t, checkAndResetTimes(payloads))
sortPayloads(payloads)
sortPayloads(tt.wantPayload)
require.Equal(t, tt.wantErr, err)
assert.Equal(t, tt.wantPayload, payloads)
})
Expand Down Expand Up @@ -696,3 +752,51 @@ func mkStackTraceID(t *testing.T, frameIDs []libpf.FrameID) string {

return traceID
}

// sortPayloads brings the payloads into a deterministic form to allow comparisons.
//
// UnsymbolizedLeafFrames and UnsymbolizedExecutables are both produced by
// ranging over a map, so their element order varies between runs. Each list
// is sorted in place by DocID; all other payload fields are left untouched.
func sortPayloads(payloads []StackPayload) {
	for idx := range payloads {
		payload := &payloads[idx]
		sort.Slice(payload.UnsymbolizedLeafFrames, func(i, j int) bool {
			return payload.UnsymbolizedLeafFrames[i].DocID < payload.UnsymbolizedLeafFrames[j].DocID
		})
		sort.Slice(payload.UnsymbolizedExecutables, func(i, j int) bool {
			return payload.UnsymbolizedExecutables[i].DocID < payload.UnsymbolizedExecutables[j].DocID
		})
	}
}

// checkAndResetTimes validates the timestamps of all symbolization queue
// entries in payloads and then clears them so the payloads can be compared
// against static expectations.
//
// Created and Next of every unsymbolized leaf frame and executable must pass
// isWithinLastSecond. Every violation is recorded, and the combined error
// (nil when there are none) is returned. The timestamps are reset to the zero
// time in place whether or not they were valid.
func checkAndResetTimes(payloads []StackPayload) error {
	var problems []error
	for p := range payloads {
		// The sub-slices share their backing arrays with payloads[p], so the
		// resets below are visible to the caller.
		leafFrames := payloads[p].UnsymbolizedLeafFrames
		for f := range leafFrames {
			if created := leafFrames[f].Created; !isWithinLastSecond(created) {
				problems = append(problems, fmt.Errorf("payload[%d].UnsymbolizedLeafFrames[%d].Created is too old: %v",
					p, f, created))
			}
			if next := leafFrames[f].Next; !isWithinLastSecond(next) {
				problems = append(problems, fmt.Errorf("payload[%d].UnsymbolizedLeafFrames[%d].Next is too old: %v",
					p, f, next))
			}
			leafFrames[f].Created, leafFrames[f].Next = time.Time{}, time.Time{}
		}

		executables := payloads[p].UnsymbolizedExecutables
		for e := range executables {
			if created := executables[e].Created; !isWithinLastSecond(created) {
				problems = append(problems, fmt.Errorf("payload[%d].UnsymbolizedExecutables[%d].Created is too old: %v",
					p, e, created))
			}
			if next := executables[e].Next; !isWithinLastSecond(next) {
				problems = append(problems, fmt.Errorf("payload[%d].UnsymbolizedExecutables[%d].Next is too old: %v",
					p, e, next))
			}
			executables[e].Created, executables[e].Next = time.Time{}, time.Time{}
		}
	}
	return errors.Join(problems...)
}

// isWithinLastSecond reports whether t lies less than one second before the
// current time.
func isWithinLastSecond(t time.Time) bool {
	age := time.Since(t)
	return age < time.Second
}
Loading