Skip to content

all sinks add the trigger to controll sent or ignore event update,and make the code gci-ed #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ receivers:
* A route can have many sub-routes, forming a tree.
* Routing starts from the root route.

## sent or ignore event update
if an event occur many times, It will only update the event fields like ```count``` and ```lastTimestamp```.
By default ```kubernetes-event-exporter``` will ignore the eventUpdate and will not sent to the recivers.
If you Don't want to miss every event,you can use trigger ```sentUpdateEvent``` configurated in each sink to controll whether sent the event to the reciver.
```azure
sentUpdateEvent true|false (default false)
true: sent every matching event to the reciver including when the event updated
false: ignore the event updted
```

for example:
```azure
receivers:
- name: "dump"
stdout:
sentUpdateEvent: true
```

## Using Secrets

In your config file, you can refer to environment variables as `${API_KEY}` therefore you can use ConfigMap or Secrets
Expand Down
1 change: 0 additions & 1 deletion pkg/batch/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"
)


// Writer allows to buffer some items and call the Handler function either when the buffer is full or the timeout is
// reached. There will also be support for concurrency for high volume. The handler function is supposed to return an
// array of booleans to indicate whether the transfer was successful or not. It can be replaced with status codes in
Expand Down
3 changes: 2 additions & 1 deletion pkg/batch/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package batch

