
[receiver/filelog] Flush can send partial input #32170

Closed
@OverOrion

Description


Component(s)

pkg/stanza

What happened?

Description

As mentioned in #31512, the filelogreceiver can "lose" some characters. That issue has grown long, so this one summarizes the problem, minimizes the reproduction steps, and points to the relevant lines of code.

The problem is how the flush logic interacts with the scanner. By default the scanner expects newline-terminated lines, but if it cannot find a newline, it returns the current buffer once the flush timeout expires. Because there is no communication between the flush logic and the scanner, the following is possible:

  1. The scanner has consumed some bytes, but has not reached EOF yet.
  2. A forced flush happens, and the scanner yields its inner buffer.

The root cause seems to be the different lifetimes of FlushState and Scanner: there is a single FlushState instance per reader, but that reader creates many Scanner instances over its lifetime:

func New(r io.Reader, maxLogSize int, bufferSize int, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {

This means that these scanner instances will all share the same fate:

  1. Read n bytes (n == initial buffer size) and try to read more, but since the newline terminator cannot be found, no token is returned.
  2. Because the scan did not succeed, a new scanner is constructed:

    s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
    // Iterate over the tokenized file, emitting entries as we go
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        ok := s.Scan()
        if !ok {
            if err := s.Error(); err != nil {
                r.logger.Errorw("Failed during scan", zap.Error(err))
            } else if r.deleteAtEOF {
                r.delete()
            }
            return
        }

  3. Once the flush timer expires, the current Scanner is force flushed, yielding only n bytes (the exact amount can differ, depending on when the flush timeout reaps it).

The reconstruction is needed because:

  1. once a scanner has reached the end of its input, it is not usable anymore, and
  2. because this is how the Collector gets new input from a file: a new Scanner created at the stored offset

Steps to Reproduce

Input creation without newline ending

printf "2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpPQ78jzoFu" > input.log

Collector

  1. The easiest way is to change the default scanner buffer size to something small (50 bytes, as the input is just over 100 bytes)
  2. Build and run the collector with the given configuration
  3. Check the output log file: the input is chunked into 50-byte pieces
{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1712240671818953417","body":{"stringValue":"2024-03-19T11:21:00.839338492-05:00 stdout P 2024-"},"attributes":[{"key":"log.file.name","value":{"stringValue":"input.log"}}],"traceId":"","spanId":""}]}]}]}
{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1712240677018432716","body":{"stringValue":"03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpP"},"attributes":[{"key":"log.file.name","value":{"stringValue":"input.log"}}],"traceId":"","spanId":""}]}]}]}
{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1712240682018598612","body":{"stringValue":"Q78jzoFu"},"attributes":[{"key":"log.file.name","value":{"stringValue":"input.log"}}],"traceId":"","spanId":""}]}]}]}

Expected Result

Whole line as is

Actual Result

Chunked line

Possible solutions

  • A possible workaround would be to flush only when atEOF is true, but atEOF is meaningful only for file-based sources, so this would not work for TCP, for example.
  • A different "polling" method for Scanners, so they would not have to be recreated just to read new input/lines, and the flush timeout would only send the buffer if the scanning cannot advance anymore.
  • The bufio.Scanner might need to be retired in favor of something else (something based on bufio.Reader, perhaps?), combined with keeping track of the current partial token. This needs some thought, because many things currently rely on the scanner.

What do you think @djaglowski @ChrsMark?

Also huge kudos to @MrAnno for pair debugging this issue with me 🚀

Collector version

e4c5b51

Environment information

Environment

OS: Ubuntu 23.10
Compiler(if manually compiled): go 1.21.6

OpenTelemetry Collector configuration

receivers:
  filelog:
    start_at: beginning
    include:
    - /home/orion/input.log

exporters:
  file/simple:
    path: ./partial_output
  debug:
    verbosity: detailed

service:
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [debug, file/simple]

Log output

(Same chunked output as shown in the "What happened?" section above.)

Additional context

I also added some good ole' print statements to Func()

func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
and to s.Bytes(), which helped with debugging.

// First scanner instance
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024- // First 50 bytes
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpP // First 2*50 bytes
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpPQ78jzoFu // Leftovers
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpPQ78jzoFu //EOF

// Second scanner instance, same fate
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpP
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpPQ78jzoFu
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-03-13 11:51:00,838 [scheduler-2] INFO  dLphJ63kHpPQ78jzoFu

// Third scanner instance
inside Func, data is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-

// Flush timeout, sending current scanner's buffer, calling s.Bytes()
 tokenizing, len(bytes): 50,
tokenizing, bytes is: 2024-03-19T11:21:00.839338492-05:00 stdout P 2024-
