Skip to content

Commit

Permalink
Log raw events and errors containing events to a separate file
Browse files Browse the repository at this point in the history
This commit introduces a new logger that can be configured through
`logging.events` that can be used to log any message that contains the
whole event or could contain the whole event.

At the moment it is used by the elasticsearch output to log indexing
errors containing the whole event and errors returned by Elasticsearch
that can potentially contain the whole event.
  • Loading branch information
belimawr committed Dec 20, 2023
1 parent a633696 commit db6216e
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 42 deletions.
43 changes: 36 additions & 7 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type beatConfig struct {
BufferConfig *config.C `config:"http.buffer"`
Path paths.Path `config:"path"`
Logging *config.C `config:"logging"`
EventLogging *config.C `config:"logging.events"`
MetricLogging *config.C `config:"logging.metrics"`
Keystore *config.C `config:"keystore"`
Instrumentation instrumentation.Config `config:"instrumentation"`
Expand Down Expand Up @@ -378,7 +379,26 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)

// Get the default/current logging configuration
// we need some defaults to be populates otherwise Unpack will
// fail
eventsLoggerCfg := logp.DefaultConfig(configure.GetEnvironment())

// merge eventsLoggerCfg with b.Config.Logging, so logging.events.* only
// overwrites logging.*
if err := b.Config.EventLogging.Unpack(&eventsLoggerCfg); err != nil {
return nil, fmt.Errorf("error initialising events logger: %w", err)
}

// Ensure the default filename is set
if eventsLoggerCfg.Files.Name == "" {
eventsLoggerCfg.Files.Name = b.Info.Beat
// Append the name so the files do not overwrite themselves.
eventsLoggerCfg.Files.Name = eventsLoggerCfg.Files.Name + "-events-data"
}

outputFactory := b.makeOutputFactory(b.Config.Output, eventsLoggerCfg)
settings := pipeline.Settings{
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
Expand All @@ -388,7 +408,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader(), eventsLoggerCfg))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
Expand Down Expand Up @@ -784,6 +804,14 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if b.Config.EventLogging == nil {
b.Config.EventLogging = config.NewConfig()
}
b.Config.EventLogging.Merge(b.Config.Logging)
if _, err := b.Config.EventLogging.Remove("events", -1); err != nil {
return fmt.Errorf("cannot merge logging and logging.events configuration: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}
Expand Down Expand Up @@ -1091,7 +1119,7 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
}
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader, eventsLoggerCfg logp.Config) reload.Reloadable {
return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error {
if update == nil {
return nil
Expand All @@ -1113,15 +1141,16 @@ func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Re
}
}

return outReloader.Reload(update, b.createOutput)
return outReloader.Reload(update, eventsLoggerCfg, b.createOutput)
})
}

func (b *Beat) makeOutputFactory(
cfg config.Namespace,
eventLoggerCfg logp.Config,
) func(outputs.Observer) (string, outputs.Group, error) {
return func(outStats outputs.Observer) (string, outputs.Group, error) {
out, err := b.createOutput(outStats, cfg)
out, err := b.createOutput(outStats, cfg, eventLoggerCfg)
return cfg.Name(), out, err
}
}
Expand Down Expand Up @@ -1217,7 +1246,7 @@ func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error {
return nil
}

func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) {
func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace, eventsLoggerCfg logp.Config) (outputs.Group, error) {
if !cfg.IsSet() {
return outputs.Group{}, nil
}
Expand All @@ -1226,7 +1255,7 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outpu
return outputs.Group{}, fmt.Errorf("could not setup output certificates reloader: %w", err)
}

return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config())
return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config(), eventsLoggerCfg)
}

func (b *Beat) registerClusterUUIDFetching() {
Expand Down
4 changes: 3 additions & 1 deletion libbeat/cmd/test/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/testing"
)

Expand All @@ -41,7 +42,8 @@ func GenTestOutputCmd(settings instance.Settings) *cobra.Command {
}

