Skip to content

Commit 3b04a83

Browse files
committed
Implement BadgerGC with controller runtime
Signed-off-by: leigh capili <[email protected]>
1 parent 09169ce commit 3b04a83

File tree

4 files changed

+188
-2
lines changed

4 files changed

+188
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/fluxcd/pkg/oci v0.47.0
1717
github.com/fluxcd/pkg/runtime v0.59.0
1818
github.com/fluxcd/pkg/version v0.7.0
19+
github.com/go-logr/logr v1.4.2
1920
github.com/google/go-containerregistry v0.20.3
2021
github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20250225234217-098045d5e61f
2122
github.com/onsi/ginkgo v1.16.5
@@ -84,7 +85,6 @@ require (
8485
github.com/fsnotify/fsnotify v1.9.0 // indirect
8586
github.com/fxamacker/cbor/v2 v2.8.0 // indirect
8687
github.com/go-errors/errors v1.5.1 // indirect
87-
github.com/go-logr/logr v1.4.2 // indirect
8888
github.com/go-logr/zapr v1.3.0 // indirect
8989
github.com/go-openapi/jsonpointer v0.21.1 // indirect
9090
github.com/go-openapi/jsonreference v0.21.0 // indirect

internal/database/badger_gc.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package database
17+
18+
import (
19+
"context"
20+
"errors"
21+
"time"
22+
23+
"github.com/dgraph-io/badger/v3"
24+
"github.com/go-logr/logr"
25+
ctrl "sigs.k8s.io/controller-runtime"
26+
)
27+
28+
// BadgerGarbageCollector implements controller runtime's Runnable
29+
type BadgerGarbageCollector struct {
30+
// DiscardRatio must be a float between 0.0 and 1.0, inclusive
31+
// See badger.DB.RunValueLogGC for more info
32+
DiscardRatio float64
33+
Interval time.Duration
34+
35+
name string
36+
db *badger.DB
37+
log logr.Logger
38+
}
39+
40+
// NewBadgerGarbageCollector creates and returns a new BadgerGarbageCollector
41+
func NewBadgerGarbageCollector(name string, db *badger.DB, interval time.Duration, discardRatio float64) *BadgerGarbageCollector {
42+
return &BadgerGarbageCollector{
43+
DiscardRatio: discardRatio,
44+
Interval: interval,
45+
46+
name: name,
47+
db: db,
48+
}
49+
}
50+
51+
// Start repeatedly runs the BadgerDB garbage collector with a delay inbetween
52+
// runs.
53+
//
54+
// Start blocks until the context is cancelled. The database is expected to
55+
// already be open and not be closed while this context is active.
56+
//
57+
// ctx should be a logr.Logger context.
58+
func (gc *BadgerGarbageCollector) Start(ctx context.Context) error {
59+
gc.log = ctrl.LoggerFrom(ctx).WithName(gc.name)
60+
61+
gc.log.Info("Starting Badger GC")
62+
timer := time.NewTimer(gc.Interval)
63+
for {
64+
select {
65+
case <-timer.C:
66+
gc.discardValueLogFiles()
67+
timer.Reset(gc.Interval)
68+
case <-ctx.Done():
69+
timer.Stop()
70+
gc.log.Info("Stopped Badger GC")
71+
return nil
72+
}
73+
}
74+
}
75+
76+
// upper bound for loop
77+
const maxDiscards = 1000
78+
79+
func (gc *BadgerGarbageCollector) discardValueLogFiles() {
80+
for c := 0; c < maxDiscards; c++ {
81+
err := gc.db.RunValueLogGC(gc.DiscardRatio)
82+
if errors.Is(err, badger.ErrNoRewrite) {
83+
// there is no more garbage to discard
84+
gc.log.V(1).Info("Ran Badger GC", "discarded_vlogs", c)
85+
return
86+
}
87+
if err != nil {
88+
gc.log.Error(err, "Badger GC Error", "discarded_vlogs", c)
89+
return
90+
}
91+
}
92+
gc.log.Error(nil, "Warning: Badger GC ran for maximum discards", "discarded_vlogs", maxDiscards)
93+
}

internal/database/badger_gc_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright 2020 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package database
17+
18+
import (
19+
"context"
20+
"os"
21+
"testing"
22+
"time"
23+
24+
"github.com/dgraph-io/badger/v3"
25+
"github.com/go-logr/logr"
26+
"github.com/go-logr/logr/testr"
27+
)
28+
29+
func TestBadgerGarbageCollectorDoesStop(t *testing.T) {
30+
badger, db := createBadgerDatabaseForGC(t)
31+
ctx, cancel := context.WithCancel(
32+
logr.NewContext(context.Background(), testr.New(t)))
33+
34+
stop := make(chan struct{})
35+
go func() {
36+
gc := NewBadgerGarbageCollector("test-badger-gc", badger, 500*time.Millisecond, 0.01)
37+
gc.Start(ctx)
38+
stop <- struct{}{}
39+
}()
40+
41+
time.Sleep(time.Second)
42+
43+
tags := []string{"latest", "v0.0.1", "v0.0.2"}
44+
fatalIfError(t, db.SetTags(testRepo, tags))
45+
_, err := db.Tags(testRepo)
46+
fatalIfError(t, err)
47+
t.Log("wrote tags successfully")
48+
49+
time.Sleep(time.Second)
50+
51+
cancel()
52+
t.Log("waiting for GC stop")
53+
select {
54+
case <-time.NewTimer(5 * time.Second).C:
55+
t.Fatalf("GC did not stop")
56+
case <-stop:
57+
t.Log("GC Stopped")
58+
}
59+
}
60+
61+
func createBadgerDatabaseForGC(t *testing.T) (*badger.DB, *BadgerDatabase) {
62+
t.Helper()
63+
dir, err := os.MkdirTemp(os.TempDir(), "badger")
64+
if err != nil {
65+
t.Fatal(err)
66+
}
67+
db, err := badger.Open(badger.DefaultOptions(dir))
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
t.Cleanup(func() {
72+
db.Close()
73+
os.RemoveAll(dir)
74+
})
75+
return db, NewBadgerDatabase(db)
76+
}

main.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"os"
23+
"time"
2324

2425
"github.com/dgraph-io/badger/v3"
2526
flag "github.com/spf13/pflag"
@@ -55,7 +56,10 @@ import (
5556
"github.com/fluxcd/image-reflector-controller/internal/features"
5657
)
5758

58-
const controllerName = "image-reflector-controller"
59+
const (
60+
controllerName = "image-reflector-controller"
61+
discardRatio = 0.7
62+
)
5963

6064
var (
6165
scheme = runtime.NewScheme()
@@ -80,6 +84,7 @@ func main() {
8084
watchOptions helper.WatchOptions
8185
storagePath string
8286
storageValueLogFileSize int64
87+
gcInterval uint16 // max value is 65535 minutes (~ 45 days) which is well under the maximum time.Duration
8388
concurrent int
8489
awsAutoLogin bool
8590
gcpAutoLogin bool
@@ -94,6 +99,7 @@ func main() {
9499
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
95100
flag.StringVar(&storagePath, "storage-path", "/data", "Where to store the persistent database of image metadata")
96101
flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.")
102+
flag.Uint16Var(&gcInterval, "gc-interval", 1, "The number of minutes to wait between garbage collections. 0 disables the garbage collector.")
97103
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.")
98104

99105
// NOTE: Deprecated flags.
@@ -132,7 +138,14 @@ func main() {
132138
os.Exit(1)
133139
}
134140
defer badgerDB.Close()
141+
135142
db := database.NewBadgerDatabase(badgerDB)
143+
var badgerGC *database.BadgerGarbageCollector
144+
if gcInterval > 0 {
145+
badgerGC = database.NewBadgerGarbageCollector("badger-gc", badgerDB, time.Duration(gcInterval)*time.Minute, discardRatio)
146+
} else {
147+
setupLog.V(1).Info("Badger garbage collector is disabled")
148+
}
136149

137150
watchNamespace := ""
138151
if !watchOptions.AllNamespaces {
@@ -205,6 +218,10 @@ func main() {
205218
os.Exit(1)
206219
}
207220

221+
if badgerGC != nil {
222+
mgr.Add(badgerGC)
223+
}
224+
208225
probes.SetupChecks(mgr, setupLog)
209226

210227
var eventRecorder *events.Recorder

0 commit comments

Comments
 (0)