Skip to content

Commit 7a00c34

Browse files
justinianvoss22vincentfree
authored andcommitted
[receiver/netflowreceiver] add send_raw option for sending unparsed logs (open-telemetry#38832)
#### Description - Add a `send_raw` option to the config. - When set to true, the NetFlow log messages will be sent as an unparsed string in the log body. - When send_raw is false or is not configured, logs are parsed into attributes as before. - This works for all kinds of NetFlow log data, such as v5, v9, etc. #### Link to tracking issue Closes: open-telemetry#38920 #### Testing - New test in producer_test.go - Update test in config_test.go - Testing sending logs to Google SecOps: - Simulate NetFlow logs: https://hub.docker.com/r/networkstatic/nflow-generator - Set up configuration.yaml sending Netflow to Google SecOps - Confirm output of unparsed logs in Google SecOps #### Documentation - Update README.md with documentation of the `send_raw` field
1 parent 52bd5c0 commit 7a00c34

File tree

8 files changed

+154
-7
lines changed

8 files changed

+154
-7
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: netflowreceiver
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add `send_raw` option to send logs as a raw string in the log body instead of parsed into attributes.
9+
10+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
11+
issues: [38920]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:
17+
18+
# If your change doesn't affect end users or the exported elements of any package,
19+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

receiver/netflowreceiver/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ receivers:
3838
port: 6343
3939
sockets: 16
4040
workers: 32
41+
netflow/raw:
42+
- scheme: netflow
43+
port: 2055
44+
sockets: 16
45+
workers: 32
46+
send_raw: true
4147

4248
processors:
4349
batch:
@@ -73,6 +79,12 @@ You would then configure your network devices to send netflow, sflow, or ipfix d
7379
| sockets | The number of sockets to use | 1 | 1 |
7480
| workers | The number of workers used to decode incoming flow messages | 2 | 2 |
7581
| queue_size | The size of the incoming netflow packets queue, it will always be at least 1000. | 5000 | 1000 |
82+
| send_raw | Whether to send raw flow messages instead of parsing them | `true`, `false` | `false` |
83+
84+
When `send_raw` is set to `true`, the receiver will:
85+
86+
- Skip parsing the netflow/sflow messages
87+
- Send the raw message as the log body
7688

7789
## Data format
7890

receiver/netflowreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ type Config struct {
3030
// The size of the queue that the listener will use
3131
// This is a buffer that will hold flow messages before they are processed by a worker
3232
QueueSize int `mapstructure:"queue_size"`
33+
34+
// SendRaw determines whether to send raw flow messages instead of parsing them
35+
SendRaw bool `mapstructure:"send_raw"`
3336
}
3437

3538
// Validate checks if the receiver configuration is valid

receiver/netflowreceiver/config_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ func TestLoadConfig(t *testing.T) {
6060
QueueSize: 1000,
6161
},
6262
},
63+
{
64+
id: component.NewIDWithName(metadata.Type, "raw_logs"),
65+
expected: &Config{
66+
Scheme: "netflow",
67+
Port: 2055,
68+
Sockets: 1,
69+
Workers: 1,
70+
QueueSize: 1000,
71+
SendRaw: true,
72+
},
73+
},
6374
}
6475

6576
for _, tt := range tests {

receiver/netflowreceiver/producer.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-colle
55

66
import (
77
"context"
8+
"fmt"
89

910
"github.com/netsampler/goflow2/v2/producer"
1011
"go.opentelemetry.io/collector/consumer"
@@ -19,6 +20,7 @@ type OtelLogsProducerWrapper struct {
1920
wrapped producer.ProducerInterface
2021
logConsumer consumer.Logs
2122
logger *zap.Logger
23+
sendRaw bool
2224
}
2325

2426
// Produce converts the message into a list log records and sends them to log consumer
@@ -47,9 +49,14 @@ func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) (
4749
// A single netflow packet can contain multiple flow messages
4850
for _, msg := range flowMessageSet {
4951
logRecord := logRecords.AppendEmpty()
50-
parseErr := addMessageAttributes(msg, &logRecord)
51-
if parseErr != nil {
52-
continue
52+
if o.sendRaw {
53+
logRecord.Body().SetStr(fmt.Sprintf("%+v", msg))
54+
} else {
55+
// Parse the message and add the attributes to the log record
56+
err = addMessageAttributes(msg, &logRecord)
57+
if err != nil {
58+
o.logger.Error("error adding message attributes", zap.Error(err))
59+
}
5360
}
5461
}
5562

@@ -73,10 +80,11 @@ func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessa
7380
o.wrapped.Commit(flowMessageSet)
7481
}
7582

76-
func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs, logger *zap.Logger) producer.ProducerInterface {
83+
func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs, logger *zap.Logger, sendRaw bool) producer.ProducerInterface {
7784
return &OtelLogsProducerWrapper{
7885
wrapped: wrapped,
7986
logConsumer: logConsumer,
8087
logger: logger,
88+
sendRaw: sendRaw,
8189
}
8290
}

receiver/netflowreceiver/producer_test.go

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package netflowreceiver
55

66
import (
7+
"fmt"
78
"net/netip"
89
"testing"
910

@@ -56,7 +57,7 @@ func TestProduce(t *testing.T) {
5657
protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem)
5758
require.NoError(t, err)
5859

59-
otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop(), zap.NewNop())
60+
otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop(), zap.NewNop(), false)
6061
messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{})
6162
require.NoError(t, err)
6263
require.NotNil(t, messages)
@@ -70,6 +71,85 @@ func TestProduce(t *testing.T) {
7071
assert.Equal(t, uint32(838987416), pm.SequenceNum)
7172
}
7273