im, _ := idxmgmt.DefaultSupport(nil, b.Info, nil)
output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config())
// we use an empty config for the events logger because this is just a output test
output, err := outputs.Load(im, b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config(), logp.Config{})
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func makeConsole(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
Expand Down
22 changes: 14 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type Client struct {
observer outputs.Observer
NonIndexableAction string

log *logp.Logger
log *logp.Logger
eventsLogger *logp.Logger
}

// ClientSettings contains the settings for a client.
Expand All @@ -81,6 +82,8 @@ const (

// NewClient instantiates a new client.
func NewClient(
logger *logp.Logger,
eventsLogger *logp.Logger,
s ClientSettings,
onConnect *callbacksRegistry,
) (*Client, error) {
Expand Down Expand Up @@ -140,7 +143,8 @@ func NewClient(
observer: s.Observer,
NonIndexableAction: s.NonIndexableAction,

log: logp.NewLogger("elasticsearch"),
log: logger,
eventsLogger: eventsLogger,
}

return client, nil
Expand Down Expand Up @@ -174,6 +178,8 @@ func (client *Client) Clone() *Client {
client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil

c, _ := NewClient(
client.log,
client.eventsLogger,
ClientSettings{
ConnectionSettings: connection,
Index: client.index,
Expand Down Expand Up @@ -427,12 +433,12 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
if result {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Errorf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.NonIndexableAction == dead_letter_index {
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
data[i].Content.Meta = mapstr.M{
dead_letter_marker_field: true,
Expand All @@ -447,8 +453,8 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
}
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
client.log.Warnf("Cannot index event (status=%v): dropping event! Look for events-data log file to view the event and cause.", status)
client.eventsLogger.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg)
continue
}
}
Expand Down
51 changes: 30 additions & 21 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"go.uber.org/zap"
)

func init() {
Expand All @@ -38,8 +39,13 @@ func makeES(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
log := logp.NewLogger(logSelector)
eventsLogger := logp.NewLogger(logSelector)
// Set a new Output so it writes to a different file than `log`
eventsLogger = log.WithOptions(zap.WrapCore(logp.WithFileOutput(eventsLoggerCfg)))

if !cfg.HasField("bulk_max_size") {
if err := cfg.SetInt("bulk_max_size", -1, defaultBulkSize); err != nil {
return outputs.Fail(err)
Expand Down Expand Up @@ -110,27 +116,30 @@ func makeES(
}

var client outputs.NetworkClient
client, err = NewClient(ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Observer: observer,
NonIndexableAction: policy.action(),
}, &connectCallbackRegistry)
client, err = NewClient(
log,
eventsLogger,
ClientSettings{
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Observer: observer,
NonIndexableAction: policy.action(),
}, &connectCallbackRegistry)
if err != nil {
return outputs.Fail(err)
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func makeFileout(
beat beat.Info,
observer outputs.Observer,
cfg *c.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func makeKafka(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
log := logp.NewLogger(logSelector)
log.Debug("initialize kafka output")
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -40,6 +41,7 @@ func makeLogstash(
beat beat.Info,
observer outputs.Observer,
cfg *conf.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {
lsConfig, err := readConfig(cfg, beat)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions libbeat/outputs/output_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

var outputReg = map[string]Factory{}
Expand All @@ -32,7 +33,8 @@ type Factory func(
im IndexManager,
beat beat.Info,
stats Observer,
cfg *config.C) (Group, error)
cfg *config.C,
eventsLogger logp.Config) (Group, error)

// IndexManager provides additional index related services to the outputs.
type IndexManager interface {
Expand Down Expand Up @@ -81,6 +83,7 @@ func Load(
stats Observer,
name string,
config *config.C,
eventsLoggerCfg logp.Config,
) (Group, error) {
factory := FindFactory(name)
if factory == nil {
Expand All @@ -90,5 +93,5 @@ func Load(
if stats == nil {
stats = NewNilObserver()
}
return factory(im, info, stats, config)
return factory(im, info, stats, config, eventsLoggerCfg)
}
2 changes: 2 additions & 0 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -51,6 +52,7 @@ func makeRedis(
beat beat.Info,
observer outputs.Observer,
cfg *config.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {

if !cfg.HasField("index") {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func makeShipper(
beat beat.Info,
observer outputs.Observer,
cfg *conf.C,
eventsLoggerCfg logp.Config,
) (outputs.Group, error) {

config := defaultConfig()
Expand Down
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ func (c *outputController) Set(outGrp outputs.Group) {
// Reload the output
func (c *outputController) Reload(
cfg *reload.ConfigWithMeta,
outFactory func(outputs.Observer, conf.Namespace) (outputs.Group, error),
eventsLoggerCfg logp.Config,
outFactory func(outputs.Observer, conf.Namespace, logp.Config) (outputs.Group, error),
) error {
outCfg := conf.Namespace{}
if cfg != nil {
Expand All @@ -191,7 +192,7 @@ func (c *outputController) Reload(

output, err := loadOutput(c.monitors, func(stats outputs.Observer) (string, outputs.Group, error) {
name := outCfg.Name()
out, err := outFactory(stats, outCfg)
out, err := outFactory(stats, outCfg, eventsLoggerCfg)
return name, out, err
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ const (
type OutputReloader interface {
Reload(
cfg *reload.ConfigWithMeta,
factory func(outputs.Observer, conf.Namespace) (outputs.Group, error),
eventsLoggerCfg logp.Config,
factory func(outputs.Observer, conf.Namespace, logp.Config) (outputs.Group, error),
) error
}

Expand Down

0 comments on commit db6216e

Please sign in to comment.