Commit 4ce25dd

[processor/coralogix] Add transactions feature to Coralogix processor (#40868)
#### Description

Add APM transactions support to the Coralogix processor.

Reference: https://coralogix.com/docs/user-guides/apm/features/transactions/

#### Link to tracking issue

Fixes #40863

#### Testing

- Added some tests
- Manually tested

![image](https://github.com/user-attachments/assets/663dfb01-f9a5-4b98-a432-78e39e75a814)
![image](https://github.com/user-attachments/assets/cb47e894-7025-4fab-8c40-4020c2551c95)
![image](https://github.com/user-attachments/assets/f3cf3169-840e-4930-9575-861f7eef65e3)
![image](https://github.com/user-attachments/assets/e759f541-a958-4c2f-b5c0-edfd1bf32dce)
![image](https://github.com/user-attachments/assets/d1096714-9a89-4c34-bf45-d38e6bd55cc7)
![image](https://github.com/user-attachments/assets/68d1fb76-0861-4e89-8690-ee1f53ca850f)

[Trace exported from Coralogix backend](https://github.com/user-attachments/files/20975849/transactions.csv).

`values.yaml` (note I changed the image to one built for this test):

```yaml
opentelemetry-agent:
  service:
    enabled: true
  extraConfig:
    processors:
      groupbytrace:
        wait_duration: 30s
      coralogix:
        transactions:
          enabled: true
    service:
      pipelines:
        traces:
          processors: [groupbytrace, coralogix]
```

#### Documentation

Added some documentation in the `README.md` file.

Signed-off-by: Israel Blancas <[email protected]>
1 parent 49e4fe1 commit 4ce25dd

10 files changed: +543 -70 lines changed

.chloggen/40863.yaml

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: coralogixprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add transactions feature
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [40863]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The transactions feature enables tracking of distributed transactions across microservices in a distributed system.
+  It provides end-to-end visibility into request flows by correlating spans across different services, allowing
+  developers to understand the complete journey of a request through their microservices architecture. This
+  feature is particularly useful for identifying performance bottlenecks, debugging issues, and monitoring
+  the health of distributed applications.
+
+  More information:
+  https://coralogix.com/docs/user-guides/apm/features/transactions
+
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

processor/coralogixprocessor/README.md

Lines changed: 32 additions & 48 deletions
@@ -17,73 +17,57 @@
 
 The Coralogix processor adds attributes to spans that enable features in Coralogix.
 
-## Features
-
-### DB Statement Blueprints
+## Configuration
 
-This feature enables the processor to create blueprints from SQL queries, this means replacing any variables with `?`.
-The blueprint is also hashed to be able to be used with the spanmetrics connector.
-Long queries can be an issue when being stored in certain metric stores.
-Blueprints alleviate this problem by using the hash as the identifying dimension on the metric, which enables
-users to query metrics by blueprints.
+- `transactions`:
+  - `enabled` (`false` by default): enables the transactions feature from the Coralogix processor (more information below).
 
-The added attributes are `db.statement.blueprint` and `db.statement.blueprint.id`.
+## Features
 
-* `db.statement.blueprint` contains the blueprinted version of the statement, we require them to be sent to Coralogix to
-  display your blueprinted statement
-* `db.statement.blueprint.id` contains a hash of the statement, this way we can add it as a dimension in the spanmetrics
-  connector and use it to query your blueprints.
-* `sampling.priority` if enabled contains the value 100 for new blueprints, further explanation below.
+### Transactions
 
-#### Sampling
+The Transactions feature (originally called "Service Flows") is Coralogix's extension of OpenTelemetry instrumentation that breaks down each transaction into segments and aggregates their performance over time. It provides visibility into how each segment within a service contributes to overall transaction performance.
 
-If sampling is enabled then it stores the found blueprints in an in-memory cache to be able to send only new blueprints
-that haven't been seen yet.
-This only adds an attribute to the span named `sampling.priority`, if the blueprint is new then the sampling priority
-will be `100`.
+More information in the [official docs](https://coralogix.com/docs/user-guides/apm/features/transactions).
 
-Using this key it's possible to use either
-the [Tail Sampler](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor)
-or
-the [Probabilistic Sampler](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/probabilisticsamplerprocessor)
-to only send new blueprints to Coralogix.
-If sampling is not enabled it won't cache anything and the `sampling.priority` attribute won't be added.
+#### How It Works
 
-The cache is limited by the `max_cache_size_mib` configuration, if the cache is full it will remove the oldest entries
-to make space for new ones.
-The cache stores hashes of the queries, each hash is 8 bytes, so the number of maximum cache entries is calculated
-by `max_cache_size_mib * 1024 * 1024 / 8`.
+The processor automatically identifies the root span within each trace and applies transaction attributes to all spans in that trace:
 
-## Config
+1. **Root Span Identification**: The processor finds the span with no parent span ID (or whose parent is not in the current trace) and marks it as the root span.
 
-* `db_statement_blueprints`
-  * `sampling`:
-    * `enabled`: (default: `false`) If enabled, adds the attribute `sampling.priority` with a value of `100` to spans with new
-      blueprints.
-      Refer to the [Sampling section](#sampling) for more information.
-    * `max_cache_size_mib` (default: `1024`) The size of the cache in mebibytes to store seen blueprints hashes.
+2. **Attribute Application**: All spans in the trace receive the following attributes:
+   - `cgx.transaction`: Set to the name of the root span
+   - `cgx.transaction.root`: Set to `true` for the root span only
 
-### Basic Setup
+3. **Trace State Updates**: For client spans (not server or consumer spans), the processor also updates the trace state with `cgx_transaction=<root-span-name>` to propagate transaction information across service boundaries.
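The three steps in the "How It Works" list can be sketched in plain Go. This is a minimal illustration, not the processor's code: `Span`, `SpanKind`, and `applyTransaction` are hypothetical stand-ins for the `ptrace.Span` handling the real processor performs, the sketch treats only client spans as eligible for the trace state update, and it overwrites the trace state rather than merging with existing entries (an assumption made for brevity).

```go
package main

import "fmt"

// SpanKind is a hypothetical stand-in for the pdata span kind enum.
type SpanKind int

const (
	KindInternal SpanKind = iota
	KindServer
	KindClient
)

// Span is a hypothetical, simplified span. The real processor works on ptrace.Span.
type Span struct {
	Name       string
	ParentID   string // empty means the span has no parent
	Kind       SpanKind
	TraceState string
	Attrs      map[string]any
}

// applyTransaction tags every span of a single trace with the root span's name.
func applyTransaction(spans []*Span) {
	// Step 1: pick the first span without a parent as the root.
	var root *Span
	for _, s := range spans {
		if s.ParentID == "" {
			root = s
			break
		}
	}
	if root == nil {
		return // fallback root selection is left out of this sketch
	}

	for _, s := range spans {
		if s.Attrs == nil {
			s.Attrs = map[string]any{}
		}
		// Step 2: every span gets the transaction name; only the root gets the root flag.
		s.Attrs["cgx.transaction"] = root.Name
		if s == root {
			s.Attrs["cgx.transaction.root"] = true
		}
		// Step 3: client spans carry the transaction name in the trace state.
		// Assumption: overwriting instead of merging with existing entries.
		if s.Kind == KindClient {
			s.TraceState = "cgx_transaction=" + root.Name
		}
	}
}

func main() {
	spans := []*Span{
		{Name: "checkout", Kind: KindServer},
		{Name: "charge-card", ParentID: "span-1", Kind: KindClient},
	}
	applyTransaction(spans)
	for _, s := range spans {
		fmt.Println(s.Name, s.Attrs, s.TraceState)
	}
}
```

Because the sketch needs every span of a trace in one slice, it also shows why spans from the same trace have to reach the processor together.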
 
-This setup is without sampling meaning no `sampling.priority` attribute will be added to spans.
-The cache will be disabled.
+#### Configuration
 
 ```yaml
 processors:
   coralogix:
-    db_statement_blueprints:
+    transactions:
+      enabled: true
 ```
 
-### With Sampling Config
+**Note**: The transactions feature requires the `groupbytrace` processor to be configured before the `coralogix` processor in your pipeline to work properly. This ensures that all spans from the same trace are processed together.
 
-This setup will enable the cache to store seen blueprints and add the `sampling.priority` attribute to spans with new
-blueprints.
+```yaml
+processors:
+  groupbytrace:
+    wait_duration: 5s
+    num_traces: 1000
+  coralogix:
+    transactions:
+      enabled: true
+```
+
+### Basic Setup
 
 ```yaml
 processors:
   coralogix:
-    db_statement_blueprints:
-      sampling:
-        enabled: true
-        max_cache_size_mib: 1024 #1GiB
-```
+    transactions:
+      enabled: true
+```

processor/coralogixprocessor/config.go

Lines changed: 5 additions & 20 deletions
@@ -3,27 +3,12 @@
 
 package coralogixprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor"
 
-import "errors"
-
-type samplingConfig struct {
-	enabled         bool  `mapstructure:"enabled"`
-	maxCacheSizeMib int64 `mapstructure:"max_cache_size_mib"`
-}
-
-type databaseBlueprintsConfig struct {
-	sampling samplingConfig `mapstructure:"sampling"`
+// TransactionsConfig holds configuration for transactions.
+type TransactionsConfig struct {
+	Enabled bool     `mapstructure:"enabled"`
+	_       struct{} // prevents unkeyed literal initialization
 }
 
 type Config struct {
-	databaseBlueprintsConfig `mapstructure:"database_blueprints_config"`
-}
-
-func (c *Config) Validate() error {
-	if c.sampling.enabled && c.sampling.maxCacheSizeMib <= 0 {
-		return errors.New("max_cache_size_mib must be a positive integer")
-	}
-	if c.sampling.enabled && c.sampling.maxCacheSizeMib != 0 {
-		return errors.New("max_cache_size_mib can only be defined when sampling is enabled")
-	}
-	return nil
+	TransactionsConfig `mapstructure:"transactions"`
 }
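A short sketch of how the new config types behave in plain Go, assuming nothing beyond the two struct definitions in the diff above (copied locally so the example compiles on its own). `transactionsEnabled` is a hypothetical helper, not part of the commit. It illustrates two points: the embedded `TransactionsConfig` promotes `Enabled` onto `Config`, and the blank `_ struct{}` field forces callers to use keyed literals.

```go
package main

import "fmt"

// Local copies of the types from the diff, so this sketch is self-contained.
type TransactionsConfig struct {
	Enabled bool     `mapstructure:"enabled"`
	_       struct{} // prevents unkeyed literal initialization
}

type Config struct {
	TransactionsConfig `mapstructure:"transactions"`
}

// transactionsEnabled is a hypothetical helper showing field promotion:
// cfg.Enabled resolves to cfg.TransactionsConfig.Enabled.
func transactionsEnabled(cfg *Config) bool {
	return cfg.Enabled
}

func main() {
	// Keyed literal: compiles. An unkeyed TransactionsConfig{true} would not,
	// because the blank field cannot be filled positionally.
	cfg := &Config{TransactionsConfig: TransactionsConfig{Enabled: true}}
	fmt.Println(transactionsEnabled(cfg), cfg.TransactionsConfig.Enabled)

	// The zero value leaves the feature off.
	fmt.Println(transactionsEnabled(&Config{}))
}
```

The zero value of `Config` already has `Enabled` set to `false`, which is consistent with the README describing the feature as disabled by default.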

processor/coralogixprocessor/factory.go

Lines changed: 5 additions & 1 deletion
@@ -22,7 +22,11 @@ func NewFactory() processor.Factory {
 }
 
 func createDefaultConfig() component.Config {
-	return &Config{}
+	return &Config{
+		TransactionsConfig: TransactionsConfig{
+			Enabled: false,
+		},
+	}
 }
 
 func createTracesProcessor(

processor/coralogixprocessor/go.mod

Lines changed: 1 addition & 1 deletion
@@ -14,6 +14,7 @@ require (
 	go.opentelemetry.io/collector/processor/processorhelper v0.129.1-0.20250630174123-18b3b578b0b3
 	go.opentelemetry.io/collector/processor/processortest v0.129.1-0.20250630174123-18b3b578b0b3
 	go.uber.org/goleak v1.3.0
+	go.uber.org/zap v1.27.0
 )
 
 require (
@@ -52,7 +53,6 @@ require (
 	go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
 	go.opentelemetry.io/otel/trace v1.37.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
-	go.uber.org/zap v1.27.0 // indirect
 	go.yaml.in/yaml/v3 v3.0.3 // indirect
 	golang.org/x/net v0.39.0 // indirect
 	golang.org/x/sys v0.33.0 // indirect
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package transactions // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/coralogixprocessor/internal/transactions"
+
+import (
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.uber.org/zap"
+)
+
+type spanNode struct {
+	span     ptrace.Span
+	children []*spanNode
+}
+
+// buildSpanTree constructs a hierarchical tree of spans
+func buildSpanTree(spans []ptrace.Span, logger *zap.Logger) *spanNode {
+	spanMap := make(map[pcommon.SpanID]*spanNode)
+	var rootSpan *spanNode
+	var orphanedSpans []*spanNode
+
+	for _, span := range spans {
+		node := &spanNode{span: span}
+		spanMap[span.SpanID()] = node
+
+		if span.ParentSpanID().IsEmpty() {
+			if rootSpan != nil {
+				logger.Warn("Multiple root spans found in single trace",
+					zap.String("existingRootSpanID", rootSpan.span.SpanID().String()),
+					zap.String("newRootSpanID", span.SpanID().String()))
+				// We'll keep the earliest span as root
+				if span.StartTimestamp() < rootSpan.span.StartTimestamp() {
+					orphanedSpans = append(orphanedSpans, rootSpan)
+					rootSpan = node
+				} else {
+					orphanedSpans = append(orphanedSpans, node)
+				}
+			} else {
+				rootSpan = node
+			}
+		}
+	}
+
+	if len(orphanedSpans) > 0 {
+		logger.Warn("orphaned spans found", zap.Int("orphanedSpans", len(orphanedSpans)))
+	}
+
+	// If no root span was found, use the earliest span as root
+	if rootSpan == nil && len(spans) > 0 {
+		earliestSpan := spanMap[spans[0].SpanID()]
+		earliestTime := spans[0].StartTimestamp()
+
+		for _, node := range spanMap {
+			if node.span.StartTimestamp() < earliestTime {
+				earliestTime = node.span.StartTimestamp()
+				earliestSpan = node
+			}
+		}
+
+		rootSpan = earliestSpan
+		logger.Warn("No root span found in trace, using earliest span as root",
+			zap.String("selectedRootSpanID", rootSpan.span.SpanID().String()))
+	}
+
+	for _, node := range spanMap {
+		if node == rootSpan {
+			continue
+		}
+
+		parentID := node.span.ParentSpanID()
+		if parent, exists := spanMap[parentID]; exists {
+			parent.children = append(parent.children, node)
+		}
+	}
+
+	return rootSpan
+}
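The root-selection rules in `buildSpanTree` (earliest parentless span wins; if no span is parentless, fall back to the earliest span overall; an empty input yields no root) can be exercised without the pdata dependency. The sketch below is a simplified restatement under stated assumptions: `span` and `pickRoot` are hypothetical names, span IDs are plain strings, and the fallback scans the slice in order rather than iterating a map as the original does.

```go
package main

import "fmt"

// span is a hypothetical, simplified stand-in for ptrace.Span.
type span struct {
	id       string
	parentID string // empty means the span has no parent
	start    int64  // start time in nanoseconds
}

// pickRoot restates the selection rules of buildSpanTree:
//  1. among spans without a parent, the earliest one is the root;
//  2. if every span has a parent, the earliest span overall is the root;
//  3. an empty input has no root (ok is false).
func pickRoot(spans []span) (root span, ok bool) {
	if len(spans) == 0 {
		return span{}, false
	}

	found := false
	for _, s := range spans {
		if s.parentID != "" {
			continue
		}
		// Strict "<" keeps the first-seen root on ties, as in the original.
		if !found || s.start < root.start {
			root, found = s, true
		}
	}
	if found {
		return root, true
	}

	// Fallback: no parentless span, so take the earliest span overall.
	root = spans[0]
	for _, s := range spans[1:] {
		if s.start < root.start {
			root = s
		}
	}
	return root, true
}

func main() {
	twoRoots := []span{{id: "root1", start: 100}, {id: "root2", start: 50}}
	noRoot := []span{
		{id: "span1", parentID: "missing", start: 100},
		{id: "span2", parentID: "missing", start: 50},
	}
	for _, in := range [][]span{twoRoots, noRoot, nil} {
		root, ok := pickRoot(in)
		fmt.Println(root.id, ok)
	}
}
```

The inputs in `main` mirror the multiple-roots, no-root, and empty cases covered by the unit tests in this commit.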
Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package transactions
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.uber.org/zap"
+)
+
+func TestBuildSpanTreeSingleSpan(t *testing.T) {
+	logger := zap.NewNop()
+	span := createSpan("span1", pcommon.SpanID([8]byte{1}), pcommon.SpanID([8]byte{}), 100)
+	spans := []ptrace.Span{span}
+
+	root := buildSpanTree(spans, logger)
+
+	assert.NotNil(t, root)
+	assert.Equal(t, span.SpanID(), root.span.SpanID())
+	assert.Empty(t, root.children)
+}
+
+func TestBuildSpanTreeParentChild(t *testing.T) {
+	logger := zap.NewNop()
+	parentSpan := createSpan("parent", pcommon.SpanID([8]byte{1}), pcommon.SpanID([8]byte{}), 100)
+	childSpan := createSpan("child", pcommon.SpanID([8]byte{2}), parentSpan.SpanID(), 200)
+	spans := []ptrace.Span{childSpan, parentSpan}
+
+	root := buildSpanTree(spans, logger)
+
+	assert.NotNil(t, root)
+	assert.Equal(t, parentSpan.SpanID(), root.span.SpanID())
+	assert.Len(t, root.children, 1)
+	assert.Equal(t, childSpan.SpanID(), root.children[0].span.SpanID())
+}
+
+func TestBuildSpanTreeMultipleRoots(t *testing.T) {
+	logger := zap.NewNop()
+	root1 := createSpan("root1", pcommon.SpanID([8]byte{1}), pcommon.SpanID([8]byte{}), 100)
+	root2 := createSpan("root2", pcommon.SpanID([8]byte{2}), pcommon.SpanID([8]byte{}), 50)
+	spans := []ptrace.Span{root1, root2}
+
+	root := buildSpanTree(spans, logger)
+
+	assert.NotNil(t, root)
+	assert.Equal(t, root2.SpanID(), root.span.SpanID())
+}
+
+func TestBuildSpanTreeNoRoot(t *testing.T) {
+	logger := zap.NewNop()
+	span1 := createSpan("span1", pcommon.SpanID([8]byte{1}), pcommon.SpanID([8]byte{3}), 100)
+	span2 := createSpan("span2", pcommon.SpanID([8]byte{2}), pcommon.SpanID([8]byte{3}), 50)
+	spans := []ptrace.Span{span1, span2}
+
+	root := buildSpanTree(spans, logger)
+
+	assert.NotNil(t, root)
+	assert.Equal(t, span2.SpanID(), root.span.SpanID())
+}
+
+func TestBuildSpanTreeEmpty(t *testing.T) {
+	logger := zap.NewNop()
+	spans := []ptrace.Span{}
+
+	root := buildSpanTree(spans, logger)
+
+	assert.Nil(t, root)
+}
+
+func createSpan(name string, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, startTime int64) ptrace.Span {
+	span := ptrace.NewSpan()
+	span.SetName(name)
+	span.SetSpanID(spanID)
+	span.SetParentSpanID(parentSpanID)
+	span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, startTime)))
+	return span
+}
