diff --git a/plugins/in_kmsg/in_kmsg.c b/plugins/in_kmsg/in_kmsg.c
index 15f10545..481d162b 100644
--- a/plugins/in_kmsg/in_kmsg.c
+++ b/plugins/in_kmsg/in_kmsg.c
@@ -104,6 +104,60 @@ static int boot_time(struct timeval *boot_time)
     return 0;
 }
 
+/*
+ * Restore ctx->last_sequence from FLB_KMSG_SEQ_MARKER.
+ *
+ * A missing, unreadable or unparsable marker leaves last_sequence at 0,
+ * which process_line() treats as "no marker: do not skip anything".
+ */
+static void load_last_sequence(struct flb_in_kmsg_config *ctx)
+{
+    FILE *f;
+    unsigned long long seq = 0;
+
+    ctx->last_sequence = 0;
+
+    f = fopen(FLB_KMSG_SEQ_MARKER, "r");
+    if (!f) {
+        return;
+    }
+
+    /*
+     * last_sequence is a uint64_t, which is not necessarily the same type
+     * as unsigned long long: scan into a variable that matches "%llu".
+     */
+    if (fscanf(f, "%llu", &seq) == 1) {
+        ctx->last_sequence = (uint64_t) seq;
+    }
+
+    fclose(f);
+}
+
+/*
+ * Persist ctx->last_sequence into FLB_KMSG_SEQ_MARKER so the next run can
+ * skip the records that were already ingested. Failures are only logged:
+ * losing the marker must not prevent the plugin from shutting down.
+ */
+static void save_last_sequence(struct flb_in_kmsg_config *ctx)
+{
+    int written;
+    FILE *f;
+
+    f = fopen(FLB_KMSG_SEQ_MARKER, "w");
+    if (!f) {
+        flb_plg_warn(ctx->ins, "could not open %s for writing",
+                     FLB_KMSG_SEQ_MARKER);
+        return;
+    }
+
+    written = fprintf(f, "%llu\n", (unsigned long long) ctx->last_sequence);
+
+    /* fclose() flushes, so a failed write may only show up here */
+    if (fclose(f) != 0 || written < 0) {
+        flb_plg_warn(ctx->ins, "could not write %s", FLB_KMSG_SEQ_MARKER);
+    }
+}
+
 static inline int process_line(const char *line,
                                struct flb_input_instance *i_ins,
                                struct flb_in_kmsg_config *ctx)
@@ -161,6 +215,15 @@ static inline int process_line(const char *line,
     }
 
     sequence = val;
+    /*
+     * Skip records already ingested by a previous run. A last_sequence
+     * of 0 means that no marker was restored, so nothing is skipped and
+     * a record whose sequence number is 0 is not dropped.
+     */
+    if (ctx->last_sequence > 0 && sequence <= ctx->last_sequence) {
+        return 0;
+    }
+
     p = ++end;
 
     /* Timestamp */
@@ -225,6 +288,7 @@ static inline int process_line(const char *line,
                              ctx->log_encoder.output_buffer,
                              ctx->log_encoder.output_length);
 
+        ctx->last_sequence = sequence;
         ret = 0;
     }
     else {
@@ -363,6 +427,10 @@ static int in_kmsg_init(struct flb_input_instance *ins,
         return -1;
     }
 
+    load_last_sequence(ctx);
+    flb_plg_debug(ctx->ins, "restored last_sequence=%llu",
+                  (unsigned long long)ctx->last_sequence);
+
     return 0;
 }
 
@@ -373,6 +441,12 @@ static int in_kmsg_exit(void *data, struct flb_config *config)
 
     flb_log_event_encoder_destroy(&ctx->log_encoder);
 
+    /*
+     * save the last read sequence number in a file,
+     * so that we know where to start next time
+     */
+    save_last_sequence(ctx);
+
     if (ctx->fd >= 0) {
         close(ctx->fd);
     }
diff --git a/plugins/in_kmsg/in_kmsg.h b/plugins/in_kmsg/in_kmsg.h
index 0ea2e97a..4b9d98fd 100644
--- a/plugins/in_kmsg/in_kmsg.h
+++ b/plugins/in_kmsg/in_kmsg.h
@@ -27,6 +27,7 @@
 #include 
 
 #define FLB_KMSG_DEV      "/dev/kmsg"
+#define FLB_KMSG_SEQ_MARKER "/tmp/fluent_bit_kmsg_last_seq" /* sequence marker file */
 #define FLB_KMSG_BUF_SIZE 4096
 
 /* Alert levels, taken from util-linux sources */
@@ -60,6 +61,9 @@ struct flb_in_kmsg_config {
     size_t buf_size;
     struct flb_log_event_encoder log_encoder;
     struct flb_input_instance *ins;
+
+    uint64_t last_sequence;    /* sequence of the last record appended */
+    const char *seq_file;      /* path to seq file (NOTE(review): unused in the visible hunks - confirm) */
 };