Skip to content

Commit

Permalink
feat: adds kubernetes processor
Browse files Browse the repository at this point in the history
  • Loading branch information
cludden committed Sep 2, 2020
1 parent 7eead3a commit 6b57f5b
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 102 deletions.
110 changes: 9 additions & 101 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# benthos-kubernetes

a kubernetes plugin [benthos](https://github.com/Jeffail/benthos) which includes the following components:
a collections of [benthos](https://github.com/Jeffail/benthos) plugins for integrating with Kubernetes.

**Inputs:**

- `kubernetes` streams kubernetes objects for one or more configured watches
- [kubernetes](./doc/kubernetes_input.md) streams kubernetes objects for one or more configured watches

**Outputs:**

- `kubernetes` creates, updates, and deleted kubernetes objects
- `kubernetes_status` writes object status to kubernetes
- [kubernetes](./doc/kubernetes_output.md) creates, updates, and deleted kubernetes objects
- [kubernetes_status](./doc/kubernetes_status_output.md) writes object status to kubernetes

**Processors:**

- [kubernetes](./doc/kubernetes_processor.md) performs operations against a kubernetes cluster

## Installing

Expand All @@ -22,103 +26,7 @@ a kubernetes plugin [benthos](https://github.com/Jeffail/benthos) which includes

## Getting Started

Sample benthos stream config:

```yaml
input:
type: kubernetes
plugin:
watches:
# watch pods in all namespaces
- group: ""
version: v1
kind: Pod
# watch custom resources in specified namespaces
- group: example.com
version: v1alpha1
kind: Foo
namespaces: [default, kube-system]
# watch custom resources that match a label selector
- group: example.com
version: v1apha1
kind: Bar
selector:
matchLabels:
color: blue
matchExpressions:
- key: color
operator: NotIn
values: [green, yellow]
# watch replica sets and reconcile when their pods are modified
- version: v1
kind: ReplicaSet
owns:
- version: v1
kind: Pod

pipeline:
processors:
- bloblang: |
root = match {
meta().exists("deleted") => deleted()
}
output:
switch:
outputs:
- condition:
bloblang: metadata.finalizers.or([]).contains("finalizer.foos.example.com") != true
output:
type: kubernetes
plugin: {}
processors:
- bloblang: |
meta requeue_after = "1ms"
map finalizer {
root = this
metadata.finalizers = metadata.finalizers.append("finalizer.foos.example.com")
}
root = match {
metadata.finalizers.or([]).contains("finalizer.foos.example.com") => deleted()
_ => this.apply("finalizer")
}
- sync_response: {}
- log:
message: adding finalizer...

- output:
type: kubernetes_status
plugin: {}
processors:
- bloblang: |
root = this
status.observedGeneration = metadata.generation
status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.000000000Z")
status.status = if metadata.exists("deletionTimestamp") {
"Destroying"
} else {
"Reconciling"
}
- log:
message: updating status...
```
Or see [examples](./example)
### Metadata
This input adds the following metadata fields to each message:
```
- deleted (present only if object has been deleted)
- group
- kind
- name
- namespace
- version
```
Additionally, this input will check for a `requeue_after` metadata entry on the [synchronous response](https://www.benthos.dev/docs/guides/sync_responses), and if found, will requeue the object for reconciliation.
See [examples](./example/status.yml)

## License

Expand Down
1 change: 1 addition & 0 deletions cmd/benthos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

_ "github.com/cludden/benthos-kubernetes/input"
_ "github.com/cludden/benthos-kubernetes/output"
_ "github.com/cludden/benthos-kubernetes/processor"
)

//------------------------------------------------------------------------------
Expand Down
108 changes: 108 additions & 0 deletions doc/kubernetes_input.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# kubernetes

streams kubernetes objects for one or more configured watches

**Examples**
```yaml
input:
type: kubernetes
plugin:
watches:
# watch pods in all namespaces
- group: ""
version: v1
kind: Pod
# watch custom resources in specified namespaces
- group: example.com
version: v1alpha1
kind: Foo
namespaces: [default, kube-system]
# watch custom resources that match a label selector
- group: example.com
version: v1apha1
kind: Bar
selector:
matchLabels:
color: blue
matchExpressions:
- key: color
operator: NotIn
values: [green, yellow]
# watch replica sets and reconcile when their pods are modified
- version: v1
kind: ReplicaSet
owns:
- version: v1
kind: Pod
```
## Fields
`group`

resource group

Type: `string`
Default: `""`

`kind`

resource kind

Type: `string`
Default: `""`

`version`

resource version

Type: `string`
Default: `""`

`namespaces`

optional namespace filter

Type: `[]string`

`owns`

optional list of dependencies to watch

Type: `[]object({group: string, version: string, kind: string })`

`selector`

optional label selector

Type: `object`

`selector.matchLabels`

optional label selector match requirements

Type: `object`

`selector.matchExpressions`

optional label selector match expressions

Type: `object({key: string, operator: string, values: []string})`


## Metadata

This input adds the following metadata fields to each message:

```
- deleted (present only if object has been deleted)
- group
- kind
- name
- namespace
- version
```

## Synchronous Responses

Additionally, this input will check for a `requeue_after` metadata entry on [synchronous response](https://www.benthos.dev/docs/guides/sync_responses) messages, and if found, will requeue the object for reconciliation.
41 changes: 41 additions & 0 deletions doc/kubernetes_output.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# kubernetes

creates, updates, and deleted kubernetes objects based

This output will perform the following actions for all message parts:
- fail if the payload is not a valid kubernetes object
- delete the object if a `deleted` metadata key is present
- update the object if a `uid` is present
- create the object if no `uid` is present

**Examples**
```yaml
input:
type: kubernetes
plugin:
watches:
- group: example.com
version: v1alpha1
kind: Foo

pipeline:
processors:
- bloblang: |
root = match {
meta().exists("deleted") => deleted()
}
output:
type: kubernetes
plugin: {}
processors:
- bloblang: |
map finalizer {
root = this
metadata.finalizers = metadata.finalizers.append("finalizer.foos.example.com")
}
root = match {
metadata.finalizers.or([]).contains("finalizer.foos.example.com") => deleted()
_ => this.apply("finalizer")
}
```
12 changes: 12 additions & 0 deletions doc/kubernetes_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# kubernetes

performs operations against a kubernetes cluster

## Fields

`operator`

specifies the kubernetes client operation to perform

Type: `string`
Options: `create`, `datele`, `get`, `update`
35 changes: 35 additions & 0 deletions doc/kubernetes_status_output.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# kubernetes_status

updates a kubernetes object's status subresource

**Examples**
```yaml
input:
type: kubernetes
plugin:
watches:
- group: example.com
version: v1alpha1
kind: Foo

pipeline:
processors:
- bloblang: |
root = match {
meta().exists("deleted") => deleted()
}
output:
type: kubernetes_status
plugin: {}
processors:
- bloblang: |
root = this
status.observedGeneration = metadata.generation
status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.000000000Z")
status.status = if metadata.exists("deletionTimestamp") {
"Destroying"
} else {
"Reconciling"
}
```
46 changes: 45 additions & 1 deletion example/status.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ output:
- log:
message: adding finalizer...

- output:
- fallthrough: true
output:
type: kubernetes_status
plugin: {}
processors:
Expand All @@ -56,5 +57,48 @@ output:
- log:
message: updating status...

- output:
type: kubernetes
plugin: {}
processors:
- branch:
request_map: |
root = {
"apiVersion": "example.com/v1",
"kind": "Bar",
"metadata": {
"name": metadata.name,
"namespace": metadata.namespace
}
}
processors:
- type: kubernetes
plugin:
operator: get
result_map: |
root.bar = if errored() {
throw(error())
} else {
this
}
- log:
message: reconciling bar

- bloblang: |
let ownerRef = {
"apiVersion": apiVersion,
"controller": true,
"blockOwnerDeletion": true,
"kind": kind,
"name": metadata.name,
"uid": metadata.uid
}
root = bar
root.metadata.labels = metadata.labels
root.metadata.ownerReferences = [$ownerRef]
root.spec.size = spec.size
logger:
level: info
Loading

0 comments on commit 6b57f5b

Please sign in to comment.