Skip to content

Commit 21c2e24

Browse files
committed
in_systemd: allow a parser to be specified as part of the systemd unit
Similar to how we use kubernetes annotations to determine a parser, this uses custom fields in systemd units to configure a parser per systemd unit. In the unit file this is configured as: ``` [Service] ... LogExtraFields=FLUENT_BIT_PARSER=logfmt ```
1 parent e039c56 commit 21c2e24

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

plugins/in_systemd/systemd.c

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <fluent-bit/flb_input_plugin.h>
2222
#include <fluent-bit/flb_config.h>
2323
#include <fluent-bit/flb_time.h>
24+
#include <fluent-bit/flb_parser.h>
25+
#include <fluent-bit/flb_log_event_decoder.h>
2426

2527
#include "systemd_config.h"
2628
#include "systemd_db.h"
@@ -70,6 +72,59 @@ static int tag_compose(const char *tag, const char *unit_name,
7072
return 0;
7173
}
7274

75+
static int flb_systemd_repack_map(struct flb_log_event_encoder *encoder,
76+
char *data,
77+
size_t data_size)
78+
{
79+
msgpack_unpacked source_map;
80+
size_t offset;
81+
int result;
82+
size_t index;
83+
msgpack_object value;
84+
msgpack_object key;
85+
86+
result = FLB_EVENT_ENCODER_SUCCESS;
87+
88+
if (data_size > 0) {
89+
msgpack_unpacked_init(&source_map);
90+
91+
offset = 0;
92+
result = msgpack_unpack_next(&source_map,
93+
data,
94+
data_size,
95+
&offset);
96+
97+
if (result == MSGPACK_UNPACK_SUCCESS) {
98+
result = FLB_EVENT_ENCODER_SUCCESS;
99+
}
100+
else {
101+
result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
102+
}
103+
104+
for (index = 0;
105+
index < source_map.data.via.map.size &&
106+
result == FLB_EVENT_ENCODER_SUCCESS;
107+
index++) {
108+
key = source_map.data.via.map.ptr[index].key;
109+
value = source_map.data.via.map.ptr[index].val;
110+
111+
result = flb_log_event_encoder_append_body_msgpack_object(
112+
encoder,
113+
&key);
114+
115+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
116+
result = flb_log_event_encoder_append_body_msgpack_object(
117+
encoder,
118+
&value);
119+
}
120+
}
121+
122+
msgpack_unpacked_destroy(&source_map);
123+
}
124+
125+
return result;
126+
}
127+
73128
static int in_systemd_collect(struct flb_input_instance *ins,
74129
struct flb_config *config, void *in_context)
75130
{
@@ -84,11 +139,14 @@ static int in_systemd_collect(struct flb_input_instance *ins,
84139
long nsec;
85140
uint64_t usec;
86141
size_t length;
142+
size_t plength;
87143
size_t threshold;
144+
char *name;
88145
const char *sep;
89146
const char *key;
90147
const char *val;
91148
char *buf = NULL;
149+
void *pbuf = NULL;
92150
#ifdef FLB_HAVE_SQLDB
93151
char *cursor = NULL;
94152
#endif
@@ -100,6 +158,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
100158
const void *data;
101159
struct flb_systemd_config *ctx = in_context;
102160
struct flb_time tm;
161+
struct flb_parser *parser;
103162

104163
/* Restricted by mem_buf_limit */
105164
if (flb_input_buf_paused(ins) == FLB_TRUE) {
@@ -154,6 +213,17 @@ static int in_systemd_collect(struct flb_input_instance *ins,
154213
tag_len = ctx->ins->tag_len;
155214
}
156215

216+
/* Find the parser, if specified */
217+
ret = sd_journal_get_data(ctx->j, "FLUENT_BIT_PARSER", &data, &length);
218+
if (ret == 0) {
219+
name = flb_strndup((const char *)(data+18), length);
220+
parser = flb_parser_get(name, config);
221+
if (!parser) {
222+
flb_plg_error(ctx->ins, "no such parser: '%s'", sep+1);
223+
}
224+
free(name);
225+
}
226+
157227
if (last_tag_len == 0) {
158228
strncpy(last_tag, tag, tag_len);
159229
last_tag_len = tag_len;
@@ -219,6 +289,26 @@ static int in_systemd_collect(struct flb_input_instance *ins,
219289

220290
len = (sep - key);
221291

292+
if (strncmp(key, "FLUENT_BIT_PARSER", len) == 0) {
293+
continue;
294+
}
295+
296+
/* If this is the message, apply the parser if any is specified */
297+
if (parser && strncmp(key, "MESSAGE", len) == 0) {
298+
val = sep + 1;
299+
len = length - (sep - key) - 1;
300+
ret = flb_parser_do(parser, val, len, &pbuf, &plength, &tm);
301+
if (ret != -1) {
302+
ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength);
303+
continue;
304+
}
305+
/*
306+
* If the parser failed, reset the return code
307+
* to append the unparsed message as normal
308+
*/
309+
ret = FLB_EVENT_ENCODER_SUCCESS;
310+
}
311+
222312
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
223313
ret = flb_log_event_encoder_append_body_string_length(
224314
ctx->log_encoder, len);

0 commit comments

Comments
 (0)