Skip to content

Commit

Permalink
[Netflow] Migrate from log to the logp logging library
Browse files Browse the repository at this point in the history
The rest of beats is using github.com/elastic/elastic-agent-libs/logp, so make Netflow use it, too.
Cleaned up the test code as well
  • Loading branch information
jrmolin authored Feb 19, 2025
1 parent 2200a16 commit d05a070
Show file tree
Hide file tree
Showing 24 changed files with 193 additions and 164 deletions.
37 changes: 16 additions & 21 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package config

import (
"io"
"time"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/elastic-agent-libs/logp"
)

type ActiveSessionsMetric interface {
Expand All @@ -19,7 +19,7 @@ type ActiveSessionsMetric interface {
// Config stores the configuration used by the NetFlow Collector.
type Config struct {
protocols []string
logOutput io.Writer
logOutput *logp.Logger
expiration time.Duration
detectReset bool
fields fields.FieldDict
Expand All @@ -28,21 +28,22 @@ type Config struct {
activeSessionsMetric ActiveSessionsMetric
}

var defaultCfg = Config{
protocols: []string{},
logOutput: io.Discard,
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}

// Defaults returns a configuration object with defaults settings:
// - no protocols are enabled.
// - log output is discarded
// - log output is set to the logger that is passed in.
// - session expiration is checked once every hour.
func Defaults() Config {
return defaultCfg
// - resets are detected.
// - templates are not shared.
// - cache is disabled.
func Defaults(logger *logp.Logger) Config {
return Config{
protocols: []string{},
logOutput: logger,
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}
}

// WithProtocols modifies an existing configuration object to enable the
Expand All @@ -52,12 +53,6 @@ func (c *Config) WithProtocols(protos ...string) *Config {
return c
}

// WithLogOutput sets the output io.Writer for logging.
func (c *Config) WithLogOutput(output io.Writer) *Config {
c.logOutput = output
return c
}

// WithExpiration configures the expiration timeout for sessions and templates.
// A value of zero disables expiration.
func (c *Config) WithExpiration(timeout time.Duration) *Config {
Expand Down Expand Up @@ -121,7 +116,7 @@ func (c *Config) Protocols() []string {
}

// LogOutput returns the io.Writer where logs are to be written.
func (c *Config) LogOutput() io.Writer {
func (c *Config) LogOutput() *logp.Logger {
return c.logOutput
}

Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/netflow/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"net"
"sync"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
Expand Down Expand Up @@ -93,8 +95,8 @@ func (p *Decoder) Read(buf *bytes.Buffer, source net.Addr) (records []record.Rec
}

// NewConfig returns a new configuration structure to be passed to NewDecoder.
func NewConfig() *config.Config {
cfg := config.Defaults()
func NewConfig(logger *logp.Logger) *config.Config {
cfg := config.Defaults(logger)
return &cfg
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,52 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"os"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
)

func main() {
decoder, err := decoder.NewDecoder(decoder.NewConfig().
WithLogOutput(os.Stderr).
logger := logp.L().Named("netflow")

decoder, err := decoder.NewDecoder(decoder.NewConfig(logger).
WithProtocols("v1", "v5", "v9", "ipfix"))
if err != nil {
log.Fatal("Failed creating decoder:", err)
logger.Fatal("Failed creating decoder:", err)
}

addr, err := net.ResolveUDPAddr("udp", ":2055")
if err != nil {
log.Fatal("Failed to resolve address:", err)
logger.Fatal("Failed to resolve address:", err)
}

server, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("Failed to listen on %v: %v", addr, err)
logger.Fatalf("Failed to listen on %v: %v", addr, err)
}
defer server.Close()

if err = server.SetReadBuffer(1 << 16); err != nil {
log.Fatalf("Failed to set read buffer size for socket: %v", err)
logger.Fatalf("Failed to set read buffer size for socket: %v", err)
}

log.Println("Listening on ", server.LocalAddr())
logger.Debug("Listening on ", server.LocalAddr())
buf := make([]byte, 8192)
decBuf := new(bytes.Buffer)
for {
size, remote, err := server.ReadFromUDP(buf)
if err != nil {
log.Println("Error reading from socket:", err)
logger.Debug("Error reading from socket:", err)
continue
}

decBuf.Reset()
decBuf.Write(buf[:size])
records, err := decoder.Read(decBuf, remote)
if err != nil {
log.Printf("warn: Failed reading records from %v: %v\n", remote, err)
logger.Debugf("warn: Failed reading records from %v: %v\n", remote, err)
}

for _, r := range records {
Expand All @@ -63,7 +64,7 @@ func main() {
"data": r.Fields,
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
fmt.Println(string(evt))
}
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package ipfix

import (
"log"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
Expand All @@ -29,7 +27,7 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
logger := log.New(config.LogOutput(), LogPrefix, 0)
logger := config.LogOutput().Named(LogPrefix)
decoder := DecoderIPFIX{
DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()},
}
Expand Down
16 changes: 11 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
)

func init() {
logp.TestingSetup()
}

func TestMessageWithOptions(t *testing.T) {
rawString := "" +
"000a01e45bf435e1000000a500000000000200480400001000080004000c0004" +
Expand Down Expand Up @@ -67,7 +73,7 @@ func TestMessageWithOptions(t *testing.T) {
"version": uint64(10),
},
}
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(bytes.NewBuffer(raw), test.MakeAddress(t, "127.0.0.1:1234"))
assert.NoError(t, err)
if assert.Len(t, flows, 7) {
Expand All @@ -84,7 +90,7 @@ func TestOptionTemplates(t *testing.T) {
key := v9.MakeSessionKey(addr, 1234, false)

t.Run("Single options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(test.MakePacket([]uint16{
// Header
// Version, Length, Ts, SeqNo, Source
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("Multiple options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -151,7 +157,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("records discarded", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -193,7 +199,7 @@ func TestOptionTemplates(t *testing.T) {
func TestCustomFields(t *testing.T) {
addr := test.MakeAddress(t, "127.0.0.1:12345")

conf := config.Defaults()
conf := config.Defaults(logp.L())
conf.WithCustomFields(fields.FieldDict{
fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String},
})
Expand Down
12 changes: 9 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/protocol/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

func init() {
logp.TestingSetup()
}

type testProto int

func (testProto) Version() uint16 {
Expand Down Expand Up @@ -61,7 +67,7 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(0), gen(config.Defaults()))
assert.Equal(t, testProto(0), gen(config.Defaults(logp.L())))
})
t.Run("two protocols", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand All @@ -71,10 +77,10 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(1), gen(config.Defaults()))
assert.Equal(t, testProto(1), gen(config.Defaults(logp.L())))
gen, err = registry.Get("other_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(2), gen(config.Defaults()))
assert.Equal(t, testProto(2), gen(config.Defaults(logp.L())))
})
t.Run("not registered", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand Down
5 changes: 0 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ type TestLogWriter struct {
testing.TB
}

func (t TestLogWriter) Write(buf []byte) (int, error) {
t.Log(string(buf))
return len(buf), nil
}

func MakeAddress(t testing.TB, ipPortPair string) net.Addr {
ip, portS, err := net.SplitHostPort(ipPortPair)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions x-pack/filebeat/input/netflow/decoder/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"net"
"time"

Expand All @@ -18,6 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
Expand Down Expand Up @@ -52,21 +52,23 @@ var templateV1 = template.Template{
type ReadHeaderFn func(*bytes.Buffer, net.Addr) (int, time.Time, record.Map, error)

type NetflowProtocol struct {
logger *log.Logger
logger *logp.Logger
flowTemplate *template.Template
version uint16
readHeader ReadHeaderFn
}

func init() {
protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
return NewProtocol(ProtocolID, &templateV1, readV1Header, log.New(config.LogOutput(), LogPrefix, 0))
return NewProtocol(ProtocolID, &templateV1, readV1Header, config.LogOutput().Named(LogPrefix))
}

func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *log.Logger) protocol.Protocol {
func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol {
return &NetflowProtocol{
logger: logger,
flowTemplate: template,
Expand All @@ -90,7 +92,7 @@ func (NetflowProtocol) Stop() error {
func (p *NetflowProtocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) {
numFlows, timestamp, metadata, err := p.readHeader(buf, source)
if err != nil {
p.logger.Printf("Failed parsing packet: %v", err)
p.logger.Debugf("Failed parsing packet: %v", err)
return nil, fmt.Errorf("error reading netflow header: %w", err)
}
flows, err = p.flowTemplate.Apply(buf, numFlows)
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v1/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
template2 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
"github.com/elastic/elastic-agent-libs/logp"
)

func init() {
logp.TestingSetup()
}

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(1), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605946fb0" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -105,7 +110,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
Loading

0 comments on commit d05a070

Please sign in to comment.