Skip to content

Commit 573ab46

Browse files
committed
feat: adds kubernetes and kubernetes_status outputs
1 parent 159a03d commit 573ab46

File tree

9 files changed

+497
-81
lines changed

9 files changed

+497
-81
lines changed

README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@ a kubernetes plugin [benthos](https://github.com/Jeffail/benthos) which includes
1212
- `kubernetes_status` writes object status to kubernetes
1313

1414
## Installing
15+
1516
- with Docker
1617
```shell
17-
$ docker run cludden/benthos-kubernetes-input -h
18+
$ docker run cludden/benthos-kubernetes -h
1819
```
19-
- download a [release](https://github.com/cludden/benthos-kubernetes-input/releases)
20-
- as benthos [plugin](./cmd/benthos/main.go)
20+
- download a [release](https://github.com/cludden/benthos-kubernetes/releases)
21+
- as a benthos [plugin](./cmd/benthos/main.go)
2122

2223
## Getting Started
24+
2325
Sample benthos stream config:
26+
2427
```yaml
2528
input:
2629
type: kubernetes
@@ -50,10 +53,13 @@ input:
5053
output:
5154
stdout: {}
5255
```
56+
5357
Or see [examples](./example)
5458
5559
## Metadata
60+
5661
This input adds the following metadata fields to each message:
62+
5763
```
5864
- deleted (present only if object has been deleted)
5965
- group
@@ -64,5 +70,6 @@ This input adds the following metadata fields to each message:
6470
```
6571
6672
## License
73+
6774
Licensed under the [MIT License](LICENSE.md)
6875
Copyright (c) 2020 Chris Ludden

cmd/benthos/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package main
33
import (
44
"github.com/Jeffail/benthos/v3/lib/service"
55

6-
_ "github.com/cludden/benthos-kubernetes-input/input"
6+
_ "github.com/cludden/benthos-kubernetes/input"
7+
_ "github.com/cludden/benthos-kubernetes/output"
78
)
89

910
//------------------------------------------------------------------------------

example/benthos.yml

Lines changed: 0 additions & 49 deletions
This file was deleted.

example/crds.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ spec:
2424
properties:
2525
size:
2626
type: number
27+
status:
28+
type: object
29+
x-kubernetes-preserve-unknown-fields: true
2730
subresources:
2831
status: {}
2932
additionalPrinterColumns:

example/finalizer.yml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
input:
2+
type: kubernetes
3+
plugin:
4+
watches:
5+
- group: example.com
6+
version: v1
7+
kind: Bar
8+
9+
pipeline:
10+
processors:
11+
- conditional:
12+
condition:
13+
bloblang: meta().exists("deleted")
14+
processors:
15+
- log:
16+
message: deletion detected, substituting last cached object image...
17+
- cache:
18+
cache: objects
19+
operator: get
20+
key: ${!meta("kind")}/${INVISION_CONTEXT}/${!meta("namespace")}/${!meta("name")}
21+
else_processors:
22+
- cache:
23+
cache: objects
24+
operator: set
25+
key: ${!meta("kind")}/${INVISION_CONTEXT}/${!meta("namespace")}/${!meta("name")}
26+
value: ${!content()}
27+
28+
- bloblang: |
29+
map finalizer {
30+
root = this
31+
metadata.finalizers = metadata.finalizers.or([]).append("finalizer.bars.example.com")
32+
}
33+
root = match {
34+
meta().exists("deleted") || metadata.finalizers.or([]).contains("finalizer.bars.example.com") => deleted()
35+
_ => this.apply("finalizer")
36+
}
37+
38+
output:
39+
broker:
40+
outputs:
41+
- type: kubernetes
42+
plugin: {}
43+
- stdout: {}
44+
45+
logger:
46+
level: info
47+
48+
resources:
49+
caches:
50+
objects:
51+
memory: {}

example/manifest.yml

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,16 @@ metadata:
77
color: green
88
finalizers:
99
- finalizer.foos.example.com
10+
- random
1011
spec:
1112
size: 1
1213

13-
---
14-
apiVersion: example.com/v1
15-
kind: Foo
16-
metadata:
17-
name: two
18-
namespace: kube-system
19-
labels:
20-
color: yellow
21-
finalizers:
22-
- finalizer.foos.example.com
23-
spec:
24-
size: 2
25-
2614
---
2715
apiVersion: example.com/v1
2816
kind: Bar
2917
metadata:
3018
name: three
3119
labels:
3220
color: blue
33-
finalizers:
34-
- finalizer.bars.example.com
3521
spec:
3622
size: 3
37-
38-
---
39-
apiVersion: example.com/v1
40-
kind: Bar
41-
metadata:
42-
name: four
43-
namespace: kube-system
44-
labels:
45-
color: purple
46-
finalizers:
47-
- finalizer.bars.example.com
48-
spec:
49-
size: 4

example/status.yml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
input:
2+
type: kubernetes
3+
plugin:
4+
watches:
5+
- group: example.com
6+
version: v1
7+
kind: Foo
8+
# - group: example.com
9+
# version: v1
10+
# kind: Bar
11+
12+
pipeline:
13+
processors:
14+
- conditional:
15+
condition:
16+
bloblang: meta().exists("deleted")
17+
processors:
18+
- log:
19+
message: deletion detected, substituting last cached object image...
20+
- cache:
21+
cache: objects
22+
operator: get
23+
key: ${!meta("kind")}/${INVISION_CONTEXT}/${!meta("namespace")}/${!meta("name")}
24+
else_processors:
25+
- cache:
26+
cache: objects
27+
operator: set
28+
key: ${!meta("kind")}/${INVISION_CONTEXT}/${!meta("namespace")}/${!meta("name")}
29+
value: ${!content()}
30+
31+
- bloblang: |
32+
let metadataKeys = ["deletionTimestamp","finalizers"]
33+
let state = {
34+
"metadata": metadata.filter($metadataKeys.contains(this.key)),
35+
"spec": spec
36+
}
37+
let hash = $state.string().hash("sha1").encode("hex")
38+
let finalizer = "finalizer.%ss.example.com".format(kind.lowercase())
39+
let finalizers = metadata.finalizers.or([])
40+
41+
map reconciling {
42+
root = obj
43+
status.hash = hash
44+
status.lastReconciledAt = timestamp_utc("2006-01-02T15:04:05.999999999Z")
45+
status.status = "Reconciling"
46+
}
47+
48+
meta hash = $hash
49+
root = match {
50+
meta().exists("deleted") => this
51+
metadata.exists("deletionTimestamp") && $finalizers.join(",") != $finalizer => deleted()
52+
status.hash.or("") == $hash => deleted()
53+
_ => {"obj":this,"hash":$hash}.apply("reconciling")
54+
}
55+
56+
output:
57+
broker:
58+
outputs:
59+
- type: kubernetes_status
60+
plugin: {}
61+
processors:
62+
- bloblang: |
63+
root = match {
64+
meta().exists("deleted") => deleted()
65+
}
66+
- log:
67+
message: updating status
68+
- stdout: {}
69+
70+
logger:
71+
level: info
72+
73+
resources:
74+
caches:
75+
objects:
76+
memory: {}

0 commit comments

Comments
 (0)