From 27a30dd48df6f95a8677b1085123d288581cb54d Mon Sep 17 00:00:00 2001 From: Chris Ludden Date: Sun, 4 Oct 2020 14:30:46 +0200 Subject: [PATCH] feat: adds result config to kubernetes input, improves docs --- README.md | 83 ++++- doc/kubernetes_input.md | 128 +++++-- doc/kubernetes_output.md | 37 +- doc/kubernetes_processor.md | 29 +- doc/kubernetes_status_output.md | 34 +- example/manifest.yml | 10 - example/status.yml | 121 +++---- go.mod | 2 +- go.sum | 11 +- input/kubernetes.go | 64 +++- processor/kubernetes.go | 42 ++- .../internal/bloblang/parser/combinators.go | 317 +++++++----------- .../internal/bloblang/parser/field_parser.go | 35 +- .../bloblang/parser/mapping_parser.go | 130 +++---- .../parser/query_arithmetic_parser.go | 33 +- .../parser/query_expression_parser.go | 16 +- .../bloblang/parser/query_function_parser.go | 135 +++----- .../bloblang/parser/query_literal_parser.go | 8 +- .../internal/bloblang/parser/query_parser.go | 10 +- .../benthos/v3/internal/docs/component.go | 5 +- .../benthos/v3/lib/bloblang/package.go | 104 ++++++ .../Jeffail/benthos/v3/lib/input/broker.go | 4 +- .../benthos/v3/lib/input/reader/amqp_0_9.go | 4 +- .../benthos/v3/lib/input/reader/gcp_pubsub.go | 4 +- .../benthos/v3/lib/input/reader/kafka.go | 4 +- .../v3/lib/input/reader/kafka_balanced.go | 7 +- .../benthos/v3/lib/input/reader/kafka_cg.go | 14 +- .../benthos/v3/lib/input/reader/kinesis.go | 4 +- .../v3/lib/input/reader/kinesis_balanced.go | 4 +- .../v3/lib/input/reader/nats_stream.go | 4 +- .../benthos/v3/lib/input/reader/nsq.go | 4 +- .../v3/lib/input/reader/redis_streams.go | 4 +- .../benthos/v3/lib/message/batch/policy.go | 39 +++ .../Jeffail/benthos/v3/lib/output/batcher.go | 18 + .../Jeffail/benthos/v3/lib/output/broker.go | 12 +- .../Jeffail/benthos/v3/lib/output/dynamodb.go | 12 +- .../benthos/v3/lib/output/elasticsearch.go | 12 +- .../benthos/v3/lib/output/http_client.go | 12 +- .../Jeffail/benthos/v3/lib/output/kafka.go | 12 +- .../Jeffail/benthos/v3/lib/output/kinesis.go | 12 +- .../benthos/v3/lib/output/kinesis_firehose.go | 12 +- .../Jeffail/benthos/v3/lib/output/s3.go | 12 +- .../Jeffail/benthos/v3/lib/output/sqs.go | 12 +- .../benthos/v3/lib/output/table_storage.go | 13 +- .../benthos/v3/lib/output/writer/dynamodb.go | 4 +- .../v3/lib/output/writer/elasticsearch.go | 5 +- .../v3/lib/output/writer/http_client.go | 4 +- .../benthos/v3/lib/output/writer/kafka.go | 5 +- .../benthos/v3/lib/output/writer/kinesis.go | 4 +- .../v3/lib/output/writer/kinesis_firehose.go | 5 +- .../benthos/v3/lib/output/writer/s3.go | 4 +- .../benthos/v3/lib/output/writer/sqs.go | 4 +- .../lib/output/writer/table_storage_config.go | 4 +- .../benthos/v3/lib/processor/branch.go | 20 +- .../benthos/v3/lib/service/deprecated.go | 2 +- .../Jeffail/benthos/v3/lib/service/run.go | 25 +- .../Jeffail/benthos/v3/lib/service/service.go | 6 + vendor/modules.txt | 3 +- 58 files changed, 911 insertions(+), 773 deletions(-) create mode 100644 vendor/github.com/Jeffail/benthos/v3/lib/bloblang/package.go diff --git a/README.md b/README.md index 6934675..e7cdb4a 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,17 @@ # benthos-kubernetes -a collections of [benthos](https://github.com/Jeffail/benthos) plugins for integrating with Kubernetes. +a collection of [benthos](https://github.com/Jeffail/benthos) plugins for integrating with Kubernetes. -**Inputs:** +#### Inputs - [kubernetes](./doc/kubernetes_input.md) streams kubernetes objects for one or more configured watches -**Outputs:** +#### Outputs - [kubernetes](./doc/kubernetes_output.md) creates, updates, and deleted kubernetes objects - [kubernetes_status](./doc/kubernetes_status_output.md) writes object status to kubernetes -**Processors:** +#### Processors - [kubernetes](./doc/kubernetes_processor.md) performs operations against a kubernetes cluster @@ -27,9 +27,82 @@ a collections of [benthos](https://github.com/Jeffail/benthos) plugins for integ - download a [release](https://github.com/cludden/benthos-kubernetes/releases) - as a benthos [plugin](./cmd/benthos/main.go) + ```go + package main + + import ( + "github.com/Jeffail/benthos/v3/lib/service" + _ "github.com/cludden/benthos-kubernetes/input" + _ "github.com/cludden/benthos-kubernetes/output" + _ "github.com/cludden/benthos-kubernetes/processor" + ) + + func main() { + service.Run() + } + ``` + ## Getting Started -See [examples](./example/status.yml) +```yaml +input: + type: kubernetes + plugin: + watches: + - group: example.com + version: v1 + kind: Foo + owns: + - group: example.com + version: v1 + kind: Bar + result: + requeue: meta().exists("requeue") + requeue_after: ${!meta("requeue_after").catch("")} + +pipeline: + processors: + - switch: + # ignore deleted items + - check: meta().exists("deleted") + processors: + - bloblang: root = deleted() + + # reconcile dependent resources + - processors: + - branch: + processors: + - bloblang: | + apiVersion = "example.com/v1" + kind = "Bar" + metadata.labels = metadata.labels + metadata.name = metadata.name + metadata.namespace = metadata.namespace + metadata.ownerReferences = [{ + "apiVersion": apiVersion, + "kind": kind, + "controller": true, + "blockOwnerDeletion": true, + "name": metadata.name, + "uid": metadata.uid + }] + spec = spec + - type: kubernetes + plugin: + operator: get + - type: kubernetes + plugin: + operator: ${! if errored() { "create" } else { "update" } } + result_map: | + root.status.bar = metadata.uid + root.status.status = "Ready" + +output: + type: kubernetes_status + plugin: {} +``` + +Additional examples provided [here](./example) ## License diff --git a/doc/kubernetes_input.md b/doc/kubernetes_input.md index fb3f6b2..8d6abb2 100644 --- a/doc/kubernetes_input.md +++ b/doc/kubernetes_input.md @@ -3,6 +3,7 @@ streams kubernetes objects for one or more configured watches **Examples** + ```yaml input: type: kubernetes @@ -34,61 +35,144 @@ input: owns: - version: v1 kind: Pod + result: + requeue: meta().exists("requeue") + requeue_after: ${!meta("requeue_after).or("")} ``` ## Fields -`group` +### `result` + +Customize the result of a reconciliation request via [synchronous responses](https://www.benthos.dev/docs/guides/sync_responses). + +Type: `object` -resource group +### `result.requeue` + +A [Bloblang query](https://www.benthos.dev/docs/guides/bloblang/about/) that should return a boolean value indicating whether the input resource should be requeued. An empty string disables this functionality. Type: `string` Default: `""` -`kind` +### `result.requeue_after` -resource kind +Specify a duration after which the input resource should be requeued. This is a string value, which allows you to customize it based on resulting payloads and their metadata using [interpolation functions](https://www.benthos.dev/docs/configuration/interpolation#bloblang-queries). An empty string disables this functionality. Type: `string` Default: `""` -`version` +### `watches[]` + +A list of watch configurations that specify the set of kubernetes objects to target. + +Type: `list(object)` +Default: `[]` +Required: `true` -resource version +### `watches[].group` + +Resource group selector Type: `string` Default: `""` -`namespaces` +### `watches[].kind` + +Resource kind selector + +Type: `string` +Default: `""` +Required: `true` -optional namespace filter +### `watches[].namespaces` -Type: `[]string` +Resource namespace selector. An empty array here indicates cluster scope. -`owns` +Type: `list(string)` +Default: `[]` -optional list of dependencies to watch +### `watches[].owns[]` -Type: `[]object({group: string, version: string, kind: string })` +Specifies an optional list of dependencies to watch. This requires the correct owner references to be present on the dependent objects. -`selector` +Type: `list(object)` +Default: `[]` -optional label selector +### `watches[].owns[].group` -Type: `object` +Dependency group selector -`selector.matchLabels` +Type: `string` +Default: `""` + +### `watches[].owns[].kind` + +Dependency kind selector + +Type: `string` +Default: `""` +Required: `true` + +### `watches[].owns[].version` + +Dependency version selector + +Type: `string` +Default: `""` +Required: `true` -optional label selector match requirements +### `watches[].selector` + +Optional label selector to apply as target filter. Type: `object` +Default: `{}` + +### `watches[].selector.matchExpressions[]` + +List of label match expressions to apply as target filter. + +Type: `list(object)` +Default: `{}` + +### `watches[].selector.matchExpressions[].key` + +Subject of the given expression. + +Type: `string` +Default: `""` +Required: `true` + +### `watches[].selector.matchExpressions[].operator` + +Operator of the given expression (e.g. `Exists`, `In`, `NotIn`) + +Type: `string` +Default: `""` +Required: `true` + +### `watches[].selector.matchExpressions[].values[]` + +List of values applied to operator in order to evaluate the expression. + +Type: `string` +Default: `[]` -`selector.matchExpressions` +### `watches[].selector.matchLabels` -optional label selector match expressions +Map of key value label pairs to use as target filter. -Type: `object({key: string, operator: string, values: []string})` +Type: `map(string)` +Default: `{}` +### `watches[].version` + +Resource version selector + +Type: `string` +Default: `""` +Required: `true` ## Metadata @@ -102,7 +186,3 @@ This input adds the following metadata fields to each message: - namespace - version ``` - -## Synchronous Responses - -Additionally, this input will check for a `requeue_after` metadata entry on [synchronous response](https://www.benthos.dev/docs/guides/sync_responses) messages, and if found, will requeue the object for reconciliation. \ No newline at end of file diff --git a/doc/kubernetes_output.md b/doc/kubernetes_output.md index 61fd458..8086978 100644 --- a/doc/kubernetes_output.md +++ b/doc/kubernetes_output.md @@ -3,31 +3,24 @@ creates, updates, and deleted kubernetes objects based This output will perform the following actions for all message parts: + - fail if the payload is not a valid kubernetes object - delete the object if a `deleted` metadata key is present - update the object if a `uid` is present - create the object if no `uid` is present **Examples** + ```yaml input: type: kubernetes plugin: watches: - group: example.com - version: v1alpha1 + version: v1 kind: Foo pipeline: - processors: - - bloblang: | - root = match { - meta().exists("deleted") => deleted() - } - -output: - type: kubernetes - plugin: {} processors: - bloblang: | map finalizer { @@ -35,7 +28,29 @@ output: metadata.finalizers = metadata.finalizers.append("finalizer.foos.example.com") } root = match { + meta().exists("deleted") => deleted() metadata.finalizers.or([]).contains("finalizer.foos.example.com") => deleted() _ => this.apply("finalizer") } -``` \ No newline at end of file + +output: + type: kubernetes + plugin: {} +``` + +## Fields + +### `deletion_propagation` + +Specifies the [deletion propagation policy](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents) when performing `delete` operations. + +Type: `string` +Default: `Background` +Options: `Background`, `Foreground`, `Orphan` + +### `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + +Type: `number` +Default: `1` diff --git a/doc/kubernetes_processor.md b/doc/kubernetes_processor.md index 47805ea..9cf4e3a 100644 --- a/doc/kubernetes_processor.md +++ b/doc/kubernetes_processor.md @@ -4,9 +4,32 @@ performs operations against a kubernetes cluster ## Fields -`operator` +### `deletion_propagation` -specifies the kubernetes client operation to perform +Specifies the [deletion propagation policy](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents) used with the `delete` operator. Type: `string` -Options: `create`, `delete`, `get`, `update` +Default: `Background` +Options: `Background`, `Foreground`, `Orphan` + +### `operator` + +Specifies the kubernetes client operation to perform. + +Type: `string` +Options: `create`, `delete`, `get`, `status`, `update` + +### `operator_mapping` + +A [Bloblang mapping](https://www.benthos.dev/docs/guides/bloblang/about/) that resolves to valid operator. + +Type: `string` + +### `parts[]` + +An optional array of message indexes of a batch that the processor should apply to. If left empty all messages are processed. This field is only applicable when batching messages at the input level. + +Indexes can be negative, and if so the part will be selected from the end counting backwards starting from -1. + +Type: `list(number)` +Default: `[]` diff --git a/doc/kubernetes_status_output.md b/doc/kubernetes_status_output.md index 3499509..e8bb8a3 100644 --- a/doc/kubernetes_status_output.md +++ b/doc/kubernetes_status_output.md @@ -3,33 +3,43 @@ updates a kubernetes object's status subresource **Examples** + ```yaml input: type: kubernetes plugin: watches: - group: example.com - version: v1alpha1 + version: v1 kind: Foo pipeline: processors: - bloblang: | + map status { + root = this + status.observedGeneration = metadata.generation + status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.000000000Z") + status.status = match { + metadata.exists("deletionTimestamp") => "Destroying" + _ => "Reconciling" + } + } root = match { meta().exists("deleted") => deleted() + _ => this.apply("status") } output: type: kubernetes_status plugin: {} - processors: - - bloblang: | - root = this - status.observedGeneration = metadata.generation - status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.000000000Z") - status.status = if metadata.exists("deletionTimestamp") { - "Destroying" - } else { - "Reconciling" - } -``` \ No newline at end of file +``` + +## Fields + +### `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + +Type: `number` +Default: `1` diff --git a/example/manifest.yml b/example/manifest.yml index 4ae7cf8..eeb595d 100644 --- a/example/manifest.yml +++ b/example/manifest.yml @@ -9,13 +9,3 @@ metadata: - random spec: size: 1 - ---- -apiVersion: example.com/v1 -kind: Bar -metadata: - name: three - labels: - color: blue -spec: - size: 3 diff --git a/example/status.yml b/example/status.yml index bdb6307..b8c225a 100644 --- a/example/status.yml +++ b/example/status.yml @@ -9,96 +9,61 @@ input: - group: example.com version: v1 kind: Bar + result: + requeue: meta().exists("requeue") + requeue_after: ${!meta("requeue_after").or("")} + processors: + - log: + message: ${!content()} pipeline: processors: - - bloblang: | - root = match { - meta().exists("deleted") => deleted() - } - -output: - switch: - outputs: - - condition: - bloblang: metadata.finalizers.or([]).contains("finalizer.foos.example.com") != true - output: - type: kubernetes - plugin: {} + - switch: + # ignore deleted items + - check: meta().exists("deleted") processors: - - bloblang: | - meta requeue_after = "1ms" - map finalizer { - root = this - metadata.finalizers = metadata.finalizers.append("finalizer.foos.example.com") - } - root = match { - metadata.finalizers.or([]).contains("finalizer.foos.example.com") => deleted() - _ => this.apply("finalizer") - } - - sync_response: {} - - log: - message: adding finalizer... - - - fallthrough: true - output: - type: kubernetes_status - plugin: {} - processors: - - bloblang: | - root = this - status.observedGeneration = metadata.generation - status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.000000000Z") - status.status = if metadata.exists("deletionTimestamp") { - "Destroying" - } else { - "Reconciling" - } - - log: - message: updating status... + - bloblang: root = deleted() - - output: - type: kubernetes - plugin: {} - processors: + # reconcile dependent resources + - processors: - branch: - request_map: | - root = { - "apiVersion": "example.com/v1", - "kind": "Bar", - "metadata": { - "name": metadata.name, - "namespace": metadata.namespace - } - } processors: + - bloblang: | + meta size = spec.size + apiVersion = "example.com/v1" + kind = "Bar" + metadata.labels = metadata.labels + metadata.name = metadata.name + metadata.namespace = metadata.namespace + metadata.ownerReferences = [{ + "apiVersion": apiVersion, + "kind": kind, + "controller": true, + "blockOwnerDeletion": true, + "name": metadata.name, + "uid": metadata.uid + }] - type: kubernetes plugin: operator: get + - bloblang: | + root = this + spec.size = meta("size").number() + - type: kubernetes + plugin: + operator_mapping: if errored() { "create" } else { "update" } result_map: | - root.bar = if errored() { - throw(error()) - } else { - this - } - - - log: - message: reconciling bar - + root.status.bar = metadata.uid + root.status.status = "Ready" - bloblang: | - let ownerRef = { - "apiVersion": apiVersion, - "controller": true, - "blockOwnerDeletion": true, - "kind": kind, - "name": metadata.name, - "uid": metadata.uid - } + root = this + meta requeue = if metadata.annotations.exists("x-requeue") { "true" } + meta requeue_after = metadata.annotations."x-requeue-after".or("") + - sync_response: {} - root = bar - root.metadata.labels = metadata.labels - root.metadata.ownerReferences = [$ownerRef] - root.spec.size = spec.size +output: + type: kubernetes_status + plugin: {} logger: - level: info + level: DEBUG diff --git a/go.mod b/go.mod index d7dffa2..ce56440 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cludden/benthos-kubernetes go 1.14 require ( - github.com/Jeffail/benthos/v3 v3.28.0 + github.com/Jeffail/benthos/v3 v3.29.0 github.com/go-logr/logr v0.1.0 github.com/opentracing/opentracing-go v1.2.0 k8s.io/apimachinery v0.18.2 diff --git a/go.sum b/go.sum index c979cf3..7e78b55 100644 --- a/go.sum +++ b/go.sum @@ -80,10 +80,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/ClickHouse/clickhouse-go v1.4.3 h1:iAFMa2UrQdR5bHJ2/yaSLffZkxpcOYQMCUuKeNXGdqc= github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/Jeffail/benthos/v3 v3.26.0 h1:us2kGGweSn2ZssOveyfM2Qzhwz0TL/yn6K9Inr3r6yk= -github.com/Jeffail/benthos/v3 v3.26.0/go.mod h1:iz55lfFSJUZ0SEHqgJGCi0sIehiY8YmR+wmk/rLF1Ic= -github.com/Jeffail/benthos/v3 v3.28.0 h1:slbPTS5mS/4usd53pq2VaJWFGeONKRDq3ps5C+JNmWc= -github.com/Jeffail/benthos/v3 v3.28.0/go.mod h1:1fFh2ASs5qXNhk7AI/IrwDZpeIT211PvubJR2wCPlhM= +github.com/Jeffail/benthos/v3 v3.29.0 h1:jc7rYRyf7Rw2Y2Fnm63k5nwQGPsu3hTRSdJ6pim3ZuE= +github.com/Jeffail/benthos/v3 v3.29.0/go.mod h1:1fFh2ASs5qXNhk7AI/IrwDZpeIT211PvubJR2wCPlhM= github.com/Jeffail/gabs/v2 v2.6.0 h1:WdCnGaDhNa4LSRTMwhLZzJ7SRDXjABNP13SOKvCpL5w= github.com/Jeffail/gabs/v2 v2.6.0/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= @@ -150,6 +148,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk= github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/blang/semver v3.5.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= @@ -248,6 +247,7 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M= github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= @@ -478,6 +478,7 @@ github.com/itchyny/gojq v0.10.0 h1:JHq5ZIQKYZgK8bw7HWqF6mqjg7NXyiIhUlSwip6jyQc= github.com/itchyny/gojq v0.10.0/go.mod h1:dJzXXNL1A+1rjDF8tDTzW5vOe4i9iIkKSH21HxV76Sw= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jhump/protoreflect v1.7.0 h1:qJ7piXPrjP3mDrfHf5ATkxfLix8ANs226vpo0aACOn0= github.com/jhump/protoreflect v1.7.0/go.mod h1:RZkzh7Hi9J7qT/sPlWnJ/UwZqCJvciFxKDA0UCeltSM= @@ -524,6 +525,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/strftime v1.0.1 h1:o7qz5pmLzPDLyGW4lG6JvTKPUfTFXwe+vOamIYWtnVU= github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g= @@ -786,6 +788,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw= diff --git a/input/kubernetes.go b/input/kubernetes.go index c87f9e3..49158d3 100644 --- a/input/kubernetes.go +++ b/input/kubernetes.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/Jeffail/benthos/v3/lib/bloblang" "github.com/Jeffail/benthos/v3/lib/input" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message" @@ -56,12 +57,26 @@ This plugin streams changes to kubernetes objects from a given cluster.`, // KubernetesConfig defines runtime configuration for a kubernetes input type KubernetesConfig struct { - Watches []Watch `json:"watches,omitempty" yaml:"watches,omitempty"` + Result KubernetesResultConfig `json:"result" yaml:"result"` + Watches []Watch `json:"watches,omitempty" yaml:"watches,omitempty"` } // NewKubernetesConfig creates a new KubernetesConfig with default values func NewKubernetesConfig() *KubernetesConfig { - return &KubernetesConfig{} + return &KubernetesConfig{ + Result: NewKubernetesResultConfig(), + } +} + +// KubernetesResultConfig provides config fields for customing the result +type KubernetesResultConfig struct { + Requeue string `json:"requeue" yaml:"requeue"` + RequeueAfter string `json:"requeue_after" yaml:"requeue_after"` +} + +// NewKubernetesResultConfig returns a KubernetesResultConfig with default values +func NewKubernetesResultConfig() KubernetesResultConfig { + return KubernetesResultConfig{} } // Watch defines a controller configuration @@ -204,6 +219,9 @@ type selectorRequirement struct { type Kubernetes struct { mgr manager.Manager + requeue bloblang.Mapping + requeueAfter bloblang.Field + resChan chan types.Response transactionsChan chan types.Transaction @@ -234,6 +252,24 @@ func NewKubernetes( closedChan: make(chan struct{}), } + // check for result requeue mapping + if conf.Result.Requeue != "" { + requeue, err := bloblang.NewMapping(conf.Result.Requeue) + if err != nil { + return nil, fmt.Errorf("error parsing result requeue mapping: %v", err) + } + c.requeue = requeue + } + + // check for result requeue after expression + if conf.Result.RequeueAfter != "" { + requeueAfter, err := bloblang.NewField(conf.Result.RequeueAfter) + if err != nil { + return nil, fmt.Errorf("error parsing result requeue_after expression: %v", err) + } + c.requeueAfter = requeueAfter + } + // initalize controller manager cmgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { @@ -358,10 +394,27 @@ func (k *Kubernetes) Reconciler(gvk schema.GroupVersionKind) reconcile.Reconcile case <-k.closeChan: } + // combine result messages if more than one exist + result := message.New(nil) for _, resMsg := range store.Get() { resMsg.Iter(func(i int, part types.Part) error { - // check for requeue after metadata attribute - requeueAfter := part.Metadata().Get("requeue_after") + result.Append(part) + return nil + }) + } + + if result.Len() > 0 { + if k.requeue != nil { + requeue, err := k.requeue.QueryPart(0, result) + if err != nil { + log.Errorf("failed to check result requeue mapping: %v", err) + } else if requeue { + log.Debugln("requeueing object") + resp.Requeue = true + } + } + if k.requeueAfter != nil { + requeueAfter := k.requeueAfter.String(0, result) if requeueAfter != "" { requeueAfterDur, err := time.ParseDuration(requeueAfter) if err != nil { @@ -371,8 +424,7 @@ func (k *Kubernetes) Reconciler(gvk schema.GroupVersionKind) reconcile.Reconcile resp.RequeueAfter = requeueAfterDur } } - return nil - }) + } } return resp, nil diff --git a/processor/kubernetes.go b/processor/kubernetes.go index 9fb7468..a9738b8 100644 --- a/processor/kubernetes.go +++ b/processor/kubernetes.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/Jeffail/benthos/v3/lib/bloblang" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/metrics" "github.com/Jeffail/benthos/v3/lib/processor" @@ -49,8 +50,9 @@ func init() { // KubernetesConfig defines runtime configuration for a Kubernetes processor type KubernetesConfig struct { - Operator string `json:"operator" yaml:"operator"` DeletionPropagation metav1.DeletionPropagation `json:"deletion_propagation" yaml:"deletion_propagation"` + Operator string `json:"operator" yaml:"operator"` + OperatorMapping string `json:"operator_mapping" yaml:"operator_mapping"` Parts []int `json:"parts" yaml:"parts"` } @@ -70,6 +72,7 @@ type Kubernetes struct { deletionPropagation metav1.DeletionPropagation operator string + operatorMapping bloblang.Mapping parts []int log log.Modular @@ -97,6 +100,14 @@ func NewKubernetes( return nil, fmt.Errorf("invalid deletion propagation policy: %s", k.deletionPropagation) } + if conf.OperatorMapping != "" { + m, err := bloblang.NewMapping(conf.OperatorMapping) + if err != nil { + return nil, fmt.Errorf("error parsing operator field: %v", err) + } + k.operatorMapping = m + } + // initalize controller manager client, err := client.New(config.GetConfigOrDie(), client.Options{}) if err != nil { @@ -104,12 +115,6 @@ func NewKubernetes( } k.client = client - switch k.operator { - case "get", "create", "update", "delete", "status": - default: - return nil, fmt.Errorf("unsupported operator: %s", k.operator) - } - return k, nil } @@ -120,14 +125,25 @@ func (k *Kubernetes) ProcessMessage(msg types.Message) ([]types.Message, types.R ctx := context.Background() proc := func(index int, span opentracing.Span, part types.Part) error { + var err error var u unstructured.Unstructured if err := u.UnmarshalJSON(part.Get()); err != nil { return fmt.Errorf("invalid message part, must be valid kubernetes runtime object: %v", err) } + id := fmt.Sprintf("%s Namespace=%s Name=%s", u.GetObjectKind().GroupVersionKind().String(), u.GetNamespace(), u.GetName()) - var err error - switch k.operator { + operator := k.operator + if k.operatorMapping != nil { + operatorB, err := k.operatorMapping.MapPart(index, msg) + if err != nil { + return fmt.Errorf("error evaluating operator mapping: %v", err) + } + operator = string(operatorB.Get()) + } + + switch operator { case "get": + k.log.Debugf("getting kubernetes object: %s", id) key, perr := client.ObjectKeyFromObject(&u) if err != nil { err = fmt.Errorf("failed to get object: failed to get object key from object: %v", perr) @@ -137,14 +153,17 @@ func (k *Kubernetes) ProcessMessage(msg types.Message) ([]types.Message, types.R err = fmt.Errorf("failed to get object: %v", err) } case "create": + k.log.Debugf("creating kubernetes object: %s", id) if err = k.client.Create(ctx, &u); err != nil { err = fmt.Errorf("failed to create object: %v", err) } case "update": + k.log.Debugf("updating kubernetes object: %s", id) if err = k.client.Update(ctx, &u); err != nil { err = fmt.Errorf("failed to update object: %v", err) } case "delete": + k.log.Debugf("deleting kubernetes object: %s", id) var opts []client.DeleteOption policy := k.deletionPropagation @@ -165,10 +184,15 @@ func (k *Kubernetes) ProcessMessage(msg types.Message) ([]types.Message, types.R err = fmt.Errorf("failed to delete object: %v", err) } case "status": + k.log.Debugf("updating kubernetes object status: %s", id) if err = k.client.Status().Update(ctx, &u); err != nil { err = fmt.Errorf("failed to update object status: %v", err) } + default: + k.log.Errorf("unsupported operator: %s", operator) + return fmt.Errorf("unsupported operator: %s", operator) } + if err != nil { k.log.Errorf("failed to process message: %v", err) return err diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/combinators.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/combinators.go index 2590677..93e87dd 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/combinators.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/combinators.go @@ -17,158 +17,120 @@ type Result struct { Remaining []rune } -// Type is a general parser method. -type Type func([]rune) Result +// Func is the common signature of a parser function. +type Func func([]rune) Result //------------------------------------------------------------------------------ -// NotEnd parses zero characters from an input and expects it to not have ended. -// An ExpectedError must be provided which provides the error returned on empty -// input. -func NotEnd(p Type, exp ...string) Type { - return func(input []rune) Result { - if len(input) == 0 { - return Result{ - Payload: nil, - Err: NewError(input, exp...), - Remaining: input, - } - } - return p(input) +// Success creates a result with a payload from successful parsing. +func Success(payload interface{}, remaining []rune) Result { + return Result{ + Payload: payload, + Remaining: remaining, + } +} + +// Fail creates a result with an error from failed parsing. +func Fail(err *Error, input []rune) Result { + return Result{ + Err: err, + Remaining: input, } } +//------------------------------------------------------------------------------ + // Char parses a single character and expects it to match one candidate. -func Char(c rune) Type { - exp := string(c) - return NotEnd(func(input []rune) Result { - if input[0] != c { - return Result{ - Payload: nil, - Err: NewError(input, exp), - Remaining: input, - } - } - return Result{ - Payload: string(c), - Err: nil, - Remaining: input[1:], +func Char(c rune) Func { + return func(input []rune) Result { + if len(input) == 0 || input[0] != c { + return Fail(NewError(input, string(c)), input) } - }, exp) + return Success(string(c), input[1:]) + } } // NotChar parses any number of characters until they match a single candidate. -func NotChar(c rune) Type { +func NotChar(c rune) Func { exp := "not " + string(c) - return NotEnd(func(input []rune) Result { - if input[0] == c { - return Result{ - Payload: nil, - Err: NewError(input, exp), - Remaining: input, - } + return func(input []rune) Result { + if len(input) == 0 || input[0] == c { + return Fail(NewError(input, exp), input) } i := 0 for ; i < len(input); i++ { if input[i] == c { - return Result{ - Payload: string(input[:i]), - Err: nil, - Remaining: input[i:], - } + return Success(string(input[:i]), input[i:]) } } - return Result{ - Payload: string(input), - Err: nil, - Remaining: nil, - } - }, exp) + return Success(string(input), nil) + } } // InSet parses any number of characters within a set of runes. -func InSet(set ...rune) Type { +func InSet(set ...rune) Func { setMap := make(map[rune]struct{}, len(set)) for _, r := range set { setMap[r] = struct{}{} } exp := fmt.Sprintf("chars(%v)", string(set)) - return NotEnd(func(input []rune) Result { + return func(input []rune) Result { + if len(input) == 0 { + return Fail(NewError(input, exp), input) + } i := 0 for ; i < len(input); i++ { if _, exists := setMap[input[i]]; !exists { if i == 0 { - return Result{ - Err: NewError(input, exp), - Remaining: input, - } + return Fail(NewError(input, exp), input) } break } } - - return Result{ - Payload: string(input[:i]), - Err: nil, - Remaining: input[i:], - } - }, exp) + return Success(string(input[:i]), input[i:]) + } } // InRange parses any number of characters between two runes inclusive. -func InRange(lower, upper rune) Type { +func InRange(lower, upper rune) Func { exp := fmt.Sprintf("range(%c - %c)", lower, upper) - return NotEnd(func(input []rune) Result { + return func(input []rune) Result { + if len(input) == 0 { + return Fail(NewError(input, exp), input) + } i := 0 for ; i < len(input); i++ { if input[i] < lower || input[i] > upper { if i == 0 { - return Result{ - Err: NewError(input, exp), - Remaining: input, - } + return Fail(NewError(input, exp), input) } break } } - - return Result{ - Payload: string(input[:i]), - Err: nil, - Remaining: input[i:], - } - }, exp) + return Success(string(input[:i]), input[i:]) + } } // SpacesAndTabs parses any number of space or tab characters. -func SpacesAndTabs() Type { +func SpacesAndTabs() Func { return Expect(InSet(' ', '\t'), "whitespace") } // Term parses a single instance of a string. -func Term(str string) Type { - exp := str - return NotEnd(func(input []rune) Result { - for i, c := range str { +func Term(term string) Func { + return func(input []rune) Result { + for i, c := range term { if len(input) <= i || input[i] != c { - return Result{ - Payload: nil, - Err: NewError(input, exp), - Remaining: input, - } + return Fail(NewError(input, term), input) } } - return Result{ - Payload: str, - Err: nil, - Remaining: input[len(str):], - } - }, exp) + return Success(term, input[len(term):]) + } } // Number parses any number of numerical characters into either an int64 or, if // the number contains float characters, a float64. -func Number() Type { +func Number() Func { digitSet := InSet([]rune("0123456789")...) dot := Char('.') minus := Char('-') @@ -192,13 +154,8 @@ func Number() Type { if strings.Contains(resStr, ".") { f, err := strconv.ParseFloat(resStr, 64) if err != nil { - return Result{ - Err: NewFatalError( - input, - fmt.Errorf("failed to parse '%v' as float: %v", resStr, err), - ), - Remaining: input, - } + err = fmt.Errorf("failed to parse '%v' as float: %v", resStr, err) + return Fail(NewFatalError(input, err), input) } if negative { f = -f @@ -207,13 +164,8 @@ func Number() Type { } else { i, err := strconv.ParseInt(resStr, 10, 64) if err != nil { - return Result{ - Err: NewFatalError( - input, - fmt.Errorf("failed to parse '%v' as integer: %v", resStr, err), - ), - Remaining: input, - } + err = fmt.Errorf("failed to parse '%v' as integer: %v", resStr, err) + return Fail(NewFatalError(input, err), input) } if negative { i = -i @@ -225,7 +177,7 @@ func Number() Type { } // Boolean parses either 'true' or 'false' into a boolean value. -func Boolean() Type { +func Boolean() Func { parser := Expect(OneOf(Term("true"), Term("false")), "boolean") return func(input []rune) Result { res := parser(input) @@ -237,7 +189,7 @@ func Boolean() Type { } // Null parses a null literal value. -func Null() Type { +func Null() Func { nullMatch := Term("null") return func(input []rune) Result { res := nullMatch(input) @@ -249,7 +201,7 @@ func Null() Type { } // Array parses an array literal. -func Array() Type { +func Array() Func { open, comma, close := Char('['), Char(','), Char(']') whitespace := DiscardAll( OneOf( @@ -279,7 +231,7 @@ func Array() Type { } // Object parses an object literal. -func Object() Type { +func Object() Func { open, comma, close := Char('{'), Char(','), Char('}') whitespace := DiscardAll( OneOf( @@ -329,7 +281,7 @@ func Object() Type { // LiteralValue parses a literal bool, number, quoted string, null value, array // of literal values, or object. -func LiteralValue() Type { +func LiteralValue() Func { return OneOf( Boolean(), Number(), @@ -346,7 +298,7 @@ func LiteralValue() Type { // // Warning! If the result is not a []interface{}, or if an element is not a // string, then this parser returns a zero value instead. -func JoinStringPayloads(p Type) Type { +func JoinStringPayloads(p Func) Func { return func(input []rune) Result { res := p(input) if res.Err != nil { @@ -366,7 +318,7 @@ func JoinStringPayloads(p Type) Type { } // Comment parses a # comment (always followed by a line break). -func Comment() Type { +func Comment() Func { p := JoinStringPayloads( Sequence( Char('#'), @@ -384,7 +336,7 @@ func Comment() Type { // SnakeCase parses any number of characters of a camel case string. This parser // is very strict and does not support double underscores, prefix or suffix // underscores. -func SnakeCase() Type { +func SnakeCase() Func { return Expect(JoinStringPayloads(UntilFail(OneOf( InRange('a', 'z'), InRange('0', '9'), @@ -394,61 +346,44 @@ func SnakeCase() Type { // TripleQuoteString parses a single instance of a triple-quoted multiple line // string. The result is the inner contents. -func TripleQuoteString() Type { - exp := "quoted string" - return NotEnd(func(input []rune) Result { +func TripleQuoteString() Func { + return func(input []rune) Result { if len(input) < 6 || input[0] != '"' || input[1] != '"' || input[2] != '"' { - return Result{ - Err: NewError(input, exp), - Remaining: input, - } + return Fail(NewError(input, "quoted string"), input) } for i := 2; i < len(input)-2; i++ { if input[i] == '"' && input[i+1] == '"' && input[i+2] == '"' { - return Result{ - Payload: string(input[3:i]), - Remaining: input[i+3:], - } + return Success(string(input[3:i]), input[i+3:]) } } - return Result{ - Err: NewFatalError(input[len(input):], errors.New("required"), "end triple-quote"), - Remaining: input, - } - }, exp) + return Fail(NewFatalError(input[len(input):], errors.New("required"), "end triple-quote"), input) + } } // QuotedString parses a single instance of a quoted string. The result is the // inner contents unescaped. -func QuotedString() Type { - exp := "quoted string" - return NotEnd(func(input []rune) Result { - if input[0] != '"' { - return Result{ - Payload: nil, - Err: NewError(input, exp), - Remaining: input, - } +func QuotedString() Func { + return func(input []rune) Result { + if len(input) == 0 || input[0] != '"' { + return Fail(NewError(input, "quoted string"), input) } escaped := false for i := 1; i < len(input); i++ { if input[i] == '"' && !escaped { unquoted, err := strconv.Unquote(string(input[:i+1])) if err != nil { - return Result{ - Err: NewFatalError(input, fmt.Errorf("failed to unescape quoted string contents: %v", err)), - Remaining: input, - } - } - return Result{ - Payload: unquoted, - Remaining: input[i+1:], + err = fmt.Errorf("failed to unescape quoted string contents: %v", err) + return Fail(NewFatalError(input, err), input) } + return Success(unquoted, input[i+1:]) + } + if input[i] == '\n' { + Fail(NewFatalError(input[i:], errors.New("required"), "end quote"), input) } if input[i] == '\\' { escaped = !escaped @@ -456,27 +391,24 @@ func QuotedString() Type { escaped = false } } - return Result{ - Err: NewFatalError(input[len(input):], errors.New("required"), "end quote"), - Remaining: input, - } - }, exp) + return Fail(NewFatalError(input[len(input):], errors.New("required"), "end quote"), input) + } } // Newline parses a line break. -func Newline() Type { +func Newline() Func { return Expect(Char('\n'), "line break") } // NewlineAllowComment parses an optional comment followed by a mandatory line // break. -func NewlineAllowComment() Type { +func NewlineAllowComment() Func { return Expect(OneOf(Comment(), Char('\n')), "line break") } // UntilFail applies a parser until it fails, and returns a slice containing all // results. If the parser does not succeed at least once an error is returned. -func UntilFail(parser Type) Type { +func UntilFail(parser Func) Func { return func(input []rune) Result { res := parser(input) if res.Err != nil { @@ -485,10 +417,7 @@ func UntilFail(parser Type) Type { results := []interface{}{res.Payload} for { if res = parser(res.Remaining); res.Err != nil { - return Result{ - Payload: results, - Remaining: res.Remaining, - } + return Success(results, res.Remaining) } results = append(results, res.Payload) } @@ -507,9 +436,9 @@ func UntilFail(parser Type) Type { // to true then two slices are returned, the first element being a slice of // primary results and the second element being the delimiter results. func DelimitedPattern( - start, primary, delimiter, stop Type, + start, primary, delimiter, stop Func, allowTrailing, returnDelimiters bool, -) Type { +) Func { return func(input []rune) Result { res := start(input) if res.Err != nil { @@ -531,23 +460,19 @@ func DelimitedPattern( resStop.Payload = mkRes() return resStop } - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } results = append(results, res.Payload) for { if res = delimiter(res.Remaining); res.Err != nil { - if resStop := stop(res.Remaining); resStop.Err == nil { + resStop := stop(res.Remaining) + if resStop.Err == nil { resStop.Payload = mkRes() return resStop } - return Result{ - Err: res.Err, - Remaining: input, - } + res.Err.Add(resStop.Err) + return Fail(res.Err, input) } delims = append(delims, res.Payload) if res = primary(res.Remaining); res.Err != nil { @@ -557,10 +482,7 @@ func DelimitedPattern( return resStop } } - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } results = append(results, res.Payload) } @@ -573,7 +495,7 @@ func DelimitedPattern( // // Two slices are returned, the first element being a slice of primary results // and the second element being the delimiter results. -func Delimited(primary, delimiter Type) Type { +func Delimited(primary, delimiter Func) Func { return func(input []rune) Result { results := []interface{}{} delims := []interface{}{} @@ -586,19 +508,13 @@ func Delimited(primary, delimiter Type) Type { for { if res = delimiter(res.Remaining); res.Err != nil { - return Result{ - Payload: []interface{}{ - results, delims, - }, - Remaining: res.Remaining, - } + return Success([]interface{}{ + results, delims, + }, res.Remaining) } delims = append(delims, res.Payload) if res = primary(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } results = append(results, res.Payload) } @@ -607,7 +523,7 @@ func Delimited(primary, delimiter Type) Type { // Sequence applies a sequence of parsers and returns either a slice of the // results or an error if any parser fails. -func Sequence(parsers ...Type) Type { +func Sequence(parsers ...Func) Func { return func(input []rune) Result { results := make([]interface{}, 0, len(parsers)) res := Result{ @@ -615,24 +531,18 @@ func Sequence(parsers ...Type) Type { } for _, p := range parsers { if res = p(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } results = append(results, res.Payload) } - return Result{ - Payload: results, - Remaining: res.Remaining, - } + return Success(results, res.Remaining) } } // Optional applies a child parser and if it returns an ExpectedError then it is // cleared and a nil result is returned instead. Any other form of error will be // returned unchanged. -func Optional(parser Type) Type { +func Optional(parser Func) Func { return func(input []rune) Result { res := parser(input) if res.Err != nil && !res.Err.IsFatal() { @@ -644,7 +554,7 @@ func Optional(parser Type) Type { // Discard the result of a child parser, regardless of the result. This has the // effect of running the parser and returning only Remaining. -func Discard(parser Type) Type { +func Discard(parser Func) Func { return func(input []rune) Result { res := parser(input) res.Payload = nil @@ -655,7 +565,7 @@ func Discard(parser Type) Type { // DiscardAll the results of a child parser, applied until it fails. This has // the effect of running the parser and returning only Remaining. -func DiscardAll(parser Type) Type { +func DiscardAll(parser Func) Func { return func(input []rune) Result { res := parser(input) for res.Err == nil { @@ -669,7 +579,7 @@ func DiscardAll(parser Type) Type { // MustBe applies a parser and if the result is a non-fatal error then it is // upgraded to a fatal one. -func MustBe(parser Type) Type { +func MustBe(parser Func) Func { return func(input []rune) Result { res := parser(input) if res.Err != nil && !res.Err.IsFatal() { @@ -681,7 +591,7 @@ func MustBe(parser Type) Type { // Expect applies a parser and if an error is returned the list of expected candidates is replaced with the given // strings. This is useful for providing better context to users. -func Expect(parser Type, expected ...string) Type { +func Expect(parser Func, expected ...string) Func { return func(input []rune) Result { res := parser(input) if res.Err != nil && !res.Err.IsFatal() { @@ -694,7 +604,7 @@ func Expect(parser Type, expected ...string) Type { // OneOf accepts one or more parsers and tries them in order against an input. // If a parser returns an ExpectedError then the next parser is tried and so // on. Otherwise, the result is returned. -func OneOf(parsers ...Type) Type { +func OneOf(parsers ...Func) Func { return func(input []rune) Result { var err *Error tryParsers: @@ -713,10 +623,7 @@ func OneOf(parsers ...Type) Type { } return res } - return Result{ - Err: err, - Remaining: input, - } + return Fail(err, input) } } @@ -759,7 +666,7 @@ func bestMatch(left, right Result) (Result, bool) { // 'aaaa', if the input 'aaab' were provided then an error from parser B would // be returned, as although the input didn't match, it matched more of parser B // than parser A. -func BestMatch(parsers ...Type) Type { +func BestMatch(parsers ...Func) Func { if len(parsers) == 1 { return parsers[0] } diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/field_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/field_parser.go index 40147ec..fdf3283 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/field_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/field_parser.go @@ -9,7 +9,7 @@ import ( //------------------------------------------------------------------------------ -func intoStaticResolver(p Type) Type { +func intoStaticResolver(p Func) Func { return func(input []rune) Result { res := p(input) if str, ok := res.Payload.(string); ok { @@ -21,10 +21,7 @@ func intoStaticResolver(p Type) Type { func aFunction(input []rune) Result { if len(input) < 3 || input[0] != '$' || input[1] != '{' || input[2] != '!' { - return Result{ - Err: NewError(input, "${!"), - Remaining: input, - } + return Fail(NewError(input, "${!"), input) } i := 3 for ; i < len(input); i++ { @@ -33,10 +30,7 @@ func aFunction(input []rune) Result { if res.Err == nil { if len(res.Remaining) > 0 { pos := len(input[3:i]) - len(res.Remaining) - return Result{ - Err: NewFatalError(input[3+pos:], errors.New("required"), "end of expression"), - Remaining: input, - } + return Fail(NewFatalError(input[3+pos:], errors.New("required"), "end of expression"), input) } res.Remaining = input[i+1:] res.Payload = field.NewQueryResolver(res.Payload.(query.Function)) @@ -51,35 +45,20 @@ func aFunction(input []rune) Result { return res } } - return Result{ - Payload: field.StaticResolver(string(input)), - Err: nil, - Remaining: nil, - } + return Success(field.StaticResolver(string(input)), nil) } func escapedBlock(input []rune) Result { if len(input) < 4 || input[0] != '$' || input[1] != '{' || input[2] != '{' || input[3] != '!' { - return Result{ - Err: NewError(input, "${{!"), - Remaining: input, - } + return Fail(NewError(input, "${{!"), input) } i := 4 for ; i < len(input)-1; i++ { if input[i] == '}' && input[i+1] == '}' { - return Result{ - Payload: field.StaticResolver("${!" + string(input[4:i]) + "}"), - Err: nil, - Remaining: input[i+2:], - } + return Success(field.StaticResolver("${!"+string(input[4:i])+"}"), input[i+2:]) } } - return Result{ - Payload: field.StaticResolver(string(input)), - Err: nil, - Remaining: nil, - } + return Success(field.StaticResolver(string(input)), nil) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/mapping_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/mapping_parser.go index 75d5d21..6706a48 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/mapping_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/mapping_parser.go @@ -35,7 +35,7 @@ func ParseMapping(filepath string, expr string) (*mapping.Executor, *Error) { //------------------------------------------------------------------------------' -func parseExecutor(baseDir string) Type { +func parseExecutor(baseDir string) Func { newline := NewlineAllowComment() whitespace := SpacesAndTabs() allWhitespace := DiscardAll(OneOf(whitespace, newline)) @@ -70,10 +70,7 @@ func parseExecutor(baseDir string) Type { } if res = newline(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } res = allWhitespace(res.Remaining) @@ -82,24 +79,17 @@ func parseExecutor(baseDir string) Type { } if res = statement(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } if mStmt, ok := res.Payload.(mapping.Statement); ok { statements = append(statements, mStmt) } } - - return Result{ - Remaining: res.Remaining, - Payload: mapping.NewExecutor(input, maps, statements...), - } + return Success(mapping.NewExecutor(input, maps, statements...), res.Remaining) } } -func singleRootMapping() Type { +func singleRootMapping() Func { whitespace := SpacesAndTabs() allWhitespace := DiscardAll(OneOf(whitespace, Newline())) @@ -114,24 +104,17 @@ func singleRootMapping() Type { // Remove all tailing whitespace and ensure no remaining input. res = allWhitespace(res.Remaining) if len(res.Remaining) > 0 { - return Result{ - Remaining: input, - Err: NewError(res.Remaining, "end of input"), - } + return Fail(NewError(res.Remaining, "end of input"), input) } stmt := mapping.NewStatement(input, mapping.NewJSONAssignment(), fn) - - return Result{ - Remaining: nil, - Payload: mapping.NewExecutor(input, map[string]query.Function{}, stmt), - } + return Success(mapping.NewExecutor(input, map[string]query.Function{}, stmt), nil) } } //------------------------------------------------------------------------------ -func varNameParser() Type { +func varNameParser() Func { return JoinStringPayloads( UntilFail( OneOf( @@ -145,7 +128,7 @@ func varNameParser() Type { ) } -func importParser(baseDir string, maps map[string]query.Function) Type { +func importParser(baseDir string, maps map[string]query.Function) Func { p := Sequence( Term("import"), SpacesAndTabs(), @@ -167,27 +150,19 @@ func importParser(baseDir string, maps map[string]query.Function) Type { filepath = path.Join(baseDir, filepath) contents, err := ioutil.ReadFile(filepath) if err != nil { - return Result{ - Err: NewFatalError(input, fmt.Errorf("failed to read import: %w", err)), - Remaining: input, - } + return Fail(NewFatalError(input, fmt.Errorf("failed to read import: %w", err)), input) } importContent := []rune(string(contents)) execRes := parseExecutor(path.Dir(filepath))(importContent) if execRes.Err != nil { - return Result{ - Err: NewFatalError(input, NewImportError(filepath, importContent, execRes.Err)), - Remaining: input, - } + return Fail(NewFatalError(input, NewImportError(filepath, importContent, execRes.Err)), input) } exec := execRes.Payload.(*mapping.Executor) if len(exec.Maps()) == 0 { - return Result{ - Err: NewFatalError(input, fmt.Errorf("no maps to import from '%v'", filepath)), - Remaining: input, - } + err := fmt.Errorf("no maps to import from '%v'", filepath) + return Fail(NewFatalError(input, err), input) } collisions := []string{} @@ -199,20 +174,15 @@ func importParser(baseDir string, maps map[string]query.Function) Type { } } if len(collisions) > 0 { - return Result{ - Err: NewFatalError(input, fmt.Errorf("map name collisions from import '%v': %v", filepath, collisions)), - Remaining: input, - } + err := fmt.Errorf("map name collisions from import '%v': %v", filepath, collisions) + return Fail(NewFatalError(input, err), input) } - return Result{ - Payload: filepath, - Remaining: res.Remaining, - } + return Success(filepath, res.Remaining) } } -func mapParser(maps map[string]query.Function) Type { +func mapParser(maps map[string]query.Function) Func { newline := NewlineAllowComment() whitespace := SpacesAndTabs() allWhitespace := DiscardAll(OneOf(whitespace, newline)) @@ -265,10 +235,7 @@ func mapParser(maps map[string]query.Function) Type { stmtSlice := seqSlice[4].([]interface{}) if _, exists := maps[ident]; exists { - return Result{ - Err: NewFatalError(input, fmt.Errorf("map name collision: %v", ident)), - Remaining: input, - } + return Fail(NewFatalError(input, fmt.Errorf("map name collision: %v", ident)), input) } statements := make([]mapping.Statement, len(stmtSlice)) @@ -278,14 +245,11 @@ func mapParser(maps map[string]query.Function) Type { maps[ident] = mapping.NewExecutor(input, maps, statements...) - return Result{ - Payload: ident, - Remaining: res.Remaining, - } + return Success(ident, res.Remaining) } } -func letStatementParser() Type { +func letStatementParser() Func { p := Sequence( Expect(Term("let"), "assignment"), SpacesAndTabs(), @@ -311,18 +275,18 @@ func letStatementParser() Type { return res } resSlice := res.Payload.([]interface{}) - return Result{ - Payload: mapping.NewStatement( + return Success( + mapping.NewStatement( input, mapping.NewVarAssignment(resSlice[2].(string)), resSlice[6].(query.Function), ), - Remaining: res.Remaining, - } + res.Remaining, + ) } } -func nameLiteralParser() Type { +func nameLiteralParser() Func { return JoinStringPayloads( UntilFail( OneOf( @@ -339,7 +303,7 @@ func nameLiteralParser() Type { ) } -func metaStatementParser(disabled bool) Type { +func metaStatementParser(disabled bool) Func { p := Sequence( Expect(Term("meta"), "assignment"), SpacesAndTabs(), @@ -359,10 +323,10 @@ func metaStatementParser(disabled bool) Type { return res } if disabled { - return Result{ - Err: NewFatalError(input, errors.New("setting meta fields from within a map is not allowed")), - Remaining: input, - } + return Fail( + NewFatalError(input, errors.New("setting meta fields from within a map is not allowed")), + input, + ) } resSlice := res.Payload.([]interface{}) @@ -371,18 +335,18 @@ func metaStatementParser(disabled bool) Type { keyPtr = &key } - return Result{ - Payload: mapping.NewStatement( + return Success( + mapping.NewStatement( input, mapping.NewMetaAssignment(keyPtr), resSlice[6].(query.Function), ), - Remaining: res.Remaining, - } + res.Remaining, + ) } } -func pathLiteralSegmentParser() Type { +func pathLiteralSegmentParser() Func { return JoinStringPayloads( UntilFail( OneOf( @@ -398,7 +362,7 @@ func pathLiteralSegmentParser() Type { ) } -func quotedPathLiteralSegmentParser() Type { +func quotedPathLiteralSegmentParser() Func { pattern := QuotedString() return func(input []rune) Result { @@ -413,14 +377,11 @@ func quotedPathLiteralSegmentParser() Type { rawSegment = strings.Replace(rawSegment, "~", "~0", -1) rawSegment = strings.Replace(rawSegment, ".", "~1", -1) - return Result{ - Payload: rawSegment, - Remaining: res.Remaining, - } + return Success(rawSegment, res.Remaining) } } -func pathParser() Type { +func pathParser() Func { p := Sequence( Expect(pathLiteralSegmentParser(), "assignment"), Optional( @@ -456,14 +417,11 @@ func pathParser() Type { } } - return Result{ - Payload: path, - Remaining: res.Remaining, - } + return Success(path, res.Remaining) } } -func plainMappingStatementParser() Type { +func plainMappingStatementParser() Func { p := Sequence( pathParser(), SpacesAndTabs(), @@ -484,14 +442,14 @@ func plainMappingStatementParser() Type { path = path[1:] } - return Result{ - Payload: mapping.NewStatement( + return Success( + mapping.NewStatement( input, mapping.NewJSONAssignment(path...), resSlice[4].(query.Function), ), - Remaining: res.Remaining, - } + res.Remaining, + ) } } diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_arithmetic_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_arithmetic_parser.go index 67a0279..cd6ab01 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_arithmetic_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_arithmetic_parser.go @@ -8,7 +8,7 @@ import ( //------------------------------------------------------------------------------ -func arithmeticOpParser() Type { +func arithmeticOpParser() Func { opParser := OneOf( Char('+'), Char('-'), @@ -60,16 +60,16 @@ func arithmeticOpParser() Type { case "|": res.Payload = query.ArithmeticPipe default: - return Result{ - Remaining: input, - Err: NewFatalError(input, fmt.Errorf("operator not recognized: %v", res.Payload)), - } + return Fail( + NewFatalError(input, fmt.Errorf("operator not recognized: %v", res.Payload)), + input, + ) } return res } } -func arithmeticParser(fnParser Type) Type { +func arithmeticParser(fnParser Func) Func { whitespace := DiscardAll( OneOf( SpacesAndTabs(), @@ -112,10 +112,10 @@ func arithmeticParser(fnParser Type) Type { query.ArithmeticSub, }, ); err != nil { - return Result{ - Err: NewFatalError(input, err), - Remaining: input, - } + return Fail( + NewFatalError(input, err), + input, + ) } } fns = append(fns, fn) @@ -126,15 +126,12 @@ func arithmeticParser(fnParser Type) Type { fn, err := query.NewArithmeticExpression(fns, ops) if err != nil { - return Result{ - Err: NewFatalError(input, err), - Remaining: input, - } - } - return Result{ - Payload: fn, - Remaining: res.Remaining, + return Fail( + NewFatalError(input, err), + input, + ) } + return Success(fn, res.Remaining) } } diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_expression_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_expression_parser.go index 5ecaf62..8f3864e 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_expression_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_expression_parser.go @@ -4,7 +4,7 @@ import ( "github.com/Jeffail/benthos/v3/internal/bloblang/query" ) -func matchCaseParser() Type { +func matchCaseParser() Func { whitespace := SpacesAndTabs() p := Sequence( @@ -56,14 +56,14 @@ func matchCaseParser() Type { caseFn = query.NewLiteralFunction(true) } - return Result{ - Payload: query.NewMatchCase(caseFn, seqSlice[2].(query.Function)), - Remaining: res.Remaining, - } + return Success( + query.NewMatchCase(caseFn, seqSlice[2].(query.Function)), + res.Remaining, + ) } } -func matchExpressionParser() Type { +func matchExpressionParser() Func { whitespace := DiscardAll( OneOf( SpacesAndTabs(), @@ -117,7 +117,7 @@ func matchExpressionParser() Type { } } -func ifExpressionParser() Type { +func ifExpressionParser() Func { optionalWhitespace := DiscardAll( OneOf( SpacesAndTabs(), @@ -168,7 +168,7 @@ func ifExpressionParser() Type { } } -func bracketsExpressionParser() Type { +func bracketsExpressionParser() Func { whitespace := DiscardAll( OneOf( SpacesAndTabs(), diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_function_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_function_parser.go index 6849cbd..677272a 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_function_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_function_parser.go @@ -8,7 +8,7 @@ import ( //------------------------------------------------------------------------------ -func functionArgsParser(allowFunctions bool) Type { +func functionArgsParser(allowFunctions bool) Func { open, comma, close := Char('('), Char(','), Char(')') whitespace := DiscardAll( OneOf( @@ -17,7 +17,7 @@ func functionArgsParser(allowFunctions bool) Type { ), ) - paramTypes := []Type{ + paramTypes := []Func{ parseLiteralWithTails(Boolean()), parseLiteralWithTails(Number()), parseLiteralWithTails(TripleQuoteString()), @@ -27,7 +27,7 @@ func functionArgsParser(allowFunctions bool) Type { return func(input []rune) Result { tmpParamTypes := paramTypes if allowFunctions { - tmpParamTypes = append([]Type{}, paramTypes...) + tmpParamTypes = append([]Func{}, paramTypes...) tmpParamTypes = append(tmpParamTypes, ParseQuery) } return DelimitedPattern( @@ -38,25 +38,31 @@ func functionArgsParser(allowFunctions bool) Type { ), "function arguments", ), - Expect( - MustBe(OneOf(tmpParamTypes...)), + MustBe(Expect( + OneOf(tmpParamTypes...), "function argument", - ), - Sequence( - Discard(SpacesAndTabs()), - comma, - whitespace, - ), - Sequence( - whitespace, - close, - ), + )), + MustBe(Expect( + Sequence( + Discard(SpacesAndTabs()), + comma, + whitespace, + ), + "comma", + )), + MustBe(Expect( + Sequence( + whitespace, + close, + ), + "closing bracket", + )), false, false, )(input) } } -func parseFunctionTail(fn query.Function) Type { +func parseFunctionTail(fn query.Function) Func { openBracket := Char('(') closeBracket := Char(')') @@ -92,7 +98,7 @@ func parseFunctionTail(fn query.Function) Type { } } -func parseLiteralWithTails(litParser Type) Type { +func parseLiteralWithTails(litParser Func) Func { delim := Sequence( Char('.'), Discard( @@ -117,26 +123,20 @@ func parseLiteralWithTails(litParser Type) Type { if fn != nil { payload = fn } - return Result{ - Payload: payload, - Remaining: res.Remaining, - } + return Success(payload, res.Remaining) } if fn == nil { fn = query.NewLiteralFunction(lit) } if res = MustBe(parseFunctionTail(fn))(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } fn = res.Payload.(query.Function) } } } -func parseWithTails(fnParser Type) Type { +func parseWithTails(fnParser Func) Func { delim := Sequence( Char('.'), Discard( @@ -169,23 +169,17 @@ func parseWithTails(fnParser Type) Type { if isNot { fn = query.Not(fn) } - return Result{ - Payload: fn, - Remaining: res.Remaining, - } + return Success(fn, res.Remaining) } if res = MustBe(parseFunctionTail(fn))(res.Remaining); res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } fn = res.Payload.(query.Function) } } } -func quotedPathSegmentParser() Type { +func quotedPathSegmentParser() Func { pattern := QuotedString() return func(input []rune) Result { @@ -200,14 +194,11 @@ func quotedPathSegmentParser() Type { rawSegment = strings.Replace(rawSegment, "~", "~0", -1) rawSegment = strings.Replace(rawSegment, ".", "~1", -1) - return Result{ - Payload: rawSegment, - Remaining: res.Remaining, - } + return Success(rawSegment, res.Remaining) } } -func fieldLiteralMapParser(ctxFn query.Function) Type { +func fieldLiteralMapParser(ctxFn query.Function) Func { fieldPathParser := Expect( OneOf( JoinStringPayloads( @@ -236,20 +227,14 @@ func fieldLiteralMapParser(ctxFn query.Function) Type { fn, err := query.NewGetMethod(ctxFn, res.Payload.(string)) if err != nil { - return Result{ - Remaining: input, - Err: NewFatalError(input, err), - } + return Fail(NewFatalError(input, err), input) } - return Result{ - Remaining: res.Remaining, - Payload: fn, - } + return Success(fn, res.Remaining) } } -func variableLiteralParser() Type { +func variableLiteralParser() Func { varPathParser := Expect( Sequence( Char('$'), @@ -277,14 +262,11 @@ func variableLiteralParser() Type { path := res.Payload.([]interface{})[1].(string) fn := query.NewVarFunction(path) - return Result{ - Remaining: res.Remaining, - Payload: fn, - } + return Success(fn, res.Remaining) } } -func fieldLiteralRootParser() Type { +func fieldLiteralRootParser() Func { fieldPathParser := Expect( JoinStringPayloads( UntilFail( @@ -317,20 +299,14 @@ func fieldLiteralRootParser() Type { fn = query.NewFieldFunction(path) } if err != nil { - return Result{ - Remaining: input, - Err: NewFatalError(input, err), - } + return Fail(NewFatalError(input, err), input) } - return Result{ - Remaining: res.Remaining, - Payload: fn, - } + return Success(fn, res.Remaining) } } -func methodParser(fn query.Function) Type { +func methodParser(fn query.Function) Func { p := Sequence( Expect( SnakeCase(), @@ -352,19 +328,13 @@ func methodParser(fn query.Function) Type { method, err := query.InitMethod(targetMethod, fn, args...) if err != nil { - return Result{ - Err: NewFatalError(input, err), - Remaining: input, - } - } - return Result{ - Payload: method, - Remaining: res.Remaining, + return Fail(NewFatalError(input, err), input) } + return Success(method, res.Remaining) } } -func functionParser() Type { +func functionParser() Func { p := Sequence( Expect( SnakeCase(), @@ -386,15 +356,9 @@ func functionParser() Type { fn, err := query.InitFunction(targetFunc, args...) if err != nil { - return Result{ - Err: NewFatalError(input, err), - Remaining: input, - } - } - return Result{ - Payload: fn, - Remaining: res.Remaining, + return Fail(NewFatalError(input, err), input) } + return Success(fn, res.Remaining) } } @@ -416,16 +380,9 @@ func parseDeprecatedFunction(input []rune) Result { fn, exists := query.DeprecatedFunction(targetFunc, arg) if !exists { - return Result{ - Err: NewError(input), - Remaining: input, - } - } - return Result{ - Payload: fn, - Err: nil, - Remaining: nil, + return Fail(NewError(input), input) } + return Success(fn, nil) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_literal_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_literal_parser.go index c4ce0b5..6048db0 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_literal_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_literal_parser.go @@ -4,7 +4,7 @@ import ( "github.com/Jeffail/benthos/v3/internal/bloblang/query" ) -func dynamicArrayParser() Type { +func dynamicArrayParser() Func { open, comma, close := Char('['), Char(','), Char(']') whitespace := DiscardAll( OneOf( @@ -42,7 +42,7 @@ func dynamicArrayParser() Type { } } -func dynamicObjectParser() Type { +func dynamicObjectParser() Func { open, comma, close := Char('{'), Char(','), Char('}') whitespace := DiscardAll( OneOf( @@ -103,7 +103,7 @@ func dynamicObjectParser() Type { } } -func dynamicLiteralValueParser() Type { +func dynamicLiteralValueParser() Func { return OneOf( Boolean(), Number(), @@ -115,7 +115,7 @@ func dynamicLiteralValueParser() Type { ) } -func literalValueParser() Type { +func literalValueParser() Func { p := dynamicLiteralValueParser() return func(input []rune) Result { diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_parser.go b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_parser.go index 4d07002..ff8f07e 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_parser.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/bloblang/parser/query_parser.go @@ -57,18 +57,12 @@ func ParseDeprecatedQuery(input []rune) Result { res = arithmeticParser(rootParser)(res.Remaining) if res.Err != nil { - return Result{ - Err: res.Err, - Remaining: input, - } + return Fail(res.Err, input) } result := res.Payload res = SpacesAndTabs()(res.Remaining) - return Result{ - Payload: result, - Remaining: res.Remaining, - } + return Success(result, res.Remaining) } func tryParseQuery(expr string, deprecated bool) (query.Function, *Error) { diff --git a/vendor/github.com/Jeffail/benthos/v3/internal/docs/component.go b/vendor/github.com/Jeffail/benthos/v3/internal/docs/component.go index bbf8602..7726e07 100644 --- a/vendor/github.com/Jeffail/benthos/v3/internal/docs/component.go +++ b/vendor/github.com/Jeffail/benthos/v3/internal/docs/component.go @@ -157,8 +157,9 @@ BETA: This component is experimental and therefore subject to change outside of major version releases. {{end -}} {{if .Deprecated -}} -DEPRECATED: This component is deprecated and will be removed in the next major -version release. Please consider moving onto [alternative components](#alternatives). +:::warning DEPRECATED +This component is deprecated and will be removed in the next major version release. Please consider moving onto [alternative components](#alternatives). +::: {{end -}} {{if gt (len .Summary) 0 -}} diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/bloblang/package.go b/vendor/github.com/Jeffail/benthos/v3/lib/bloblang/package.go new file mode 100644 index 0000000..75be4ac --- /dev/null +++ b/vendor/github.com/Jeffail/benthos/v3/lib/bloblang/package.go @@ -0,0 +1,104 @@ +package bloblang + +import ( + "github.com/Jeffail/benthos/v3/internal/bloblang/field" + "github.com/Jeffail/benthos/v3/internal/bloblang/mapping" + "github.com/Jeffail/benthos/v3/internal/bloblang/parser" + "github.com/Jeffail/benthos/v3/lib/types" +) + +// Message is an interface type to be given to a function interpolator, it +// allows the function to resolve fields and metadata from a message. +type Message interface { + Get(p int) types.Part + Len() int +} + +// Field represents a Benthos dynamic field expression, used to configure string +// fields where the contents should change based on the contents of messages and +// other factors. +// +// Each function here resolves the expression for a particular message of a +// batch, this is why an index is expected. +type Field interface { + // Bytes returns a byte slice representing the expression resolved for a + // message of a batch. + Bytes(index int, msg Message) []byte + + // String returns a string representing the expression resolved for a + // message of a batch. + String(index int, msg Message) string +} + +type fieldWrap struct { + f field.Expression +} + +func (w *fieldWrap) Bytes(index int, msg Message) []byte { + return w.f.Bytes(index, field.Message(msg)) +} + +func (w *fieldWrap) String(index int, msg Message) string { + return w.f.String(index, field.Message(msg)) +} + +// NewField attempts to parse and create a dynamic field expression from a +// string. If the expression is invalid an error is returned. +// +// When a parsing error occurs the returned error will be a *parser.Error type, +// which allows you to gain positional and structured error messages. +func NewField(expr string) (Field, error) { + e, err := parser.ParseField(expr) + if err != nil { + return nil, err + } + return &fieldWrap{e}, nil +} + +//------------------------------------------------------------------------------ + +// Mapping is a parsed Bloblang mapping. +type Mapping interface { + // QueryPart executes a Bloblang mapping and expects a boolean result, which + // is returned. If the execution fails or the result is not boolean an error + // is returned. + // + // Bloblang is able to query other messages of a batch, and therefore this + // function takes a message batch and index rather than a single message + // part argument. + QueryPart(index int, msg Message) (bool, error) + + // MapPart executes a Bloblang mapping on a message part and returns a new + // resulting part, or an error if the execution fails. + // + // Bloblang is able to query other messages of a batch, and therefore this + // function takes a message batch and index rather than a single message + // part argument. + MapPart(index int, msg Message) (types.Part, error) +} + +type mappingWrap struct { + e *mapping.Executor +} + +func (w *mappingWrap) QueryPart(index int, msg Message) (bool, error) { + return w.e.QueryPart(index, field.Message(msg)) +} + +func (w *mappingWrap) MapPart(index int, msg Message) (types.Part, error) { + return w.e.MapPart(index, field.Message(msg)) +} + +// NewMapping attempts to parse and create a Bloblang mapping from a string. If +// the mapping was read from a file the path should be provided in order to +// resolve relative imports, otherwise the path can be left empty. +// +// When a parsing error occurs the returned error may be a *parser.Error type, +// which allows you to gain positional and structured error messages. +func NewMapping(expr string) (Mapping, error) { + e, err := parser.ParseMapping("", expr) + if err != nil { + return nil, err + } + return &mappingWrap{e}, nil +} diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/broker.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/broker.go index 9fc0c9c..7c0846f 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/broker.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/broker.go @@ -123,12 +123,10 @@ type BrokerConfig struct { // NewBrokerConfig creates a new BrokerConfig with default values. func NewBrokerConfig() BrokerConfig { - batching := batch.NewPolicyConfig() - batching.Count = 1 return BrokerConfig{ Copies: 1, Inputs: brokerInputList{}, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/amqp_0_9.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/amqp_0_9.go index 632db0b..edeac1c 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/amqp_0_9.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/amqp_0_9.go @@ -50,8 +50,6 @@ type AMQP09Config struct { // NewAMQP09Config creates a new AMQP09Config with default values. func NewAMQP09Config() AMQP09Config { - batching := batch.NewPolicyConfig() - batching.Count = 1 return AMQP09Config{ URL: "amqp://guest:guest@localhost:5672/", Queue: "benthos-queue", @@ -63,7 +61,7 @@ func NewAMQP09Config() AMQP09Config { PrefetchCount: 10, PrefetchSize: 0, TLS: btls.NewConfig(), - Batching: batching, + Batching: batch.NewPolicyConfig(), BindingsDeclare: []AMQP09BindingConfig{}, } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/gcp_pubsub.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/gcp_pubsub.go index 28e42bb..ad7df8e 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/gcp_pubsub.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/gcp_pubsub.go @@ -30,15 +30,13 @@ type GCPPubSubConfig struct { // NewGCPPubSubConfig creates a new Config with default values. func NewGCPPubSubConfig() GCPPubSubConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return GCPPubSubConfig{ ProjectID: "", SubscriptionID: "", MaxOutstandingMessages: pubsub.DefaultReceiveSettings.MaxOutstandingMessages, MaxOutstandingBytes: pubsub.DefaultReceiveSettings.MaxOutstandingBytes, MaxBatchCount: 1, - Batching: batchConf, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka.go index f00dc92..520d063 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka.go @@ -42,8 +42,6 @@ type KafkaConfig struct { // NewKafkaConfig creates a new KafkaConfig with default values. func NewKafkaConfig() KafkaConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return KafkaConfig{ Addresses: []string{"localhost:9092"}, ClientID: "benthos_kafka_input", @@ -58,7 +56,7 @@ func NewKafkaConfig() KafkaConfig { MaxBatchCount: 1, TLS: btls.NewConfig(), SASL: sasl.NewConfig(), - Batching: batchConf, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_balanced.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_balanced.go index 0817bae..f3f0aff 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_balanced.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_balanced.go @@ -62,8 +62,6 @@ type KafkaBalancedConfig struct { // NewKafkaBalancedConfig creates a new KafkaBalancedConfig with default values. // TODO: V4 Remove this unused implementation. func NewKafkaBalancedConfig() KafkaBalancedConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return KafkaBalancedConfig{ Addresses: []string{"localhost:9092"}, ClientID: "benthos_kafka_input", @@ -75,7 +73,7 @@ func NewKafkaBalancedConfig() KafkaBalancedConfig { Topics: []string{"benthos_stream"}, StartFromOldest: true, TargetVersion: sarama.V1_0_0_0.String(), - Batching: batchConf, + Batching: batch.NewPolicyConfig(), MaxBatchCount: 1, TLS: btls.NewConfig(), SASL: sasl.NewConfig(), @@ -122,6 +120,9 @@ type KafkaBalanced struct { func NewKafkaBalanced( conf KafkaBalancedConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*KafkaBalanced, error) { + if conf.Batching.IsNoop() { + conf.Batching.Count = 1 + } k := KafkaBalanced{ conf: conf, stats: stats, diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_cg.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_cg.go index 208ded2..de2b466 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_cg.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kafka_cg.go @@ -59,6 +59,9 @@ type KafkaCG struct { func NewKafkaCG( conf KafkaBalancedConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*KafkaCG, error) { + if conf.Batching.IsNoop() { + conf.Batching.Count = 1 + } k := KafkaCG{ conf: conf, stats: stats, @@ -159,7 +162,16 @@ func (k *KafkaCG) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co latestOffset := claim.InitialOffset() batchPolicy, err := batch.NewPolicy(k.conf.Batching, k.mgr, k.log, k.stats) if err != nil { - return fmt.Errorf("failed to initialise batch policy: %v", err) + k.log.Errorf("Failed to initialise batch policy: %v, falling back to single messages.\n", err) + fallBackConf := batch.NewPolicyConfig() + fallBackConf.Count = 1 + if batchPolicy, err = batch.NewPolicy(fallBackConf, k.mgr, k.log, k.stats); err != nil { + k.log.Errorf("Failed to initialise fallback batch policy: %v.\n", err) + // The consume claim gets reopened immediately so let's try and + // avoid a busy loop (this should never happen anyway). + <-time.After(time.Second) + return err + } } defer batchPolicy.CloseAsync() diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis.go index 09ada4c..716cd2c 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis.go @@ -38,8 +38,6 @@ type KinesisConfig struct { // NewKinesisConfig creates a new Config with default values. func NewKinesisConfig() KinesisConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return KinesisConfig{ Config: sess.NewConfig(), Limit: 100, @@ -50,7 +48,7 @@ func NewKinesisConfig() KinesisConfig { CommitPeriod: "1s", StartFromOldest: true, Timeout: "5s", - Batching: batchConf, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis_balanced.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis_balanced.go index eb14a25..c7b23eb 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis_balanced.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/kinesis_balanced.go @@ -34,8 +34,6 @@ type KinesisBalancedConfig struct { // NewKinesisBalancedConfig creates a new Config with default values. func NewKinesisBalancedConfig() KinesisBalancedConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 s := sess.NewConfig() return KinesisBalancedConfig{ Config: s, @@ -45,7 +43,7 @@ func NewKinesisBalancedConfig() KinesisBalancedConfig { DynamoDBReadCapacity: 0, DynamoDBWriteCapacity: 0, MaxBatchCount: 1, - Batching: batchConf, + Batching: batch.NewPolicyConfig(), StartFromOldest: true, } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nats_stream.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nats_stream.go index 46b9f91..b2bc838 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nats_stream.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nats_stream.go @@ -38,8 +38,6 @@ type NATSStreamConfig struct { // NewNATSStreamConfig creates a new NATSStreamConfig with default values. func NewNATSStreamConfig() NATSStreamConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return NATSStreamConfig{ URLs: []string{stan.DefaultNatsURL}, ClusterID: "test-cluster", @@ -51,7 +49,7 @@ func NewNATSStreamConfig() NATSStreamConfig { Subject: "benthos_messages", MaxInflight: 1024, AckWait: "30s", - Batching: batchConf, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nsq.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nsq.go index 38f9c52..2447315 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nsq.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/nsq.go @@ -34,8 +34,6 @@ type NSQConfig struct { // NewNSQConfig creates a new NSQConfig with default values. func NewNSQConfig() NSQConfig { - batching := batch.NewPolicyConfig() - batching.Count = 1 return NSQConfig{ Addresses: []string{"localhost:4150"}, LookupAddresses: []string{"localhost:4161"}, @@ -44,7 +42,7 @@ func NewNSQConfig() NSQConfig { UserAgent: "benthos_consumer", TLS: btls.NewConfig(), MaxInFlight: 100, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/redis_streams.go b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/redis_streams.go index 67915f0..3daf81b 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/redis_streams.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/input/reader/redis_streams.go @@ -38,8 +38,6 @@ type RedisStreamsConfig struct { // NewRedisStreamsConfig creates a new RedisStreamsConfig with default values. func NewRedisStreamsConfig() RedisStreamsConfig { - batchConf := batch.NewPolicyConfig() - batchConf.Count = 1 return RedisStreamsConfig{ URL: "tcp://localhost:6379", BodyKey: "body", @@ -47,7 +45,7 @@ func NewRedisStreamsConfig() RedisStreamsConfig { ConsumerGroup: "benthos_group", ClientID: "benthos_consumer", Limit: 10, - Batching: batchConf, + Batching: batch.NewPolicyConfig(), StartFromOldest: true, CommitPeriod: "1s", Timeout: "1s", diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/message/batch/policy.go b/vendor/github.com/Jeffail/benthos/v3/lib/message/batch/policy.go index 263e9c6..c8dd510 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/message/batch/policy.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/message/batch/policy.go @@ -1,6 +1,7 @@ package batch import ( + "errors" "fmt" "time" @@ -95,6 +96,38 @@ func (p PolicyConfig) IsNoop() bool { return true } +func (p PolicyConfig) isLimited() bool { + if p.ByteSize > 0 { + return true + } + if p.Count > 0 { + return true + } + if len(p.Period) > 0 { + return true + } + if !isNoopCondition(p.Condition) { + return true + } + if len(p.Check) > 0 { + return true + } + return false +} + +func (p PolicyConfig) isHardLimited() bool { + if p.ByteSize > 0 { + return true + } + if p.Count > 0 { + return true + } + if len(p.Period) > 0 { + return true + } + return false +} + //------------------------------------------------------------------------------ // Policy implements a batching policy by buffering messages until, based on a @@ -128,6 +161,12 @@ func NewPolicy( log log.Modular, stats metrics.Type, ) (*Policy, error) { + if !conf.isLimited() { + return nil, errors.New("batch policy must have at least one active trigger") + } + if !conf.isHardLimited() { + log.Warnln("Batch policy should have at least one of count, period or byte_size set in order to provide a hard batch ceiling.") + } var cond types.Condition var err error if !isNoopCondition(conf.Condition) { diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/batcher.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/batcher.go index 1ca35a7..05870a9 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/batcher.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/batcher.go @@ -2,6 +2,7 @@ package output import ( "context" + "fmt" "sync/atomic" "time" @@ -36,6 +37,23 @@ type Batcher struct { closedChan chan struct{} } +func newBatcherFromConf( + conf batch.PolicyConfig, + child Type, + mgr types.Manager, + log log.Modular, + stats metrics.Type, +) (Type, error) { + if !conf.IsNoop() { + policy, err := batch.NewPolicy(conf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) + if err != nil { + return nil, fmt.Errorf("failed to construct batch policy: %v", err) + } + child = NewBatcher(policy, child, log, stats) + } + return child, nil +} + // NewBatcher creates a new Producer/Consumer around a buffer. func NewBatcher( batcher *batch.Policy, diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/broker.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/broker.go index d1e9175..b6cfe31 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/broker.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/broker.go @@ -136,14 +136,12 @@ type BrokerConfig struct { // NewBrokerConfig creates a new BrokerConfig with default values. func NewBrokerConfig() BrokerConfig { - batching := batch.NewPolicyConfig() - batching.Count = 1 return BrokerConfig{ Copies: 1, Pattern: "fan_out", MaxInFlight: 1, Outputs: brokerOutputList{}, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } @@ -170,12 +168,8 @@ func NewBroker( if err != nil { return nil, err } - if !conf.Broker.Batching.IsNoop() { - policy, err := batch.NewPolicy(conf.Broker.Batching, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - b = NewBatcher(policy, b, log, stats) + if b, err = newBatcherFromConf(conf.Broker.Batching, b, mgr, log, stats); err != nil { + return nil, err } return b, nil } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/dynamodb.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/dynamodb.go index 4f88a76..0155cca 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/dynamodb.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/dynamodb.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -118,14 +116,10 @@ func NewDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics. TypeDynamoDB, conf.DynamoDB.MaxInFlight, dyn, log, stats, ) } - if bconf := conf.DynamoDB.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.DynamoDB.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/elasticsearch.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/elasticsearch.go index f3043ab..cee9e30 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/elasticsearch.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/elasticsearch.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -82,14 +80,10 @@ func NewElasticsearch(conf Config, mgr types.Manager, log log.Modular, stats met TypeElasticsearch, conf.Elasticsearch.MaxInFlight, elasticWriter, log, stats, ) } - if bconf := conf.Elasticsearch.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.Elasticsearch.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/http_client.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/http_client.go index d743d6e..04582be 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/http_client.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/http_client.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -65,14 +63,10 @@ func NewHTTPClient(conf Config, mgr types.Manager, log log.Modular, stats metric } else { w, err = NewAsyncWriter(TypeHTTPClient, conf.HTTPClient.MaxInFlight, h, log, stats) } - if bconf := conf.HTTPClient.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.HTTPClient.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/kafka.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/kafka.go index de69df7..f072f55 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/kafka.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/kafka.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -77,14 +75,10 @@ func NewKafka(conf Config, mgr types.Manager, log log.Modular, stats metrics.Typ TypeKafka, conf.Kafka.MaxInFlight, k, log, stats, ) } - if bconf := conf.Kafka.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return nil, err } - return w, err + return newBatcherFromConf(conf.Kafka.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis.go index adc0e50..b5dfcd6 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -69,14 +67,10 @@ func NewKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.T TypeKinesis, conf.Kinesis.MaxInFlight, kin, log, stats, ) } - if bconf := conf.Kinesis.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.Kinesis.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis_firehose.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis_firehose.go index 6ea319e..dbc4d66 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis_firehose.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/kinesis_firehose.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -62,14 +60,10 @@ func NewKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats m TypeKinesisFirehose, conf.KinesisFirehose.MaxInFlight, kin, log, stats, ) } - if bconf := conf.KinesisFirehose.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.KinesisFirehose.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/s3.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/s3.go index 3aa87dd..94c91ea 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/s3.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/s3.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -117,15 +115,7 @@ func NewAmazonS3(conf Config, mgr types.Manager, log log.Modular, stats metrics. if err != nil { return nil, err } - - if bconf := conf.S3.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) - } - return w, err + return newBatcherFromConf(conf.S3.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/sqs.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/sqs.go index d75bca9..ef64fba 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/sqs.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/sqs.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -74,14 +72,10 @@ func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics TypeSQS, conf.SQS.MaxInFlight, s, log, stats, ) } - if bconf := conf.SQS.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return w, err } - return w, err + return newBatcherFromConf(conf.SQS.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/table_storage.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/table_storage.go index 7ff0d1e..48315ce 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/table_storage.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/table_storage.go @@ -1,8 +1,6 @@ package output import ( - "fmt" - "github.com/Jeffail/benthos/v3/internal/docs" "github.com/Jeffail/benthos/v3/lib/log" "github.com/Jeffail/benthos/v3/lib/message/batch" @@ -105,15 +103,10 @@ func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats TypeTableStorage, conf.TableStorage.MaxInFlight, sthree, log, stats, ) } - - if bconf := conf.TableStorage.Batching; err == nil && !bconf.IsNoop() { - policy, err := batch.NewPolicy(bconf, mgr, log.NewModule(".batching"), metrics.Namespaced(stats, "batching")) - if err != nil { - return nil, fmt.Errorf("failed to construct batch policy: %v", err) - } - w = NewBatcher(policy, w, log, stats) + if err != nil { + return nil, err } - return w, err + return newBatcherFromConf(conf.TableStorage.Batching, w, mgr, log, stats) } //------------------------------------------------------------------------------ diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/dynamodb.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/dynamodb.go index 5ef5b03..571166a 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/dynamodb.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/dynamodb.go @@ -48,8 +48,6 @@ func NewDynamoDBConfig() DynamoDBConfig { rConf.Backoff.InitialInterval = "1s" rConf.Backoff.MaxInterval = "5s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 return DynamoDBConfig{ sessionConfig: sessionConfig{ Config: session.NewConfig(), @@ -61,7 +59,7 @@ func NewDynamoDBConfig() DynamoDBConfig { TTLKey: "", MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/elasticsearch.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/elasticsearch.go index 52788ba..518d590 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/elasticsearch.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/elasticsearch.go @@ -60,9 +60,6 @@ func NewElasticsearchConfig() ElasticsearchConfig { rConf.Backoff.MaxInterval = "5s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 - return ElasticsearchConfig{ URLs: []string{"http://localhost:9200"}, Sniff: true, @@ -80,7 +77,7 @@ func NewElasticsearchConfig() ElasticsearchConfig { }, MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/http_client.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/http_client.go index a5877f8..beb40a2 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/http_client.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/http_client.go @@ -25,13 +25,11 @@ type HTTPClientConfig struct { // NewHTTPClientConfig creates a new HTTPClientConfig with default values. func NewHTTPClientConfig() HTTPClientConfig { - batching := batch.NewPolicyConfig() - batching.Count = 1 return HTTPClientConfig{ Config: client.NewConfig(), MaxInFlight: 1, // TODO: Increase this default? PropagateResponse: false, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kafka.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kafka.go index 01e4b41..1549c1d 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kafka.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kafka.go @@ -54,8 +54,7 @@ func NewKafkaConfig() KafkaConfig { rConf.Backoff.InitialInterval = "3s" rConf.Backoff.MaxInterval = "10s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 + return KafkaConfig{ Addresses: []string{"localhost:9092"}, ClientID: "benthos_kafka_output", @@ -73,7 +72,7 @@ func NewKafkaConfig() KafkaConfig { SASL: sasl.NewConfig(), MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis.go index 7c21cf6..a513eea 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis.go @@ -54,8 +54,6 @@ func NewKinesisConfig() KinesisConfig { rConf.Backoff.InitialInterval = "1s" rConf.Backoff.MaxInterval = "5s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 return KinesisConfig{ sessionConfig: sessionConfig{ Config: sess.NewConfig(), @@ -65,7 +63,7 @@ func NewKinesisConfig() KinesisConfig { PartitionKey: "", MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis_firehose.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis_firehose.go index bd321f4..62c88ad 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis_firehose.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/kinesis_firehose.go @@ -39,8 +39,7 @@ func NewKinesisFirehoseConfig() KinesisFirehoseConfig { rConf.Backoff.InitialInterval = "1s" rConf.Backoff.MaxInterval = "5s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 + return KinesisFirehoseConfig{ sessionConfig: sessionConfig{ Config: sess.NewConfig(), @@ -48,7 +47,7 @@ func NewKinesisFirehoseConfig() KinesisFirehoseConfig { Stream: "", MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/s3.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/s3.go index 8a29601..8c76298 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/s3.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/s3.go @@ -37,8 +37,6 @@ type AmazonS3Config struct { // NewAmazonS3Config creates a new Config with default values. func NewAmazonS3Config() AmazonS3Config { - batching := batch.NewPolicyConfig() - batching.Count = 1 return AmazonS3Config{ Config: sess.NewConfig(), Bucket: "", @@ -50,7 +48,7 @@ func NewAmazonS3Config() AmazonS3Config { Timeout: "5s", KMSKeyID: "", MaxInFlight: 1, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/sqs.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/sqs.go index 4000713..6a8361a 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/sqs.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/sqs.go @@ -50,8 +50,6 @@ func NewAmazonSQSConfig() AmazonSQSConfig { rConf.Backoff.MaxInterval = "5s" rConf.Backoff.MaxElapsedTime = "30s" - batching := batch.NewPolicyConfig() - batching.Count = 1 return AmazonSQSConfig{ sessionConfig: sessionConfig{ Config: sess.NewConfig(), @@ -61,7 +59,7 @@ func NewAmazonSQSConfig() AmazonSQSConfig { MessageDeduplicationID: "", MaxInFlight: 1, Config: rConf, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/table_storage_config.go b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/table_storage_config.go index 0ae9728..8bbf141 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/table_storage_config.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/output/writer/table_storage_config.go @@ -22,8 +22,6 @@ type AzureTableStorageConfig struct { // NewAzureTableStorageConfig creates a new Config with default values. func NewAzureTableStorageConfig() AzureTableStorageConfig { - batching := batch.NewPolicyConfig() - batching.Count = 1 return AzureTableStorageConfig{ StorageAccount: "", StorageAccessKey: "", @@ -34,7 +32,7 @@ func NewAzureTableStorageConfig() AzureTableStorageConfig { InsertType: "INSERT", Timeout: "5s", MaxInFlight: 1, - Batching: batching, + Batching: batch.NewPolicyConfig(), } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/processor/branch.go b/vendor/github.com/Jeffail/benthos/v3/lib/processor/branch.go index a5fe15c..98028c6 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/processor/branch.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/processor/branch.go @@ -35,6 +35,14 @@ message using another mapping.`, This is useful for preserving the original message contents when using processors that would otherwise replace the entire contents. +### Metadata + +Metadata fields that are added to messages during branch processing will not be +automatically copied into the resulting message. In order to do this you should +explicitly declare in your ` + "`result_map`" + ` either a wholesale copy with +` + "`meta = meta()`" + `, or selective copies with +` + "`meta foo = meta(\"bar\")`" + ` and so on. + ### Error Handling If the ` + "`request_map`" + ` fails the child processors will not be executed. @@ -65,7 +73,7 @@ pipeline: - http: url: https://hub.docker.com/v2/repositories/jeffail/benthos verb: GET - result_map: 'root.repo.status = this' + result_map: root.repo.status = this `, }, { @@ -112,7 +120,7 @@ pipeline: FieldSpecs: docs.FieldSpecs{ docs.FieldCommon( "request_map", - "A [Bloblang mapping](/docs/guides/bloblang/about) that describes how to create a request payload suitable for the child processors of this branch.", + "A [Bloblang mapping](/docs/guides/bloblang/about) that describes how to create a request payload suitable for the child processors of this branch. If left empty then the branch will begin with an exact copy of the origin message (including metadata).", `root = { "id": this.doc.id, "content": this.doc.body.text @@ -129,9 +137,11 @@ pipeline: ), docs.FieldCommon( "result_map", - "A [Bloblang mapping](/docs/guides/bloblang/about) that describes how the resulting messages from branched processing should be mapped back into the original payload.", - `root.foo_result = this`, - `root.bar.body = this.body + "A [Bloblang mapping](/docs/guides/bloblang/about) that describes how the resulting messages from branched processing should be mapped back into the original payload. If left empty the origin message will remain unchanged (including metadata).", + `meta foo_code = meta("code") +root.foo_result = this`, + `meta = meta() +root.bar.body = this.body root.bar.id = this.user.id`, `root.enrichments.foo = if errored() { throw(error()) diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/service/deprecated.go b/vendor/github.com/Jeffail/benthos/v3/lib/service/deprecated.go index 80d1518..87d63f7 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/service/deprecated.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/service/deprecated.go @@ -405,6 +405,6 @@ func deprecatedExecute(configPath string, testSuffix string) { if len(depFlags.streamsDir) > 0 { dirs = append(dirs, depFlags.streamsDir) } - os.Exit(cmdService(configPath, nil, depFlags.strictConfig, depFlags.streamsMode, dirs)) + os.Exit(cmdService(configPath, nil, "", depFlags.strictConfig, depFlags.streamsMode, dirs)) } } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/service/run.go b/vendor/github.com/Jeffail/benthos/v3/lib/service/run.go index 5a7d5dc..2a6f06a 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/service/run.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/service/run.go @@ -159,6 +159,11 @@ func Run() { Value: false, Usage: "display version info, then exit", }, + &cli.StringFlag{ + Name: "log.level", + Value: "", + Usage: "override the configured log level, options are: off, error, warn, info, debug, trace.", + }, &cli.StringFlag{ Name: "config", Aliases: []string{"c"}, @@ -199,7 +204,14 @@ func Run() { cli.ShowAppHelp(c) os.Exit(1) } - os.Exit(cmdService(c.String("config"), c.StringSlice("resources"), !c.Bool("chilled"), false, nil)) + os.Exit(cmdService( + c.String("config"), + c.StringSlice("resources"), + c.String("log.level"), + !c.Bool("chilled"), + false, + nil, + )) return nil }, Commands: []*cli.Command{ @@ -248,7 +260,14 @@ func Run() { For more information check out the docs at: https://benthos.dev/docs/guides/streams_mode/about`[4:], Action: func(c *cli.Context) error { - os.Exit(cmdService(c.String("config"), c.StringSlice("resources"), !c.Bool("chilled"), true, c.Args().Slice())) + os.Exit(cmdService( + c.String("config"), + c.StringSlice("resources"), + c.String("log.level"), + !c.Bool("chilled"), + true, + c.Args().Slice(), + )) return nil }, }, @@ -339,7 +358,7 @@ func Run() { } deprecatedExecute(*configPath, testSuffix) - os.Exit(cmdService(*configPath, nil, false, false, nil)) + os.Exit(cmdService(*configPath, nil, "", false, false, nil)) return nil } diff --git a/vendor/github.com/Jeffail/benthos/v3/lib/service/service.go b/vendor/github.com/Jeffail/benthos/v3/lib/service/service.go index 70059f1..a330061 100644 --- a/vendor/github.com/Jeffail/benthos/v3/lib/service/service.go +++ b/vendor/github.com/Jeffail/benthos/v3/lib/service/service.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "runtime/pprof" + "strings" "syscall" "time" @@ -131,6 +132,7 @@ func readConfig(path string, resourcesPaths []string) (lints []string) { func cmdService( confPath string, resourcesPaths []string, + overrideLogLevel string, strict bool, streamsMode bool, streamsConfigs []string, @@ -144,6 +146,10 @@ func cmdService( return 1 } + if len(overrideLogLevel) > 0 { + conf.Logger.LogLevel = strings.ToUpper(overrideLogLevel) + } + // Logging and stats aggregation. var logger log.Modular diff --git a/vendor/modules.txt b/vendor/modules.txt index 8258cf4..661605d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -43,7 +43,7 @@ github.com/ClickHouse/clickhouse-go/lib/lz4 github.com/ClickHouse/clickhouse-go/lib/protocol github.com/ClickHouse/clickhouse-go/lib/types github.com/ClickHouse/clickhouse-go/lib/writebuffer -# github.com/Jeffail/benthos/v3 v3.28.0 +# github.com/Jeffail/benthos/v3 v3.29.0 ## explicit github.com/Jeffail/benthos/v3/internal/batch github.com/Jeffail/benthos/v3/internal/bloblang @@ -55,6 +55,7 @@ github.com/Jeffail/benthos/v3/internal/docs github.com/Jeffail/benthos/v3/internal/message github.com/Jeffail/benthos/v3/internal/transaction github.com/Jeffail/benthos/v3/lib/api +github.com/Jeffail/benthos/v3/lib/bloblang github.com/Jeffail/benthos/v3/lib/broker github.com/Jeffail/benthos/v3/lib/buffer github.com/Jeffail/benthos/v3/lib/buffer/parallel