Skip to content

filter_parser: add record accessor support in parser filter #10366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions plugins/filter_parser/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ static int configure(struct filter_parser_ctx *ctx,
struct flb_kv *kv;

ctx->key_name = NULL;
ctx->ra_key = NULL;
ctx->reserve_data = FLB_FALSE;
ctx->preserve_key = FLB_FALSE;
mk_list_init(&ctx->parsers);
Expand All @@ -118,6 +119,15 @@ static int configure(struct filter_parser_ctx *ctx,
}
ctx->key_name_len = flb_sds_len(ctx->key_name);

if (ctx->key_name && ctx->key_name[0] == '$') {
ctx->ra_key = flb_ra_create(ctx->key_name, FLB_TRUE);
if (!ctx->ra_key) {
flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'",
ctx->key_name);
return -1;
}
}

/* Read all Parsers */
mk_list_foreach(head, &f_ins->properties) {
kv = mk_list_entry(head, struct flb_kv, _head);
Expand Down Expand Up @@ -246,20 +256,11 @@ static int cb_parser_filter(const void *data, size_t bytes,
}
}

/* Process the target key */
for (i = 0; i < map_num; i++) {
kv = &obj->via.map.ptr[i];
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
continue;
}
if (ctx->ra_key) {
struct flb_ra_value *rval;

if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
continue;
}

/* Lookup parser */
rval = flb_ra_get_value_object(ctx->ra_key, *obj);
if (rval && msgpackobj2char(&rval->o, &val_str, &val_len) == 0) {
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);
flb_time_zero(&parsed_time);
Expand All @@ -271,17 +272,53 @@ static int cb_parser_filter(const void *data, size_t bytes,
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}
break;
}
}
}

if (append_arr != NULL) {
if (!ctx->preserve_key) {
append_arr[i] = NULL;
if (rval) {
flb_ra_key_value_destroy(rval);
}
}
else {
/* Process the target key */
for (i = 0; i < map_num; i++) {
kv = &obj->via.map.ptr[i];
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
continue;
}

if (key_len == ctx->key_name_len &&
!strncmp(key_str, ctx->key_name, key_len)) {
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
continue;
}

/* Lookup parser */
mk_list_foreach(head, &ctx->parsers) {
fp = mk_list_entry(head, struct filter_parser, _head);
flb_time_zero(&parsed_time);

parse_ret = flb_parser_do(fp->parser, val_str, val_len,
(void **) &out_buf, &out_size,
&parsed_time);
if (parse_ret >= 0) {
if (flb_time_to_nanosec(&parsed_time) != 0L) {
flb_time_copy(&tm, &parsed_time);
}
else if (!ctx->reserve_data) {
/* Store only the key being preserved */
append_arr[0] = kv;

if (append_arr != NULL) {
if (!ctx->preserve_key) {
append_arr[i] = NULL;
}
else if (!ctx->reserve_data) {
/* Store only the key being preserved */
append_arr[0] = kv;
}
}
break;
}
break;
}
}
}
Expand Down Expand Up @@ -413,6 +450,9 @@ static int cb_parser_exit(void *data, struct flb_config *config)
}

delete_parsers(ctx);
if (ctx->ra_key) {
flb_ra_destroy(ctx->ra_key);
}
flb_free(ctx);
return 0;
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/filter_parser/filter_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_record_accessor.h>
#include <fluent-bit/flb_ra_key.h>

struct filter_parser {
struct flb_parser *parser;
Expand All @@ -33,6 +35,7 @@ struct filter_parser {
struct filter_parser_ctx {
flb_sds_t key_name;
int key_name_len;
struct flb_record_accessor *ra_key;
int reserve_data;
int preserve_key;
struct mk_list parsers;
Expand Down
88 changes: 88 additions & 0 deletions tests/runtime/filter_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,93 @@ void flb_test_filter_parser_extract_fields()
flb_destroy(ctx);
}

void flb_test_filter_parser_record_accessor()
{
int ret;
int bytes;
char *p, *output, *expected;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;
int filter_ffd;
struct flb_parser *parser;

struct flb_lib_out_cb cb;
cb.cb = callback_test;
cb.data = NULL;

clear_output();

ctx = flb_create();

/* Configure service */
flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace" "1", "Log_Level", "debug", NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd,
"Tag", "test",
NULL);

/* Parser */
parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
FLB_TRUE,
NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE, NULL, 0,
NULL, ctx->config);
TEST_CHECK(parser != NULL);

/* Filter */
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
TEST_CHECK(filter_ffd >= 0);
ret = flb_filter_set(ctx, filter_ffd,
"Match", "test",
"Key_Name", "$log['data']",
"Parser", "dummy_test",
"Reserve_Data", "On",
"Preserve_Key", "Off",
NULL);
TEST_CHECK(ret == 0);

/* Output */
out_ffd = flb_output(ctx, (char *) "lib", &cb);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,
"Match", "*",
"format", "json",
NULL);

/* Start the engine */
ret = flb_start(ctx);
TEST_CHECK(ret == 0);

/* Ingest data */
p = "[1448403340,{\"log\":{\"data\":\"100 0.5 true This is an example\"},\"extra\":\"Some more data\"}]";
bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
TEST_CHECK(bytes == strlen(p));

wait_with_timeout(2000, &output); /* waiting flush and ensuring data flush */
TEST_CHECK_(output != NULL, "Expected output to not be NULL");
if (output != NULL) {
/* check timestamp */
expected = "[1448403340.000000,{";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
/* check fields were extracted */
expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
/* check original nested key */
expected = "\"log\":{\"data\":\"100 0.5 true This is an example\"}";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
/* check extra data preserved */
expected = "\"extra\":\"Some more data\"";
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to preserve extra field, got '%s'", output);
free(output);
}

flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_filter_parser_reserve_data_off()
{
int ret;
Expand Down Expand Up @@ -1301,6 +1388,7 @@ void flb_test_filter_parser_reserve_on_preserve_on()

TEST_LIST = {
{"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
{"filter_parser_record_accessor", flb_test_filter_parser_record_accessor },
{"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
{"filter_parser_handle_time_key", flb_test_filter_parser_handle_time_key },
{"filter_parser_handle_time_key_with_time_zone", flb_test_filter_parser_handle_time_key_with_time_zone },
Expand Down
Loading