diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f2c0f13..86543dea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ## [Unreleased] -## [0.5.0] - 2022-07-31 +## [0.5.0] - 2022-08-21 ### Added @@ -26,6 +26,22 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Fix bug in exists predicate +## [0.4.4] - 2022-08-01 + +### Added + +- Add rate limiting filter with time decaying + +### Changed + +- Bump UBI to 8.6-855 +- Update reference to sf-apis + +### Fixed + +- Fix exists predicate +- Fix handling of integers and booleans in MatStr function + ## [0.4.3] - 2022-06-21 ### Changed @@ -149,7 +165,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - First release of SysFlow Processor. [Unreleased]: https://github.com/sysflow-telemetry/sf-processor/compare/0.5.0-rc1...HEAD -[0.5.0]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.3...0.5.0-rc1 +[0.5.0]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.4...0.5.0-rc1 +[0.4.4]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.3...0.4.4 [0.4.3]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.2...0.4.3 [0.4.2]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.1...0.4.2 [0.4.1]: https://github.com/sysflow-telemetry/sf-processor/compare/0.4.0...0.4.1 diff --git a/LICENSE.md b/LICENSE.md index f0601d5f..f3e54878 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,4 +1,4 @@ -``` + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -175,4 +175,28 @@ of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS -``` \ No newline at end of file + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 The Falco Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/core/flattener/config.go b/core/flattener/config.go new file mode 100644 index 00000000..dc2f6484 --- /dev/null +++ b/core/flattener/config.go @@ -0,0 +1,82 @@ +// +// Copyright (C) 2022 IBM Corporation. +// +// Authors: +// Frederico Araujo +// Teryl Taylor +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package flattener flattens input telemetry in a flattened representation. +package flattener + +import ( + "strconv" + "time" +) + +// Configuration keys. +const ( + FilterOnOffKey string = "filter.enabled" + FilterMaxAgeKey string = "filter.maxage" +) + +// Config defines a configuration object for the engine. +type Config struct { + FilterOnOff OnOff + FilterMaxAge time.Duration +} + +// CreateConfig creates a new config object from config dictionary. +func CreateConfig(conf map[string]interface{}) (Config, error) { + var c Config = Config{FilterOnOff: Off, FilterMaxAge: 24 * time.Hour} // default values + var err error + if v, ok := conf[FilterOnOffKey].(string); ok { + c.FilterOnOff = parseOnOffType(v) + } + if v, ok := conf[FilterMaxAgeKey].(string); ok { + var duration int + duration, err = strconv.Atoi(v) + if err == nil { + c.FilterMaxAge = time.Duration(duration) * time.Second + } + } + return c, err +} + +// OnOff defines an On-Off state type. +type OnOff int32 + +// OnOff types. +const ( + Off OnOff = iota + On +) + +func (s OnOff) String() string { + return [...]string{"off", "on"}[s] +} + +func (s OnOff) Enabled() bool { + return s == On +} + +func parseOnOffType(s string) OnOff { + if Off.String() == s { + return Off + } + if On.String() == s { + return On + } + return Off +} diff --git a/core/flattener/filter.go b/core/flattener/filter.go new file mode 100644 index 00000000..674153e3 --- /dev/null +++ b/core/flattener/filter.go @@ -0,0 +1,126 @@ +// +// Copyright (C) 2022 IBM Corporation. +// +// Authors: +// Frederico Araujo +// Teryl Taylor +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package flattener flattens input telemetry in a flattened representation. +package flattener + +import ( + "container/list" + "encoding/binary" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/sysflow-telemetry/sf-apis/go/sfgo" +) + +var byteInt64 []byte = make([]byte, 8) + +// Filter is a time decaying filter with a TTL per entry. +type Filter struct { + m map[uint64]int64 + q *list.List + ttl time.Duration +} + +// Entry encodes a hash value with the time it was first added to the filter. +type Entry struct { + h uint64 + firstSeen time.Time +} + +// NewFilter creates a new time decaying filter that evicts entries that have been seen longer than t duration. +func NewFilter(t time.Duration) *Filter { + return &Filter{m: make(map[uint64]int64), q: list.New(), ttl: t} +} + +// Test tests if hash h has been seen since maximum ttl. +func (f *Filter) Test(h uint64) bool { + f.evictAgedEntries() + _, ok := f.m[h] + return ok +} + +// TestAndAdd tests if hash h has been seen since maximum ttl and adds or increments the element in the filter cache. +func (f *Filter) TestAndAdd(h uint64) bool { + f.evictAgedEntries() + _, ok := f.m[h] + f.Add(h) + return ok +} + +// Contains returns how many times hash h has been seen during its ttl time. +func (f *Filter) Count(h uint64) int64 { + f.evictAgedEntries() + if count, ok := f.m[h]; ok { + return count + } + return 0 +} + +// Add adds hash h to the filter. +func (f *Filter) Add(h uint64) { + if v, ok := f.m[h]; !ok { + f.m[h] = 1 + f.q.PushBack(Entry{h: h, firstSeen: time.Now()}) + } else { + f.m[h] = v + 1 + } +} + +func (f *Filter) evictAgedEntries() { + for f.q.Len() > 0 { + e := f.q.Front() + entry := e.Value.(Entry) + if time.Since(entry.firstSeen) < f.ttl { + break + } + f.q.Remove(e) + delete(f.m, entry.h) + } +} + +// semanticHash computes a hash value over record attributes denoting the semantics of the record (used in the time decay filter). +func semanticHash(fr *sfgo.FlatRecord) uint64 { + h := xxhash.New() + h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.PROC_EXE_STR])) + h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.PROC_EXEARGS_STR])) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_UID_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_GID_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.OPFLAGS_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.PROC_TTY_INT])) + h.Write(byteInt64) + sfType := fr.Ints[sfgo.SYSFLOW_IDX][sfgo.SF_REC_TYPE] + if sfType == sfgo.NET_FLOW { + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_SIP_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_DIP_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_DPORT_INT])) + h.Write(byteInt64) + binary.LittleEndian.PutUint64(byteInt64, uint64(fr.Ints[sfgo.SYSFLOW_SRC][sfgo.FL_NETW_PROTO_INT])) + h.Write(byteInt64) + } + if sfType == sfgo.FILE_FLOW || sfType == sfgo.FILE_EVT { + h.Write([]byte(fr.Strs[sfgo.SYSFLOW_SRC][sfgo.FILE_PATH_STR])) + } + return h.Sum64() +} diff --git a/core/flattener/flattener.go b/core/flattener/flattener.go index c474ab33..7a4f1cbf 100644 --- a/core/flattener/flattener.go +++ b/core/flattener/flattener.go @@ -48,7 +48,9 @@ func NewFlattenerChan(size int) interface{} { // Flattener defines the main class for the flatterner plugin. type Flattener struct { - outCh []chan *sfgo.FlatRecord + config Config + filter *Filter + outCh []chan *sfgo.FlatRecord } // NewFlattener creates a new Flattener instance. @@ -68,6 +70,11 @@ func (s *Flattener) RegisterHandler(hc plugins.SFHandlerCache) { // Init initializes the handler with a configuration map. func (s *Flattener) Init(conf map[string]interface{}) error { + s.config, _ = CreateConfig(conf) // no err check, assuming defaults + if s.config.FilterOnOff.Enabled() { + s.filter = NewFilter(s.config.FilterMaxAge) + logger.Info.Printf("Initialized rate limiter with %s time decay", s.config.FilterMaxAge) + } return nil } @@ -83,6 +90,16 @@ func (s *Flattener) SetOutChan(chObj []interface{}) { } } +// out sends a record to every output channel in the plugin. +func (s *Flattener) out(fr *sfgo.FlatRecord) { + if s.config.FilterOnOff.Enabled() && s.filter != nil && s.filter.TestAndAdd(semanticHash(fr)) { + return + } + for _, c := range s.outCh { + c <- fr + } +} + // Cleanup tears down resources. func (s *Flattener) Cleanup() { logger.Trace.Println("Calling Cleanup on Flattener channel") @@ -154,9 +171,7 @@ func (s *Flattener) HandleNetFlow(sf *plugins.CtxSysFlow, nf *sfgo.NetworkFlow) fr.Ints[sfgo.SYSFLOW_IDX][sfgo.FL_NETW_NUMWSENDBYTES_INT] = nf.NumWSendBytes fr.Ptree = sf.PTree fr.GraphletID = sf.GraphletID - for _, ch := range s.outCh { - ch <- fr - } + s.out(fr) return nil } @@ -177,9 +192,7 @@ func (s *Flattener) HandleFileFlow(sf *plugins.CtxSysFlow, ff *sfgo.FileFlow) er fr.Ints[sfgo.SYSFLOW_IDX][sfgo.FL_FILE_NUMWSENDBYTES_INT] = ff.NumWSendBytes fr.Ptree = sf.PTree fr.GraphletID = sf.GraphletID - for _, ch := range s.outCh { - ch <- fr - } + s.out(fr) return nil } @@ -213,9 +226,7 @@ func (s *Flattener) HandleFileEvt(sf *plugins.CtxSysFlow, fe *sfgo.FileEvent) er fr.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_FILE_RET_INT] = int64(fe.Ret) fr.Ptree = sf.PTree fr.GraphletID = sf.GraphletID - for _, ch := range s.outCh { - ch <- fr - } + s.out(fr) return nil } @@ -240,9 +251,7 @@ func (s *Flattener) HandleProcEvt(sf *plugins.CtxSysFlow, pe *sfgo.ProcessEvent) fr.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_PROC_RET_INT] = int64(pe.Ret) fr.Ptree = sf.PTree fr.GraphletID = sf.GraphletID - for _, ch := range s.outCh { - ch <- fr - } + s.out(fr) return nil } diff --git a/core/policyengine/engine/fieldmapper.go b/core/policyengine/engine/fieldmapper.go index db9161b9..62530cdb 100644 --- a/core/policyengine/engine/fieldmapper.go +++ b/core/policyengine/engine/fieldmapper.go @@ -173,9 +173,6 @@ func (m FieldMapper) MapStr(attr string) StrFieldMap { if v, ok := o.(string); ok { if isPathExp && v != "" && jsonpath != "" { return gjson.Get(v, jsonpath).String() - //res := gjson.Get(v, jsonpath).String() - //logger.Trace.Printf("%s[%s] = %s", baseattr, jsonpath, res) - //return res } return trimBoundingQuotes(v) } else if v, ok := o.(int64); ok { diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 4a30a0ea..0627cfdd 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -174,3 +174,18 @@ docker run -e EXPORTER_PORT=514 ... ``` + +### Rate limiter configuration (experimental) + +The `flattener` handler has a built-in time decay filter that can be enabled to reduce even rates in the processor. The filter uses a time-decay bloom filter based on a semantic hashing of records. This means that the filter should only forward one record matching a semantic hash per time decay period. The semantic hash takes into consideration process, flow and event attributes. To enable rate limiting, modify the `sysflowreader` processor as follows: + +```json +{ + "processor": "sysflowreader", + "handler": "flattener", + "in": "sysflow sysflowchan", + "out": "flat flattenerchan", + "filter.enabled": "on|off (default: off)", + "filter.maxage": "time decay in minutes (default: 24H)" +} +``` \ No newline at end of file diff --git a/plugins/processors/example/go.mod b/plugins/processors/example/go.mod index afd2f3ff..ce7f1c78 100644 --- a/plugins/processors/example/go.mod +++ b/plugins/processors/example/go.mod @@ -28,6 +28,7 @@ require ( require ( github.com/actgardner/gogen-avro/v7 v7.3.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 // indirect ) diff --git a/plugins/processors/example/go.sum b/plugins/processors/example/go.sum index 5d62d9bd..4c77c967 100644 --- a/plugins/processors/example/go.sum +++ b/plugins/processors/example/go.sum @@ -1,5 +1,7 @@ github.com/actgardner/gogen-avro/v7 v7.3.1 h1:6JJU3o7168lcyIB6uXYyYdflCsJT3aMFKZPSpSc4toI= github.com/actgardner/gogen-avro/v7 v7.3.1/go.mod h1:1d45RpDvI29sU7l9wUxlRTEglZSdQSbd6bDbWJaEMgo= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/resources/pipelines/pipeline.template.json b/resources/pipelines/pipeline.template.json index d14fe49d..97a458d6 100644 --- a/resources/pipelines/pipeline.template.json +++ b/resources/pipelines/pipeline.template.json @@ -5,7 +5,9 @@ "processor": "sysflowreader", "handler": "flattener", "in": "sysflow sysflowchan", - "out": "flat flattenerchan" + "out": "flat flattenerchan", + "filter.enabled": "on|off (default: off)", + "filter.maxage": "time decay in minutes (default: 24H)" }, { "processor": "policyengine", @@ -21,8 +23,8 @@ { "processor": "exporter", "in": "evt eventchan", - "export": "terminal|file|syslog|es|findings|null (default: terminal)", - "format": "json|ecs|occurrence", + "export": "terminal|file|syslog|es|findings|null (default: terminal)", + "format": "json|ecs|occurrence", "buffer": "event aggregation buffer (default: 0)", "vault.secrets": "true|false", "vault.path": "/run/secrets (default)", @@ -31,7 +33,7 @@ "syslog.tag": "rsyslog tag (default: sysflow)", "syslog.source": "rsyslog source hostname (default: hostname)", "syslog.host": "rsyslog host (default: localhost)", - "syslog.port": "ryslog port (default: 514)", + "syslog.port": "ryslog port (default: 514)", "es.addresses": "ip1,ip2,... (comma-separated list)", "es.index": "elastic index (default: sysflow)", "es.username": "elastic username (do not set it if reading from secret vault)", @@ -44,12 +46,12 @@ "findings.accountid": "findings API account ID", "findings.provider": "findings API provider", "findings.note": "findings API node ID", - "findings.sqlqueryurl": "SQL Query URL (default: https://us.sql-query.cloud.ibm.com/sqlquery)", - "findings.sqlquerycrn": "SQL Query instance crn", - "findings.region": "findings API region", - "findings.path": "findings events path (default: /mnt/occurrences)", + "findings.sqlqueryurl": "SQL Query URL (default: https://us.sql-query.cloud.ibm.com/sqlquery)", + "findings.sqlquerycrn": "SQL Query instance crn", + "findings.region": "findings API region", + "findings.path": "findings events path (default: /mnt/occurrences)", "findings.pool.capacity": "findings event pool capacity (default: 250)", - "findings.pool.maxage": "findings event pool age limit in minutes (default: 1440)" - } + "findings.pool.maxage": "findings event pool age limit in minutes (default: 1440)" + } ] }