From 4ca91d1d924df5eb05d81fd99b66b947a6f16517 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Parada?= Date: Mon, 4 Nov 2024 15:18:39 -0300 Subject: [PATCH] add clouddlare, prometheus_remote_write and splunk to service ports --- service_port.go | 274 +++++++++++++++++++++++++++---------------- service_port_test.go | 129 ++++++++++++-------- utl.go | 50 ++++++-- 3 files changed, 293 insertions(+), 160 deletions(-) diff --git a/service_port.go b/service_port.go index 3d9a745..4f0f317 100644 --- a/service_port.go +++ b/service_port.go @@ -1,37 +1,154 @@ package fluentbitconfig import ( - "errors" + "net" + "strconv" "strings" "github.com/calyptia/go-fluentbit-config/v2/networking" ) -var servicePortDefaultsByPlugin = map[string]servicePortDefaults{ - // Inputs. - "collectd": {Port: 25826, Protocol: networking.ProtocolUDP}, - "elasticsearch": {Port: 9200, Protocol: networking.ProtocolTCP}, - "forward": {Port: 24224, Protocol: networking.ProtocolTCP}, // only if `unix_path` is not set. - "http": {Port: 9880, Protocol: networking.ProtocolTCP}, - "mqtt": {Port: 1883, Protocol: networking.ProtocolTCP}, - "opentelemetry": {Port: 4318, Protocol: networking.ProtocolTCP}, - "statsd": {Port: 8125, Protocol: networking.ProtocolUDP}, - "syslog": {Port: 5140, Protocol: networking.ProtocolUDP}, // only if `mode` is not `unix_udp` (default) or `unix_tcp` - "tcp": {Port: 5170, Protocol: networking.ProtocolTCP}, - "udp": {Port: 5170, Protocol: networking.ProtocolUDP}, - "cloudflare": {Port: 9880, Protocol: networking.ProtocolTCP}, - - // Outputs. - "prometheus_exporter": {Port: 2021, Protocol: networking.ProtocolTCP}, +type ServicePortGetter struct { + EnabledFunc func(p Plugin) bool + PortFunc func(p Plugin) (int32, bool) + ProtocolFunc func(p Plugin) (networking.Protocol, bool) + DefaultPort int32 + DefaultProtocol networking.Protocol } -type servicePortDefaults struct { - Port int - Protocol networking.Protocol +func fromPort(p Plugin) (int32, bool) { + portVal, ok := p.Properties.Get("port") + if !ok { + return 0, false + } + + i, ok := int32FromAny(portVal) + return i, ok +} + +var inputServicePorts = map[string]ServicePortGetter{ + "cloudflare": { + PortFunc: func(p Plugin) (int32, bool) { + addrVal, ok := p.Properties.Get("addr") + if !ok { + return 0, false + } + + _, port, err := net.SplitHostPort(stringFromAny(addrVal)) + if err != nil { + return 0, false + } + + i, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, false + } + + return int32(i), true + }, + DefaultPort: 9880, + DefaultProtocol: networking.ProtocolTCP, + }, + "collectd": { + PortFunc: fromPort, + DefaultPort: 25826, + DefaultProtocol: networking.ProtocolUDP, + }, + "elasticsearch": { + PortFunc: fromPort, + DefaultPort: 9200, + DefaultProtocol: networking.ProtocolTCP, + }, + "forward": { + EnabledFunc: func(p Plugin) bool { + return !p.Properties.Has("unix_path") + }, + PortFunc: fromPort, + DefaultPort: 24224, + DefaultProtocol: networking.ProtocolTCP, + }, + "http": { + PortFunc: fromPort, + DefaultPort: 9880, + DefaultProtocol: networking.ProtocolTCP, + }, + "mqtt": { + PortFunc: fromPort, + DefaultPort: 1883, + DefaultProtocol: networking.ProtocolTCP, + }, + "opentelemetry": { + PortFunc: fromPort, + DefaultPort: 4318, + DefaultProtocol: networking.ProtocolTCP, + }, + "prometheus_remote_write": { + PortFunc: fromPort, + DefaultPort: 8080, + DefaultProtocol: networking.ProtocolTCP, + }, + "splunk": { + PortFunc: fromPort, + DefaultPort: 8088, + DefaultProtocol: networking.ProtocolTCP, + }, + "statsd": { + PortFunc: fromPort, + DefaultPort: 8125, + DefaultProtocol: networking.ProtocolUDP, + }, + "syslog": { + EnabledFunc: func(p Plugin) bool { + modeVal, ok := p.Properties.Get("mode") + if !ok { + return false // default mode is unix_udp + } + + mode := strings.ToLower(stringFromAny(modeVal)) + return mode == "tcp" || mode == "udp" + }, + PortFunc: fromPort, + ProtocolFunc: func(p Plugin) (networking.Protocol, bool) { + modeVal, ok := p.Properties.Get("mode") + if !ok { + return networking.Protocol("unknown"), false + } + + mode := strings.ToLower(stringFromAny(modeVal)) + switch mode { + case "tcp": + return networking.ProtocolTCP, true + case "udp": + return networking.ProtocolUDP, true + } + + return networking.Protocol("unknown"), false + }, + DefaultPort: 5140, + DefaultProtocol: networking.ProtocolTCP, + }, + "tcp": { + PortFunc: fromPort, + DefaultPort: 5170, + DefaultProtocol: networking.ProtocolTCP, + }, + "udp": { + PortFunc: fromPort, + DefaultPort: 5170, + DefaultProtocol: networking.ProtocolUDP, + }, +} + +var outputServicePorts = map[string]ServicePortGetter{ + "prometheus_exporter": { + PortFunc: fromPort, + DefaultPort: 2021, + DefaultProtocol: networking.ProtocolTCP, + }, } type ServicePort struct { - Port int + Port int32 Protocol networking.Protocol Kind SectionKind // Plugin is not available for `service`` section kind. @@ -43,8 +160,8 @@ type ServicePorts []ServicePort func (c *Config) ServicePorts() ServicePorts { var out ServicePorts - enabledVal, ok := c.Service.Get("http_server") - if ok && (enabledVal == true || strings.ToLower(stringFromAny(enabledVal)) == "on") { + httpServerEnabled, ok := c.Service.Get("http_server") + if ok && (httpServerEnabled == true || strings.ToLower(stringFromAny(httpServerEnabled)) == "on") { portVal, ok := c.Service.Get("http_port") if !ok { out = append(out, ServicePort{ @@ -52,7 +169,7 @@ func (c *Config) ServicePorts() ServicePorts { Protocol: networking.ProtocolTCP, Kind: SectionKindService, }) - } else if i, ok := intFromAny(portVal); ok { + } else if i, ok := int32FromAny(portVal); ok { out = append(out, ServicePort{ Port: i, Protocol: networking.ProtocolTCP, @@ -61,90 +178,49 @@ func (c *Config) ServicePorts() ServicePorts { } } - lookup := func(kind SectionKind, plugins Plugins) { - for _, plugin := range plugins { - err := ValidateSection(kind, plugin.Properties) - if errors.Is(err, ErrMissingName) { - continue - } - - var e *UnknownPluginError - if errors.As(err, &e) { - continue - } - - if plugin.Name == "forward" && plugin.Properties.Has("unix_path") { - continue - } + process := func(getters map[string]ServicePortGetter, kind SectionKind, plugin Plugin) { + getter, ok := getters[plugin.Name] + if !ok { + return + } - if plugin.Name == "syslog" { - modeVal, ok := plugin.Properties.Get("mode") - if !ok { - continue - } - - if ok { - mode := strings.ToLower(stringFromAny(modeVal)) - if mode == "unix_udp" || mode == "unix_tcp" { - continue - } - } - } + if getter.EnabledFunc != nil && !getter.EnabledFunc(plugin) { + return + } - portVal, ok := plugin.Properties.Get("port") + port := getter.DefaultPort + if getter.PortFunc != nil { + var ok bool + port, ok = getter.PortFunc(plugin) if !ok { - defaults, ok := servicePortDefaultsByPlugin[plugin.Name] - if ok { - plugin := plugin - plugin.Properties = nil - out = append(out, ServicePort{ - Port: defaults.Port, - Protocol: defaults.Protocol, - Kind: kind, - Plugin: &plugin, - }) - } + port = getter.DefaultPort } + } - if ok { - port, ok := intFromAny(portVal) - if ok { - var protocol networking.Protocol - if plugin.Name == "syslog" { - modeVal, ok := plugin.Properties.Get("mode") - if ok { - if v := networking.Protocol(strings.ToUpper(stringFromAny(modeVal))); v.OK() { - protocol = v - } - } - } - - if protocol == "" { - defaultPort, ok := servicePortDefaultsByPlugin[plugin.Name] - if ok { - protocol = defaultPort.Protocol - } - } - - if protocol == "" { - protocol = networking.ProtocolTCP - } - - plugin := plugin - plugin.Properties = nil - out = append(out, ServicePort{ - Port: port, - Protocol: protocol, - Kind: kind, - Plugin: &plugin, - }) - } + protocol := getter.DefaultProtocol + if getter.ProtocolFunc != nil { + var ok bool + protocol, ok = getter.ProtocolFunc(plugin) + if !ok { + protocol = getter.DefaultProtocol } } + + out = append(out, ServicePort{ + Port: port, + Protocol: protocol, + Kind: kind, + Plugin: &plugin, + }) + } + + for _, input := range c.Pipeline.Inputs { + process(inputServicePorts, SectionKindInput, input) } - lookup(SectionKindInput, c.Pipeline.Inputs) - lookup(SectionKindOutput, c.Pipeline.Outputs) + for _, output := range c.Pipeline.Outputs { + process(outputServicePorts, SectionKindOutput, output) + } return out } diff --git a/service_port_test.go b/service_port_test.go index a2489cb..8be64f4 100644 --- a/service_port_test.go +++ b/service_port_test.go @@ -1,18 +1,39 @@ package fluentbitconfig import ( + "strconv" "testing" "github.com/alecthomas/assert/v2" "github.com/calyptia/go-fluentbit-config/v2/networking" + "github.com/calyptia/go-fluentbit-config/v2/property" ) func TestConfig_ServicePorts(t *testing.T) { + expected := func(port int32, protocol networking.Protocol, kind SectionKind, name string, index int, props ...property.Property) ServicePort { + pp := property.Properties{ + {Key: "name", Value: name}, + } + pp = append(pp, props...) + return ServicePort{ + Port: port, + Protocol: protocol, + Kind: kind, + Plugin: &Plugin{ + ID: name + "." + strconv.Itoa(index), + Name: name, + Properties: pp, + }, + } + } + t.Run("defaults", func(t *testing.T) { config, err := ParseAs(` [SERVICE] http_server on + [INPUT] + name cloudflare [INPUT] name collectd [INPUT] @@ -25,35 +46,39 @@ func TestConfig_ServicePorts(t *testing.T) { name mqtt [INPUT] name opentelemetry + [INPUT] + name prometheus_remote_write + [INPUT] + name splunk [INPUT] name statsd [INPUT] name syslog - mode udp [INPUT] name tcp [INPUT] name udp - [INPUT] - name cloudflare [OUTPUT] name prometheus_exporter `, FormatClassic) assert.NoError(t, err) + assert.Equal(t, ServicePorts{ {Port: 2020, Protocol: networking.ProtocolTCP, Kind: SectionKindService}, - {Port: 25826, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "collectd.0", Name: "collectd"}}, - {Port: 9200, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "elasticsearch.1", Name: "elasticsearch"}}, - {Port: 24224, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "forward.2", Name: "forward"}}, - {Port: 9880, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "http.3", Name: "http"}}, - {Port: 1883, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "mqtt.4", Name: "mqtt"}}, - {Port: 4318, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "opentelemetry.5", Name: "opentelemetry"}}, - {Port: 8125, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "statsd.6", Name: "statsd"}}, - {Port: 5140, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "syslog.7", Name: "syslog"}}, - {Port: 5170, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "tcp.8", Name: "tcp"}}, - {Port: 5170, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "udp.9", Name: "udp"}}, - {Port: 9880, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "cloudflare.10", Name: "cloudflare"}}, - {Port: 2021, Protocol: networking.ProtocolTCP, Kind: SectionKindOutput, Plugin: &Plugin{ID: "prometheus_exporter.0", Name: "prometheus_exporter"}}, + expected(9880, networking.ProtocolTCP, SectionKindInput, "cloudflare", 0), + expected(25826, networking.ProtocolUDP, SectionKindInput, "collectd", 1), + expected(9200, networking.ProtocolTCP, SectionKindInput, "elasticsearch", 2), + expected(24224, networking.ProtocolTCP, SectionKindInput, "forward", 3), + expected(9880, networking.ProtocolTCP, SectionKindInput, "http", 4), + expected(1883, networking.ProtocolTCP, SectionKindInput, "mqtt", 5), + expected(4318, networking.ProtocolTCP, SectionKindInput, "opentelemetry", 6), + expected(8080, networking.ProtocolTCP, SectionKindInput, "prometheus_remote_write", 7), + expected(8088, networking.ProtocolTCP, SectionKindInput, "splunk", 8), + expected(8125, networking.ProtocolUDP, SectionKindInput, "statsd", 9), + // expected(5140, networking.ProtocolTCP, SectionKindInput, "syslog", 10), // default syslog without explicit mode tcp or udp is skipped + expected(5170, networking.ProtocolTCP, SectionKindInput, "tcp", 11), + expected(5170, networking.ProtocolUDP, SectionKindInput, "udp", 12), + expected(2021, networking.ProtocolTCP, SectionKindOutput, "prometheus_exporter", 0), }, config.ServicePorts()) }) @@ -62,55 +87,72 @@ func TestConfig_ServicePorts(t *testing.T) { [SERVICE] http_server on http_port 1 + [INPUT] + name cloudflare + addr :2 [INPUT] name collectd - port 2 + port 3 [INPUT] name elasticsearch - port 3 + port 4 [INPUT] name forward - port 4 + port 5 [INPUT] name http - port 5 + port 6 [INPUT] name mqtt - port 6 + port 7 [INPUT] name opentelemetry - port 7 + port 8 + [INPUT] + name prometheus_remote_write + port 9 + [INPUT] + name splunk + port 10 [INPUT] name statsd - port 8 + port 11 + [INPUT] + name syslog + mode tcp + port 12 [INPUT] name syslog mode udp - port 9 + port 13 [INPUT] name tcp - port 10 + port 14 [INPUT] name udp - port 11 + port 15 [OUTPUT] name prometheus_exporter - port 12 + port 16 `, FormatClassic) assert.NoError(t, err) assert.Equal(t, ServicePorts{ {Port: 1, Protocol: networking.ProtocolTCP, Kind: SectionKindService}, - {Port: 2, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "collectd.0", Name: "collectd"}}, - {Port: 3, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "elasticsearch.1", Name: "elasticsearch"}}, - {Port: 4, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "forward.2", Name: "forward"}}, - {Port: 5, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "http.3", Name: "http"}}, - {Port: 6, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "mqtt.4", Name: "mqtt"}}, - {Port: 7, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "opentelemetry.5", Name: "opentelemetry"}}, - {Port: 8, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "statsd.6", Name: "statsd"}}, - {Port: 9, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "syslog.7", Name: "syslog"}}, - {Port: 10, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "tcp.8", Name: "tcp"}}, - {Port: 11, Protocol: networking.ProtocolUDP, Kind: SectionKindInput, Plugin: &Plugin{ID: "udp.9", Name: "udp"}}, - {Port: 12, Protocol: networking.ProtocolTCP, Kind: SectionKindOutput, Plugin: &Plugin{ID: "prometheus_exporter.0", Name: "prometheus_exporter"}}, + expected(2, networking.ProtocolTCP, SectionKindInput, "cloudflare", 0, property.Property{Key: "addr", Value: ":2"}), + expected(3, networking.ProtocolUDP, SectionKindInput, "collectd", 1, property.Property{Key: "port", Value: int64(3)}), + expected(4, networking.ProtocolTCP, SectionKindInput, "elasticsearch", 2, property.Property{Key: "port", Value: int64(4)}), + expected(5, networking.ProtocolTCP, SectionKindInput, "forward", 3, property.Property{Key: "port", Value: int64(5)}), + expected(6, networking.ProtocolTCP, SectionKindInput, "http", 4, property.Property{Key: "port", Value: int64(6)}), + expected(7, networking.ProtocolTCP, SectionKindInput, "mqtt", 5, property.Property{Key: "port", Value: int64(7)}), + expected(8, networking.ProtocolTCP, SectionKindInput, "opentelemetry", 6, property.Property{Key: "port", Value: int64(8)}), + expected(9, networking.ProtocolTCP, SectionKindInput, "prometheus_remote_write", 7, property.Property{Key: "port", Value: int64(9)}), + expected(10, networking.ProtocolTCP, SectionKindInput, "splunk", 8, property.Property{Key: "port", Value: int64(10)}), + expected(11, networking.ProtocolUDP, SectionKindInput, "statsd", 9, property.Property{Key: "port", Value: int64(11)}), + expected(12, networking.ProtocolTCP, SectionKindInput, "syslog", 10, property.Property{Key: "mode", Value: "tcp"}, property.Property{Key: "port", Value: int64(12)}), + expected(13, networking.ProtocolUDP, SectionKindInput, "syslog", 11, property.Property{Key: "mode", Value: "udp"}, property.Property{Key: "port", Value: int64(13)}), + expected(14, networking.ProtocolTCP, SectionKindInput, "tcp", 12, property.Property{Key: "port", Value: int64(14)}), + expected(15, networking.ProtocolUDP, SectionKindInput, "udp", 13, property.Property{Key: "port", Value: int64(15)}), + expected(16, networking.ProtocolTCP, SectionKindOutput, "prometheus_exporter", 0, property.Property{Key: "port", Value: int64(16)}), }, config.ServicePorts()) }) @@ -138,17 +180,4 @@ func TestConfig_ServicePorts(t *testing.T) { assert.NoError(t, err) assert.Equal(t, nil, config.ServicePorts()) }) - - t.Run("with_mode", func(t *testing.T) { - config, err := ParseAs(` - [INPUT] - name syslog - mode tcp - port 1 - `, FormatClassic) - assert.NoError(t, err) - assert.Equal(t, ServicePorts{ - {Port: 1, Protocol: networking.ProtocolTCP, Kind: SectionKindInput, Plugin: &Plugin{ID: "syslog.0", Name: "syslog"}}, - }, config.ServicePorts()) - }) } diff --git a/utl.go b/utl.go index 690a94a..5ae8092 100644 --- a/utl.go +++ b/utl.go @@ -51,7 +51,7 @@ func stringFromAny(v any) string { if b, err := t.MarshalJSON(); err == nil { return stringFromAny(string(b)) } - case map[string]any: + case map[string]any, []any: var buff bytes.Buffer enc := json.NewEncoder(&buff) enc.SetEscapeHTML(false) @@ -102,29 +102,57 @@ func fmtFloat[F float32 | float64](f F) string { return s } -func intFromAny(v any) (int, bool) { +func int32FromAny(v any) (int32, bool) { if v == nil { return 0, false } switch v := v.(type) { case int: - return v, true + if int(int32(v)) == v { + return int32(v), true + } + case int8: + if int8(int32(v)) == v { + return int32(v), true + } + case int16: + if int16(int32(v)) == v { + return int32(v), true + } case int32: - return int(v), true + return v, true case int64: - if int64(int(v)) == v { - return int(v), true + if int64(int32(v)) == v { + return int32(v), true + } + case uint: + if uint(int32(v)) == v { + return int32(v), true + } + case uint16: + if uint16(int32(v)) == v { + return int32(v), true + } + case uint32: + if uint32(int32(v)) == v { + return int32(v), true + } + case uint64: + if uint64(int32(v)) == v { + return int32(v), true } case float32: - return int(v), true + if float32(int32(v)) == v { + return int32(v), true + } case float64: - if float64(int(v)) == v { - return int(v), true + if float64(int32(v)) == v { + return int32(v), true } case string: - if i, err := strconv.Atoi(v); err == nil { - return i, true + if i, err := strconv.ParseInt(v, 10, 32); err == nil { + return int32(i), true } } return 0, false