Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routes_mask: added auto scalable route mask size feature #9248

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ struct flb_config {
int shutdown_by_hot_reloading;
int hot_reloading;

/* Routing */
size_t route_mask_size;
size_t route_mask_slots;
uint64_t *route_empty_mask;

/* Co-routines */
unsigned int coro_stack_size;

Expand Down
3 changes: 1 addition & 2 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ struct flb_input_chunk {
#ifdef FLB_HAVE_CHUNK_TRACE
struct flb_chunk_trace *trace;
#endif /* FLB_HAVE_CHUNK_TRACE */
uint64_t routes_mask
[FLB_ROUTES_MASK_ELEMENTS]; /* track the output plugins the chunk routes to */
flb_route_mask_element *routes_mask; /* track the output plugins the chunk routes to */
struct mk_list _head;
};

Expand Down
37 changes: 20 additions & 17 deletions include/fluent-bit/flb_routes_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,35 @@
*
* A value of 1 in the bitfield means that output plugin is selected
* and a value of zero means that output is deselected.
*
* The size of the bitmask array limits the number of output plugins
* The router can route to. For example: with a value of 4 using
* 64-bit integers the bitmask can represent up to 256 output plugins
*/
#define FLB_ROUTES_MASK_ELEMENTS 4

/*
* How many bits are in each element of the bitmask array
*/
#define FLB_ROUTES_MASK_ELEMENT_BITS (sizeof(uint64_t) * CHAR_BIT)
typedef uint64_t flb_route_mask_element;

/*
* The maximum number of routes that can be stored in the array
* How many bits are in each element of the bitmask array
*/
#define FLB_ROUTES_MASK_MAX_VALUE (FLB_ROUTES_MASK_ELEMENTS * FLB_ROUTES_MASK_ELEMENT_BITS)

#define FLB_ROUTES_MASK_ELEMENT_BITS (sizeof(flb_route_mask_element) * CHAR_BIT)

/* forward declaration */
struct flb_input_instance;
struct flb_config;

int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask,
const char *tag,
int tag_len,
struct flb_input_instance *in);
int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask,
struct flb_config *config);

int flb_routes_empty_mask_create(struct flb_config *config);
void flb_routes_empty_mask_destroy(struct flb_config *config);

int flb_routes_mask_set_by_tag(uint64_t *routes_mask, const char *tag, int tag_len, struct flb_input_instance *in);
int flb_routes_mask_get_bit(uint64_t *routes_mask, int value);
void flb_routes_mask_set_bit(uint64_t *routes_mask, int value);
void flb_routes_mask_clear_bit(uint64_t *routes_mask, int value);
int flb_routes_mask_is_empty(uint64_t *routes_mask);
int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config);

#endif
3 changes: 2 additions & 1 deletion plugins/in_storage_backlog/sb.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
mk_list_foreach_safe(head, tmp, &context->backlogs) {
backlog = mk_list_entry(head, struct sb_out_queue, _head);
if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask,
backlog->ins->id)) {
backlog->ins->id,
backlog->ins->config)) {
result = sb_append_chunk_to_segregated_backlog(target_chunk, stream,
chunk_size, backlog);
if (result) {
Expand Down
5 changes: 5 additions & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ struct flb_config *flb_config_init()
}
}

/* Routing */
flb_routes_mask_set_size(1, config);

config->cio = NULL;
config->storage_path = NULL;
config->storage_input_plugin = NULL;
Expand Down Expand Up @@ -547,6 +550,8 @@ void flb_config_exit(struct flb_config *config)
flb_cf_destroy(cf);
}

flb_routes_empty_mask_destroy(config);

flb_free(config);
}

Expand Down
7 changes: 7 additions & 0 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,13 @@ int flb_engine_start(struct flb_config *config)
return -1;
}

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config);

if (ret != 0) {
flb_error("[engine] routing mask dimensioning failed");
return -1;
}

/* Initialize custom plugins */
ret = flb_custom_init_all(config);
if (ret == -1) {
Expand Down
64 changes: 52 additions & 12 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ static int flb_input_chunk_release_space(
struct flb_input_chunk, _head);

if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask,
output_plugin->id)) {
output_plugin->id,
input_plugin->config)) {
continue;
}

Expand All @@ -182,13 +183,15 @@ static int flb_input_chunk_release_space(

if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) {
flb_routes_mask_clear_bit(old_input_chunk->routes_mask,
output_plugin->id);
output_plugin->id,
input_plugin->config);

FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size);
output_plugin->fs_chunks_size -= chunk_size;

chunk_destroy_flag = flb_routes_mask_is_empty(
old_input_chunk->routes_mask);
old_input_chunk->routes_mask,
input_plugin->config);

chunk_released = FLB_TRUE;
}
Expand Down Expand Up @@ -405,7 +408,9 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
* the routes_mask could be modified when new chunks is ingested. Therefore,
* we still need to do the validation on the routes_mask with o_id.
*/
if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) {
if (flb_routes_mask_get_bit(old_ic->routes_mask,
o_id,
ic->in->config) == 0) {
return FLB_FALSE;
}

Expand Down Expand Up @@ -508,7 +513,9 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
o_ins = mk_list_entry(head, struct flb_output_instance, _head);

if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 ||
(flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
(flb_routes_mask_get_bit(ic->routes_mask,
o_ins->id,
o_ins->config) == 0)) {
continue;
}

Expand Down Expand Up @@ -548,7 +555,9 @@ int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
o_ins = mk_list_entry(head, struct flb_output_instance, _head);

if ((o_ins->total_limit_size == -1) ||
(flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
(flb_routes_mask_get_bit(ic->routes_mask,
o_ins->id,
o_ins->config) == 0)) {
continue;
}

Expand Down Expand Up @@ -584,7 +593,8 @@ int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_siz
flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit);
}
}
return !flb_routes_mask_is_empty(ic->routes_mask);
return !flb_routes_mask_is_empty(ic->routes_mask,
i_ins->config);
}

/* Create an input chunk using a Chunk I/O */
Expand Down Expand Up @@ -904,6 +914,17 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
#ifdef FLB_HAVE_METRICS
ic->total_records = 0;
#endif
ic->routes_mask = (flb_route_mask_element *)
flb_calloc(in->config->route_mask_size,
sizeof(flb_route_mask_element));

if (ic->routes_mask == NULL) {
flb_errno();
cio_chunk_close(chunk, CIO_TRUE);
flb_free(ic);
return NULL;
}


/* Calculate the routes_mask for the input chunk */
has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in);
Expand Down Expand Up @@ -953,7 +974,9 @@ int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,
continue;
}

if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
if (flb_routes_mask_get_bit(ic->routes_mask,
o_ins->id,
o_ins->config) != 0) {
if (ic->fs_counted == FLB_TRUE) {
FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
o_ins->fs_chunks_size -= bytes;
Expand Down Expand Up @@ -993,6 +1016,12 @@ int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,

cio_chunk_close(ic->chunk, del);
mk_list_del(&ic->_head);

if (ic->routes_mask != NULL) {
flb_free(ic->routes_mask);
ic->routes_mask = NULL;
}

flb_free(ic);

return 0;
Expand Down Expand Up @@ -1025,7 +1054,9 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
continue;
}

if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
if (flb_routes_mask_get_bit(ic->routes_mask,
o_ins->id,
o_ins->config) != 0) {
if (ic->fs_counted == FLB_TRUE) {
FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes);
o_ins->fs_chunks_size -= bytes;
Expand Down Expand Up @@ -1086,6 +1117,12 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)

cio_chunk_close(ic->chunk, del);
mk_list_del(&ic->_head);

if (ic->routes_mask != NULL) {
flb_free(ic->routes_mask);
ic->routes_mask = NULL;
}

flb_free(ic);

return 0;
Expand Down Expand Up @@ -1169,15 +1206,16 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
* that the chunk will flush to, we need to modify the routes_mask of the oldest chunks
* (based in creation time) to get enough space for the incoming chunk.
*/
if (!flb_routes_mask_is_empty(ic->routes_mask)
if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config)
&& flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) {
/*
* If the chunk is not newly created, the chunk might already have logs inside.
* We cannot delete (reused) chunks here.
* If the routes_mask is cleared after trying to append new data, we destroy
* the chunk.
*/
if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
if (new_chunk ||
flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) == FLB_TRUE) {
flb_input_chunk_destroy(ic, FLB_TRUE);
}
return NULL;
Expand Down Expand Up @@ -2103,7 +2141,9 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
continue;
}

if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
if (flb_routes_mask_get_bit(ic->routes_mask,
o_ins->id,
o_ins->config) != 0) {
/*
* if there is match on any index of 1's in the binary, it indicates
* that the input chunk will flush to this output instance
Expand Down
Loading
Loading