import (
"context"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSimpleWriter(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/exporter/channel_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
// and we might need a mechanism to drop the vents
// On closing, the registry sends a signal on all exit channels, and then waits for all to complete.
type ChannelBasedReceiverRegistry struct {
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
ch map[string]chan kube.EnhancedEvent
exitCh map[string]chan interface{}
wg *sync.WaitGroup
MetricsStore *metrics.Store
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/exporter/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package exporter

import (
"testing"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/stretchr/testify/assert"
"testing"
)

func TestEngineNoRoutes(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/exporter/route_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package exporter

import (
"testing"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/stretchr/testify/assert"
"testing"
)

// testReceiverRegistry just records the events to the registry so that tests can validate routing behavior
Expand Down
3 changes: 2 additions & 1 deletion pkg/exporter/rule_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package exporter

import (
"testing"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/stretchr/testify/assert"
"testing"
)

func TestEmptyRule(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/exporter/sync_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exporter

import (
"context"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/resmoio/kubernetes-event-exporter/pkg/sinks"
"github.com/rs/zerolog/log"
Expand Down
3 changes: 2 additions & 1 deletion pkg/kube/client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package kube

import (
"os"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os"
)

// GetKubernetesClient returns the client if it's possible in cluster, otherwise tries to read HOME
Expand Down
1 change: 1 addition & 0 deletions pkg/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type EnhancedEvent struct {
corev1.Event `json:",inline"`
IsUpdateEvent bool `json:"isUpdateEvent"`
ClusterName string `json:"clusterName"`
InvolvedObject EnhancedObjectReference `json:"involvedObject"`
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kube/event_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package kube

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)

func TestEnhancedEvent_DeDot(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ func NewEventWatcher(config *rest.Config, namespace string, MaxEventAgeSeconds i

func (e *EventWatcher) OnAdd(obj interface{}) {
event := obj.(*corev1.Event)
e.onEvent(event)
e.onEvent(event, false)
}

func (e *EventWatcher) OnUpdate(oldObj, newObj interface{}) {
// Ignore updates
event := newObj.(*corev1.Event)
e.onEvent(event, true)
}

// Ignore events older than the maxEventAgeSeconds
Expand All @@ -87,7 +89,7 @@ func (e *EventWatcher) isEventDiscarded(event *corev1.Event) bool {
return false
}

func (e *EventWatcher) onEvent(event *corev1.Event) {
func (e *EventWatcher) onEvent(event *corev1.Event, IsUpdateEvent bool) {
if e.isEventDiscarded(event) {
return
}
Expand All @@ -102,7 +104,8 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
e.metricsStore.EventsProcessed.Inc()

ev := &EnhancedEvent{
Event: *event.DeepCopy(),
Event: *event.DeepCopy(),
IsUpdateEvent: IsUpdateEvent,
}
ev.Event.ManagedFields = nil

Expand Down
19 changes: 13 additions & 6 deletions pkg/sinks/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ package sinks

import (
"bufio"
"cloud.google.com/go/bigquery"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
"google.golang.org/api/option"
"math/rand"
"os"
"time"
"unicode"

"cloud.google.com/go/bigquery"
"github.com/resmoio/kubernetes-event-exporter/pkg/batch"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
"google.golang.org/api/option"
)

// Returns a map filtering out keys that have nil value assigned.
Expand Down Expand Up @@ -138,6 +139,7 @@ func bigQueryImportJsonFromFile(path string, cfg *BigQueryConfig) error {
}

type BigQueryConfig struct {
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
// BigQuery table config
Location string `yaml:"location"`
Project string `yaml:"project"`
Expand Down Expand Up @@ -217,14 +219,19 @@ func NewBigQuerySink(cfg *BigQueryConfig) (*BigQuerySink, error) {
)
batchWriter.Start()

return &BigQuerySink{batchWriter: batchWriter}, nil
return &BigQuerySink{batchWriter: batchWriter, config: cfg}, nil
}

type BigQuerySink struct {
batchWriter *batch.Writer
config *BigQueryConfig
}

func (e *BigQuerySink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !e.config.SentUpdateEvent {
return nil
}
e.batchWriter.Submit(ev)
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sinks/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

type ElasticsearchConfig struct {
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
// Connection specific
Hosts []string `yaml:"hosts"`
Username string `yaml:"username"`
Expand Down Expand Up @@ -97,6 +98,10 @@ func formatIndexName(pattern string, when time.Time) string {
}

func (e *Elasticsearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !e.cfg.SentUpdateEvent {
return nil
}
var toSend []byte

if e.cfg.DeDot {
Expand Down
18 changes: 12 additions & 6 deletions pkg/sinks/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package sinks
import (
"context"
"encoding/json"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/eventbridge"
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog/log"
"time"
)

type EventBridgeConfig struct {
DetailType string `yaml:"detailType"`
Details map[string]interface{} `yaml:"details"`
Source string `yaml:"source"`
EventBusName string `yaml:"eventBusName"`
Region string `yaml:"region"`
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
DetailType string `yaml:"detailType"`
Details map[string]interface{} `yaml:"details"`
Source string `yaml:"source"`
EventBusName string `yaml:"eventBusName"`
Region string `yaml:"region"`
}

type EventBridgeSink struct {
Expand Down Expand Up @@ -49,6 +51,10 @@ func NewEventBridgeSink(cfg *EventBridgeConfig) (Sink, error) {
}

func (s *EventBridgeSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !s.cfg.SentUpdateEvent {
return nil
}
log.Info().Msg("Sending event to EventBridge ")
var toSend string
if s.cfg.Details != nil {
Expand Down
19 changes: 13 additions & 6 deletions pkg/sinks/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
)

type FileConfig struct {
Path string `yaml:"path"`
Layout map[string]interface{} `yaml:"layout"`
MaxSize int `yaml:"maxsize"`
MaxAge int `yaml:"maxage"`
MaxBackups int `yaml:"maxbackups"`
DeDot bool `yaml:"deDot"`
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
Path string `yaml:"path"`
Layout map[string]interface{} `yaml:"layout"`
MaxSize int `yaml:"maxsize"`
MaxAge int `yaml:"maxage"`
MaxBackups int `yaml:"maxbackups"`
DeDot bool `yaml:"deDot"`
}

func (f *FileConfig) Validate() error {
Expand All @@ -27,6 +28,7 @@ type File struct {
encoder *json.Encoder
layout map[string]interface{}
DeDot bool
config *FileConfig
}

func NewFileSink(config *FileConfig) (*File, error) {
Expand All @@ -42,6 +44,7 @@ func NewFileSink(config *FileConfig) (*File, error) {
encoder: json.NewEncoder(writer),
layout: config.Layout,
DeDot: config.DeDot,
config: config,
}, nil
}

Expand All @@ -50,6 +53,10 @@ func (f *File) Close() {
}

func (f *File) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !f.config.SentUpdateEvent {
return nil
}
if f.DeDot {
de := ev.DeDot()
ev = &de
Expand Down
5 changes: 5 additions & 0 deletions pkg/sinks/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

type FirehoseConfig struct {
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
DeliveryStreamName string `yaml:"deliveryStreamName"`
Region string `yaml:"region"`
Layout map[string]interface{} `yaml:"layout"`
Expand Down Expand Up @@ -38,6 +39,10 @@ func NewFirehoseSink(cfg *FirehoseConfig) (Sink, error) {
}

func (f *FirehoseSink) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !f.cfg.SentUpdateEvent {
return nil
}
var toSend []byte

if f.cfg.DeDot {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sinks/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package sinks

import (
"context"

"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
)

type InMemoryConfig struct {
Ref *InMemory
SentUpdateEvent bool `yaml:"sentUpdateEvent,omitempty"`
Ref *InMemory
}

type InMemory struct {
Expand All @@ -15,12 +17,14 @@ type InMemory struct {
}

func (i *InMemory) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
// skip update event
if ev.IsUpdateEvent && !i.Config.SentUpdateEvent {
return nil
}
i.Events = append(i.Events, ev)
return nil
}

func (i *InMemory) Close() {
// No-op
}


Loading