Skip to content

Commit

Permalink
feat: adds result config to kubernetes input, improves docs
Browse files Browse the repository at this point in the history
  • Loading branch information
cludden committed Oct 4, 2020
1 parent 316208a commit 27a30dd
Show file tree
Hide file tree
Showing 58 changed files with 911 additions and 773 deletions.
83 changes: 78 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# benthos-kubernetes

a collections of [benthos](https://github.com/Jeffail/benthos) plugins for integrating with Kubernetes.
a collection of [benthos](https://github.com/Jeffail/benthos) plugins for integrating with Kubernetes.

**Inputs:**
#### Inputs

- [kubernetes](./doc/kubernetes_input.md) streams kubernetes objects for one or more configured watches

**Outputs:**
#### Outputs

- [kubernetes](./doc/kubernetes_output.md) creates, updates, and deleted kubernetes objects
- [kubernetes_status](./doc/kubernetes_status_output.md) writes object status to kubernetes

**Processors:**
#### Processors

- [kubernetes](./doc/kubernetes_processor.md) performs operations against a kubernetes cluster

Expand All @@ -27,9 +27,82 @@ a collections of [benthos](https://github.com/Jeffail/benthos) plugins for integ
- download a [release](https://github.com/cludden/benthos-kubernetes/releases)
- as a benthos [plugin](./cmd/benthos/main.go)

```go
package main

import (
"github.com/Jeffail/benthos/v3/lib/service"
_ "github.com/cludden/benthos-kubernetes/input"
_ "github.com/cludden/benthos-kubernetes/output"
_ "github.com/cludden/benthos-kubernetes/processor"
)

func main() {
service.Run()
}
```

## Getting Started

See [examples](./example/status.yml)
```yaml
input:
type: kubernetes
plugin:
watches:
- group: example.com
version: v1
kind: Foo
owns:
- group: example.com
version: v1
kind: Bar
result:
requeue: meta().exists("requeue")
requeue_after: ${!meta("requeue_after").catch("")}

pipeline:
processors:
- switch:
# ignore deleted items
- check: meta().exists("deleted")
processors:
- bloblang: root = deleted()

# reconcile dependent resources
- processors:
- branch:
processors:
- bloblang: |
apiVersion = "example.com/v1"
kind = "Bar"
metadata.labels = metadata.labels
metadata.name = metadata.name
metadata.namespace = metadata.namespace
metadata.ownerReferences = [{
"apiVersion": apiVersion,
"kind": kind,
"controller": true,
"blockOwnerDeletion": true,
"name": metadata.name,
"uid": metadata.uid
}]
spec = spec
- type: kubernetes
plugin:
operator: get
- type: kubernetes
plugin:
operator: ${! if errored() { "create" } else { "update" } }
result_map: |
root.status.bar = metadata.uid
root.status.status = "Ready"
output:
type: kubernetes_status
plugin: {}
```
Additional examples provided [here](./example)
## License
Expand Down
128 changes: 104 additions & 24 deletions doc/kubernetes_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
streams kubernetes objects for one or more configured watches

**Examples**

```yaml
input:
type: kubernetes
Expand Down Expand Up @@ -34,61 +35,144 @@ input:
owns:
- version: v1
kind: Pod
result:
requeue: meta().exists("requeue")
requeue_after: ${!meta("requeue_after).or("")}
```
## Fields
`group`
### `result`

Customize the result of a reconciliation request via [synchronous responses](https://www.benthos.dev/docs/guides/sync_responses).

Type: `object`

resource group
### `result.requeue`

A [Bloblang query](https://www.benthos.dev/docs/guides/bloblang/about/) that should return a boolean value indicating whether the input resource should be requeued. An empty string disables this functionality.

Type: `string`
Default: `""`

`kind`
### `result.requeue_after`

resource kind
Specify a duration after which the input resource should be requeued. This is a string value, which allows you to customize it based on resulting payloads and their metadata using [interpolation functions](https://www.benthos.dev/docs/configuration/interpolation#bloblang-queries). An empty string disables this functionality.

Type: `string`
Default: `""`

`version`
### `watches[]`

A list of watch configurations that specify the set of kubernetes objects to target.

Type: `list(object)`
Default: `[]`
Required: `true`

resource version
### `watches[].group`

Resource group selector

Type: `string`
Default: `""`

`namespaces`
### `watches[].kind`

Resource kind selector

Type: `string`
Default: `""`
Required: `true`

optional namespace filter
### `watches[].namespaces`

Type: `[]string`
Resource namespace selector. An empty array here indicates cluster scope.

`owns`
Type: `list(string)`
Default: `[]`

optional list of dependencies to watch
### `watches[].owns[]`

Type: `[]object({group: string, version: string, kind: string })`
Specifies an optional list of dependencies to watch. This requires the correct owner references to be present on the dependent objects.

`selector`
Type: `list(object)`
Default: `[]`

optional label selector
### `watches[].owns[].group`

Type: `object`
Dependency group selector

`selector.matchLabels`
Type: `string`
Default: `""`

### `watches[].owns[].kind`

Dependency kind selector

Type: `string`
Default: `""`
Required: `true`

### `watches[].owns[].version`

Dependency version selector

Type: `string`
Default: `""`
Required: `true`

optional label selector match requirements
### `watches[].selector`

Optional label selector to apply as target filter.

Type: `object`
Default: `{}`

### `watches[].selector.matchExpressions[]`

List of label match expressions to apply as target filter.

Type: `list(object)`
Default: `{}`

### `watches[].selector.matchExpressions[].key`

Subject of the given expression.

Type: `string`
Default: `""`
Required: `true`

### `watches[].selector.matchExpressions[].operator`

Operator of the given expression (e.g. `Exists`, `In`, `NotIn`)

Type: `string`
Default: `""`
Required: `true`

### `watches[].selector.matchExpressions[].values[]`

List of values applied to operator in order to evaluate the expression.

Type: `string`
Default: `[]`

`selector.matchExpressions`
### `watches[].selector.matchLabels`

optional label selector match expressions
Map of key value label pairs to use as target filter.

Type: `object({key: string, operator: string, values: []string})`
Type: `map(string)`
Default: `{}`

### `watches[].version`

Resource version selector

Type: `string`
Default: `""`
Required: `true`

## Metadata

Expand All @@ -102,7 +186,3 @@ This input adds the following metadata fields to each message:
- namespace
- version
```

## Synchronous Responses

Additionally, this input will check for a `requeue_after` metadata entry on [synchronous response](https://www.benthos.dev/docs/guides/sync_responses) messages, and if found, will requeue the object for reconciliation.
37 changes: 26 additions & 11 deletions doc/kubernetes_output.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,54 @@
creates, updates, and deleted kubernetes objects based

This output will perform the following actions for all message parts:

- fail if the payload is not a valid kubernetes object
- delete the object if a `deleted` metadata key is present
- update the object if a `uid` is present
- create the object if no `uid` is present

**Examples**

```yaml
input:
type: kubernetes
plugin:
watches:
- group: example.com
version: v1alpha1
version: v1
kind: Foo

pipeline:
processors:
- bloblang: |
root = match {
meta().exists("deleted") => deleted()
}
output:
type: kubernetes
plugin: {}
processors:
- bloblang: |
map finalizer {
root = this
metadata.finalizers = metadata.finalizers.append("finalizer.foos.example.com")
}
root = match {
meta().exists("deleted") => deleted()
metadata.finalizers.or([]).contains("finalizer.foos.example.com") => deleted()
_ => this.apply("finalizer")
}
```
output:
type: kubernetes
plugin: {}
```
## Fields
### `deletion_propagation`

Specifies the [deletion propagation policy](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents) when performing `delete` operations.

Type: `string`
Default: `Background`
Options: `Background`, `Foreground`, `Orphan`

### `max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: `number`
Default: `1`
29 changes: 26 additions & 3 deletions doc/kubernetes_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,32 @@ performs operations against a kubernetes cluster

## Fields

`operator`
### `deletion_propagation`

specifies the kubernetes client operation to perform
Specifies the [deletion propagation policy](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents) used with the `delete` operator.

Type: `string`
Options: `create`, `delete`, `get`, `update`
Default: `Background`
Options: `Background`, `Foreground`, `Orphan`

### `operator`

Specifies the kubernetes client operation to perform.

Type: `string`
Options: `create`, `delete`, `get`, `status`, `update`

### `operator_mapping`

A [Bloblang mapping](https://www.benthos.dev/docs/guides/bloblang/about/) that resolves to valid operator.

Type: `string`

### `parts[]`

An optional array of message indexes of a batch that the processor should apply to. If left empty all messages are processed. This field is only applicable when batching messages at the input level.

Indexes can be negative, and if so the part will be selected from the end counting backwards starting from -1.

Type: `list(number)`
Default: `[]`
Loading

0 comments on commit 27a30dd

Please sign in to comment.