Skip to content

Commit

Permalink
Preserve original meta fields during early encoding of events (#42559) (
Browse files Browse the repository at this point in the history
#42696)

Preserve the `Meta` field when generating the early-encoding of an event, and include it in string representations when logging event errors/warnings.

Fixes #41725.

(cherry picked from commit d2daa5e)

Co-authored-by: Fae Charlton <[email protected]>
  • Loading branch information
mergify[bot] and faec authored Feb 13, 2025
1 parent a79254b commit 476987f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support Elastic Agent control protocol chunking support {pull}37343[37343]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}37816[37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356]
- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794]
- Add `translate_ldap_attribute` processor. {pull}41472[41472]
- Remove unnecessary debug logs during idle connection teardown {issue}40824[40824]
- Remove unnecessary reload for Elastic Agent managed beats when apm tracing config changes from nil to nil {pull}41794[41794]
- Fix incorrect cloud provider identification in add_cloud_metadata processor using provider priority mechanism {pull}41636[41636]
- Prevent panic if libbeat processors are loaded more than once. {issue}41475[41475] {pull}41857[41857]
- Allow network condition to handle field values that are arrays of IP addresses. {pull}41918[41918]
- Fix a bug where log files are rotated on startup when interval is configured and rotateonstartup is disabled {issue}41894[41894] {pull}41895[41895]
- Fix setting unique registry for non beat receivers {issue}42288[42288] {pull}42292[42292]
- The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401]
- Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748]
- All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439]
- The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439]
- Restored event Meta fields in the Elasticsearch output's error logs. {pull}42559[42559]

*Auditbeat*

Expand Down
14 changes: 12 additions & 2 deletions libbeat/outputs/elasticsearch/event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type encodedEvent struct {
// there's an ingestion error.
timestamp time.Time

// The meta fields from the original event (which aren't included in the
// encoding but may still need to be logged if there is an error).
meta mapstr.M

id string
opType events.OpType
pipeline string
Expand Down Expand Up @@ -130,6 +134,7 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent {
copy(bytes, bufBytes)
return &encodedEvent{
id: id,
meta: e.Meta,
timestamp: e.Timestamp,
opType: opType,
pipeline: pipeline,
Expand All @@ -152,9 +157,14 @@ func (e *encodedEvent) setDeadLetter(
e.encoding = []byte(deadLetterReencoding.String())
}

// String converts e.encoding to string and returns it.
// String returns a loggable representation of the encoded event:
// the raw JSON encoding followed by the original event's meta
// fields, or "none" when no meta fields were present. It exists so
// error/warning logs can show the full event, including metadata
// that is excluded from the early encoding itself.
func (e *encodedEvent) String() string {
	if e.meta == nil {
		return string(e.encoding) + ", Meta: none"
	}
	return string(e.encoding) + ", Meta: " + e.meta.String()
}
19 changes: 13 additions & 6 deletions libbeat/outputs/elasticsearch/event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func TestEncodeEntry(t *testing.T) {

encoder := newEventEncoder(true, indexSelector, nil)

metaFields := mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
}

timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
pubEvent := publisher.Event{
Content: beat.Event{
Expand All @@ -53,11 +59,7 @@ func TestEncodeEntry(t *testing.T) {
"nested_field": "nested_value",
},
},
Meta: mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
},
Meta: metaFields,
},
}

Expand All @@ -81,6 +83,7 @@ func TestEncodeEntry(t *testing.T) {
assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event")
assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata")
assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set")
assert.Equal(t, encBeatEvent.meta, metaFields, "encoded event struct should include original event's meta fields")

// Check encoded fields
var eventContent struct {
Expand All @@ -97,6 +100,9 @@ func TestEncodeEntry(t *testing.T) {
assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original")
assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original")
assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original")

// Check string representation includes meta fields
assert.Contains(t, encBeatEvent.String(), `"pipeline":"TEST_PIPELINE"`, "String representation of encoded event should include the original event's meta fields")
}

// encodeBatch encodes a publisher.Batch so it can be provided to
Expand Down Expand Up @@ -124,13 +130,14 @@ func encodeEvents(client *Client, events []publisher.Event) []publisher.Event {
// Skip encoding if there's already encoded data present
if events[i].EncodedEvent == nil {
encoded, _ := encoder.EncodeEntry(events[i])
event := encoded.(publisher.Event)
event, _ := encoded.(publisher.Event)
events[i] = event
}
}
return events
}

//nolint:unused // False positive caused by varying build tags, this is used in client_test
func encodeEvent(client *Client, event publisher.Event) publisher.Event {
encoder := newEventEncoder(
client.conn.EscapeHTML,
Expand Down

0 comments on commit 476987f

Please sign in to comment.