From 476987f4e7be3975558ea8b2c4cbfa436dd359a9 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 13 Feb 2025 16:00:47 -0500
Subject: [PATCH] Preserve original meta fields during early encoding of events (#42559) (#42696)

Preserve the `Meta` field when generating the early-encoding of an event,
and include it in string representations when logging event
errors/warnings.

Fixes https://github.com/elastic/beats/issues/41725.

(cherry picked from commit d2daa5e99b53ec5bd70903f485fec59c8487f1a1)

Co-authored-by: Fae Charlton
---
 CHANGELOG.next.asciidoc                        | 15 +++++++++++++++
 .../outputs/elasticsearch/event_encoder.go     | 14 ++++++++++++--
 .../elasticsearch/event_encoder_test.go        | 19 +++++++++++++------
 3 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index a5f566d6aa96..13365340a922 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -78,6 +78,21 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Support Elastic Agent control protocol chunking support {pull}37343[37343]
 - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816]
 - Set timeout of 1 minute for FQDN requests {pull}37756[37756]
+- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356]
+- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794]
+- Add `translate_ldap_attribute` processor. {pull}41472[41472]
+- Remove unnecessary debug logs during idle connection teardown {issue}40824[40824]
+- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794]
+- Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636]
+- Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[51857]
+- Allow network condition to handle field values that are arrays of IP addresses. {pull}41918[41918]
+- Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895]
+- Fix setting unique registry for non beat receivers {issue}42288[42288] {pull}42292[42292]
+- The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401]
+- Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748]
+- All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439]
+- The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439]
+- Restored event Meta fields in the Elasticsearch output's error logs. {pull}42559[42559]
 
 *Auditbeat*
 
diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go
index e569ebf3abf3..2f32abe143a0 100644
--- a/libbeat/outputs/elasticsearch/event_encoder.go
+++ b/libbeat/outputs/elasticsearch/event_encoder.go
@@ -55,6 +55,10 @@ type encodedEvent struct {
     // there's an ingestion error.
     timestamp time.Time
 
+    // The meta fields from the original event (which aren't included in the
+    // encoding but may still need to be logged if there is an error).
+    meta mapstr.M
+
     id       string
     opType   events.OpType
     pipeline string
@@ -130,6 +134,7 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent {
     copy(bytes, bufBytes)
     return &encodedEvent{
         id:        id,
+        meta:      e.Meta,
         timestamp: e.Timestamp,
         opType:    opType,
         pipeline:  pipeline,
@@ -152,9 +157,14 @@ func (e *encodedEvent) setDeadLetter(
     e.encoding = []byte(deadLetterReencoding.String())
 }
 
-// String converts e.encoding to string and returns it.
+// String converts e.encoding (and meta fields if present)
+// to string and returns it.
 // The goal of this method is to provide an easy way to log
 // the event encoded.
 func (e *encodedEvent) String() string {
-    return string(e.encoding)
+    metaString := "none"
+    if e.meta != nil {
+        metaString = e.meta.String()
+    }
+    return string(e.encoding) + ", Meta: " + metaString
 }
diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go
index a3aef08ca23c..0a9fa5561e77 100644
--- a/libbeat/outputs/elasticsearch/event_encoder_test.go
+++ b/libbeat/outputs/elasticsearch/event_encoder_test.go
@@ -42,6 +42,12 @@ func TestEncodeEntry(t *testing.T) {
 
     encoder := newEventEncoder(true, indexSelector, nil)
 
+    metaFields := mapstr.M{
+        events.FieldMetaOpType:   "create",
+        events.FieldMetaPipeline: "TEST_PIPELINE",
+        events.FieldMetaID:       "test_id",
+    }
+
     timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
     pubEvent := publisher.Event{
         Content: beat.Event{
@@ -53,11 +59,7 @@ func TestEncodeEntry(t *testing.T) {
                 "nested_field": "nested_value",
             },
         },
-        Meta: mapstr.M{
-            events.FieldMetaOpType:   "create",
-            events.FieldMetaPipeline: "TEST_PIPELINE",
-            events.FieldMetaID:       "test_id",
-        },
+        Meta: metaFields,
     },
 }
 
@@ -81,6 +83,7 @@
     assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event")
     assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata")
     assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set")
+    assert.Equal(t, encBeatEvent.meta, metaFields, "encoded event struct should include original event's meta fields")
 
     // Check encoded fields
     var eventContent struct {
@@ -97,6 +100,9 @@
     assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original")
     assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original")
     assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original")
+
+    // Check string representation includes meta fields
+    assert.Contains(t, encBeatEvent.String(), `"pipeline":"TEST_PIPELINE"`, "String representation of encoded event should include the original event's meta fields")
 }
 
 // encodeBatch encodes a publisher.Batch so it can be provided to
@@ -124,13 +130,14 @@ func encodeEvents(client *Client, events []publisher.Event) []publisher.Event {
         // Skip encoding if there's already encoded data present
         if events[i].EncodedEvent == nil {
             encoded, _ := encoder.EncodeEntry(events[i])
-            event := encoded.(publisher.Event)
+            event, _ := encoded.(publisher.Event)
             events[i] = event
         }
     }
     return events
 }
 
+//nolint:unused // False positive caused by varying build tags, this is used in client_test
 func encodeEvent(client *Client, event publisher.Event) publisher.Event {
     encoder := newEventEncoder(
         client.conn.EscapeHTML,
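
Illustration (not part of the patch above): a minimal, self-contained Go sketch of the log format the patched String() method produces, using a simplified stand-in for the unexported encodedEvent type rather than the real libbeat struct. It assumes the github.com/elastic/elastic-agent-libs/mapstr dependency used by libbeat; the field names and the example document are illustrative only.

package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/mapstr"
)

// encodedEvent is a simplified stand-in for the unexported struct in
// libbeat/outputs/elasticsearch/event_encoder.go; only the fields needed
// to show the new string representation are included.
type encodedEvent struct {
	encoding []byte
	meta     mapstr.M
}

// String mirrors the patched method: the encoded document followed by the
// original Meta fields, or "none" when the event carried no Meta.
func (e *encodedEvent) String() string {
	metaString := "none"
	if e.meta != nil {
		metaString = e.meta.String()
	}
	return string(e.encoding) + ", Meta: " + metaString
}

func main() {
	e := &encodedEvent{
		encoding: []byte(`{"@timestamp":"1980-01-01T00:00:00.000Z","test_field":"test_value"}`),
		meta: mapstr.M{
			"pipeline": "TEST_PIPELINE",
			"_id":      "test_id",
		},
	}
	// Prints the encoded document plus a ", Meta: {...}" suffix, e.g.
	// {"@timestamp":...,"test_field":"test_value"}, Meta: {"_id":"test_id","pipeline":"TEST_PIPELINE"}
	fmt.Println(e.String())
}

With this format, an error or warning log line for a failed event now carries the original Meta fields (such as the pipeline) alongside the encoded document, which is what the new assert.Contains check in the test verifies.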