74+
func TestProduceRaw(t *testing.T) {
75+
// list of netflow.DataFlowSet
76+
message := &netflow.NFv9Packet{
77+
Version: 9,
78+
Count: 1,
79+
SystemUptime: 0xb3bff683,
80+
UnixSeconds: 0x618aa3a8,
81+
SequenceNumber: 838987416,
82+
SourceId: 256,
83+
FlowSets: []any{
84+
netflow.DataFlowSet{
85+
FlowSetHeader: netflow.FlowSetHeader{
86+
Id: 260,
87+
Length: 1372,
88+
},
89+
Records: []netflow.DataRecord{
90+
{
91+
Values: []netflow.DataField{
92+
{
93+
PenProvided: false,
94+
Type: 2,
95+
Pen: 0,
96+
Value: []uint8{0x00, 0x00, 0x00, 0x01},
97+
},
98+
},
99+
},
100+
{
101+
Values: []netflow.DataField{
102+
{
103+
PenProvided: false,
104+
Type: 2,
105+
Pen: 0,
106+
Value: []uint8{0x00, 0x00, 0x00, 0x02},
107+
},
108+
},
109+
},
110+
{
111+
Values: []netflow.DataField{
112+
{
113+
PenProvided: false,
114+
Type: 2,
115+
Pen: 0,
116+
Value: []uint8{0x00, 0x00, 0x00, 0x03},
117+
},
118+
},
119+
},
120+
},
121+
},
122+
},
123+
}
124+
125+
cfgProducer := &protoproducer.ProducerConfig{}
126+
cfgm, err := cfgProducer.Compile()
127+
require.NoError(t, err)
128+
129+
protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem)
130+
require.NoError(t, err)
131+
132+
sink := &consumertest.LogsSink{}
133+
otelLogsProducer := newOtelLogsProducer(protoProducer, sink, zap.NewNop(), true)
134+
135+
messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{})
136+
require.NoError(t, err)
137+
require.NotNil(t, messages)
138+
assert.Len(t, messages, 3)
139+
140+
logs := sink.AllLogs()
141+
require.Len(t, logs, 1)
142+
records := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
143+
require.Equal(t, 3, records.Len()) // Should have one record per flow record
144+
145+
// Each record should be a raw string representation of the ProducerMessage
146+
for i := 0; i < 3; i++ {
147+
record := records.At(i)
148+
msg := messages[i]
149+
assert.Equal(t, fmt.Sprintf("%+v", msg), record.Body().Str())
150+
}
151+
}
152+
73153
// This PanicProducer replaces the ProtoProducer, to simulate it producing a panic
74154
type PanicProducer struct{}
75155

@@ -90,7 +170,7 @@ func TestProducerPanic(t *testing.T) {
90170
mockConsumer := consumertest.NewNop()
91171

92172
// Wrap a PanicProducer (instead of ProtoProducer) in the OtelLogsProducerWrapper
93-
wrapper := newOtelLogsProducer(&PanicProducer{}, mockConsumer, logger)
173+
wrapper := newOtelLogsProducer(&PanicProducer{}, mockConsumer, logger, false)
94174

95175
// Call Produce which should recover from panic
96176
messages, err := wrapper.Produce(nil, &producer.ProduceArgs{

receiver/netflowreceiver/receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) {
108108

109109
// the otel log producer converts those messages into OpenTelemetry logs
110110
// it is a wrapper around the protobuf producer
111-
otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer, nr.logger)
111+
otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer, nr.logger, nr.config.SendRaw)
112112

113113
cfgPipe := &utils.PipeConfig{
114114
Producer: otelLogsProducer,

receiver/netflowreceiver/testdata/config.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,11 @@ netflow/sflow:
4040
sockets: 1
4141
workers: 1
4242
queue_size: 0
43+
44+
netflow/raw_logs:
45+
scheme: netflow
46+
port: 2055
47+
sockets: 1
48+
workers: 1
49+
queue_size: 0
50+
send_raw: true

0 commit comments

Comments
 (0)