Skip to content

Commit 60626cf

Browse files
authored
optimization: added a sandbox queue to retrieve warm sandboxes (#536)
nit added mutex addressing feedback updated comments linter linter comments nit fix tests addressed comments fix update nit nit lint memory leak lint updated exclusivity test addressed copilot comments
1 parent 71cf5a2 commit 60626cf

6 files changed

Lines changed: 758 additions & 204 deletions

File tree

cmd/agent-sandbox-controller/main.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ import (
2929

3030
"github.com/felixge/fgprof"
3131
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32-
ctrl "sigs.k8s.io/controller-runtime"
33-
"sigs.k8s.io/controller-runtime/pkg/healthz"
34-
"sigs.k8s.io/controller-runtime/pkg/log/zap"
35-
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
36-
3732
"sigs.k8s.io/agent-sandbox/controllers"
3833
extensionsv1alpha1 "sigs.k8s.io/agent-sandbox/extensions/api/v1alpha1"
3934
extensionscontrollers "sigs.k8s.io/agent-sandbox/extensions/controllers"
35+
"sigs.k8s.io/agent-sandbox/extensions/controllers/queue"
4036
asmetrics "sigs.k8s.io/agent-sandbox/internal/metrics"
37+
ctrl "sigs.k8s.io/controller-runtime"
38+
"sigs.k8s.io/controller-runtime/pkg/healthz"
39+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
40+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4141
//+kubebuilder:scaffold:imports
4242
)
4343

@@ -227,11 +227,13 @@ func main() {
227227
}
228228

229229
if extensions {
230+
warmSandboxQueue := queue.NewSimpleSandboxQueue()
230231
if err = (&extensionscontrollers.SandboxClaimReconciler{
231-
Client: mgr.GetClient(),
232-
Scheme: mgr.GetScheme(),
233-
Recorder: mgr.GetEventRecorder("sandboxclaim-controller"),
234-
Tracer: instrumenter,
232+
Client: mgr.GetClient(),
233+
Scheme: mgr.GetScheme(),
234+
WarmSandboxQueue: warmSandboxQueue,
235+
Recorder: mgr.GetEventRecorder("sandboxclaim-controller"),
236+
Tracer: instrumenter,
235237
}).SetupWithManager(mgr, sandboxClaimConcurrentWorkers); err != nil {
236238
setupLog.Error(err, "unable to create controller", "controller", "SandboxClaim")
237239
os.Exit(1)
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright 2026 The Kubernetes Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queue
16+
17+
import (
18+
"sync"
19+
20+
"k8s.io/apimachinery/pkg/types"
21+
)
22+
23+
// SandboxKey uniquely identifies a sandbox in the queue.
// It is a distinct named type whose underlying type is types.NamespacedName,
// so a key is the sandbox's Namespace plus its Name. Keys are compared by
// value and are used directly as map keys by the queue implementation.
24+
type SandboxKey types.NamespacedName
25+
26+
// SandboxQueue defines the interface for managing a thread-safe,
27+
// highly concurrent queue of adoptable warm pool sandboxes.
// Items are grouped into independent queues keyed by a template hash string.
28+
type SandboxQueue interface {
29+
// Add enqueues item on the queue associated with templateHash.
Add(templateHash string, item SandboxKey)
30+
// Get dequeues one item for templateHash. The boolean reports whether an
// item was returned; when it is false the SandboxKey carries no meaning.
Get(templateHash string) (SandboxKey, bool)
31+
// RemoveQueue discards the whole queue associated with templateHash.
RemoveQueue(templateHash string)
32+
// RemoveItem removes one specific item from the queue for templateHash.
RemoveItem(templateHash string, item SandboxKey)
33+
}
34+
35+
// SimpleSandboxQueue implements SandboxQueue using simple synchronized slices.
// It holds one synchronizedQueue per template hash. Because it embeds a
// sync.Map by value, a SimpleSandboxQueue must not be copied after first use;
// share it by pointer, as NewSimpleSandboxQueue does.
36+
type SimpleSandboxQueue struct {
37+
// queues is a thread-safe dictionary from template hash to a synchronizedQueue
// (keys are template hash strings, values are *synchronizedQueue).
38+
queues sync.Map
39+
}
40+
41+
// NewSimpleSandboxQueue initializes a new SimpleSandboxQueue.
42+
func NewSimpleSandboxQueue() *SimpleSandboxQueue {
43+
return &SimpleSandboxQueue{}
44+
}
45+
46+
// Add pushes an item to the specific template's queue.
47+
func (s *SimpleSandboxQueue) Add(templateHash string, item SandboxKey) {
48+
q, _ := s.queues.LoadOrStore(templateHash, newSynchronizedQueue())
49+
q.(*synchronizedQueue).Push(item)
50+
}
51+
52+
// Get pops an item from the specific template's queue.
53+
func (s *SimpleSandboxQueue) Get(templateHash string) (SandboxKey, bool) {
54+
q, ok := s.queues.Load(templateHash)
55+
if !ok {
56+
return SandboxKey{}, false
57+
}
58+
return q.(*synchronizedQueue).Pop()
59+
}
60+
61+
// RemoveItem deletes a specific sandbox from a template's queue.
62+
func (s *SimpleSandboxQueue) RemoveItem(templateHash string, item SandboxKey) {
63+
if q, ok := s.queues.Load(templateHash); ok {
64+
q.(*synchronizedQueue).Remove(item)
65+
}
66+
}
67+
68+
// Remove scans the slice and deletes the item to prevent Ghost Pods.
69+
func (q *synchronizedQueue) Remove(key SandboxKey) {
70+
q.mu.Lock()
71+
defer q.mu.Unlock()
72+
73+
if _, exists := q.set[key]; !exists {
74+
return
75+
}
76+
77+
delete(q.set, key)
78+
79+
for i, k := range q.items {
80+
if k == key {
81+
// Delete from slice
82+
q.items = append(q.items[:i], q.items[i+1:]...)
83+
break
84+
}
85+
}
86+
}
87+
88+
// synchronizedQueue is a mutex-guarded FIFO of SandboxKeys paired with a set
// of the same keys, so duplicates can be rejected without scanning the slice.
// Push, Pop and Remove all take mu and update items and set together.
//
// TODO(vicentefb): Implement queue cleanup mechanism.
89+
// We should remove the queue from the sync.Map when the corresponding
90+
// SandboxWarmPool for a given template is deleted to prevent memory leaks.
// NOTE(review): SimpleSandboxQueue.RemoveQueue already deletes a queue from
// the map; presumably this TODO tracks calling it on SandboxWarmPool
// deletion. Confirm against the controller code.
91+
type synchronizedQueue struct {
92+
// mu guards items and set.
mu sync.Mutex
93+
// items holds the queued keys in insertion order; index 0 is popped first.
items []SandboxKey
94+
set map[SandboxKey]struct{} // Used for O(1) deduplication
95+
}
96+
97+
func newSynchronizedQueue() *synchronizedQueue {
98+
return &synchronizedQueue{
99+
items: make([]SandboxKey, 0),
100+
set: make(map[SandboxKey]struct{}),
101+
}
102+
}
103+
104+
// Push adds an item to the queue if it isn't already present.
105+
func (q *synchronizedQueue) Push(key SandboxKey) {
106+
q.mu.Lock()
107+
defer q.mu.Unlock()
108+
if _, exists := q.set[key]; !exists {
109+
q.set[key] = struct{}{}
110+
q.items = append(q.items, key)
111+
}
112+
}
113+
114+
// Pop removes and returns the first item from the queue.
115+
func (q *synchronizedQueue) Pop() (SandboxKey, bool) {
116+
q.mu.Lock()
117+
defer q.mu.Unlock()
118+
119+
if len(q.items) == 0 {
120+
return SandboxKey{}, false
121+
}
122+
123+
// Grab the first item
124+
item := q.items[0]
125+
126+
// This removes the pointer references so the Garbage Collector
127+
// can free the strings in memory!
128+
q.items[0] = SandboxKey{}
129+
130+
// Remove it from slice and set
131+
q.items = q.items[1:]
132+
delete(q.set, item)
133+
134+
return item, true
135+
}
136+
137+
// RemoveQueue completely deletes a template's queue from the sync.Map
138+
// to prevent memory leaks when SandboxTemplates or WarmPools are deleted.
139+
func (s *SimpleSandboxQueue) RemoveQueue(templateHash string) {
140+
s.queues.Delete(templateHash)
141+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2026 The Kubernetes Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package queue
15+
16+
import (
17+
"testing"
18+
)
19+
20+
func TestSimpleSandboxQueue_BasicOperations(t *testing.T) {
21+
q := NewSimpleSandboxQueue()
22+
hash := "template-hash-1"
23+
24+
key1 := SandboxKey{Namespace: "default", Name: "sb-1"}
25+
key2 := SandboxKey{Namespace: "default", Name: "sb-2"}
26+
27+
// Test Add
28+
q.Add(hash, key1)
29+
q.Add(hash, key2)
30+
31+
// Test Get (Should be FIFO)
32+
got1, ok1 := q.Get(hash)
33+
if !ok1 || got1 != key1 {
34+
t.Errorf("Expected %v, got %v (ok: %v)", key1, got1, ok1)
35+
}
36+
37+
got2, ok2 := q.Get(hash)
38+
if !ok2 || got2 != key2 {
39+
t.Errorf("Expected %v, got %v (ok: %v)", key2, got2, ok2)
40+
}
41+
42+
// Queue should now be empty
43+
_, ok3 := q.Get(hash)
44+
if ok3 {
45+
t.Errorf("Expected queue to be empty, but got an item")
46+
}
47+
}
48+
49+
func TestSimpleSandboxQueue_RemoveItem_GhostPodFix(t *testing.T) {
50+
q := NewSimpleSandboxQueue()
51+
hash := "template-hash-1"
52+
53+
key1 := SandboxKey{Namespace: "default", Name: "sb-1"}
54+
key2 := SandboxKey{Namespace: "default", Name: "sb-2"}
55+
key3 := SandboxKey{Namespace: "default", Name: "sb-3"}
56+
57+
q.Add(hash, key1)
58+
q.Add(hash, key2)
59+
q.Add(hash, key3)
60+
61+
// Simulate the Kubelet deleting the middle pod (Ghost Pod scenario)
62+
q.RemoveItem(hash, key2)
63+
64+
// First pop should still be key1
65+
got1, _ := q.Get(hash)
66+
if got1 != key1 {
67+
t.Errorf("Expected %v, got %v", key1, got1)
68+
}
69+
70+
// Second pop should be key3! (key2 was successfully removed)
71+
got3, _ := q.Get(hash)
72+
if got3 != key3 {
73+
t.Errorf("Expected %v to skip deleted item and return %v, but got %v", hash, key3, got3)
74+
}
75+
76+
// Queue should now be empty
77+
_, ok := q.Get(hash)
78+
if ok {
79+
t.Errorf("Expected queue to be empty after Ghost Pod removal")
80+
}
81+
}
82+
83+
func TestSynchronizedQueue_Deduplication(t *testing.T) {
84+
q := newSynchronizedQueue()
85+
key := SandboxKey{Namespace: "default", Name: "duplicate-sb"}
86+
87+
// Push the exact same pod 3 times
88+
q.Push(key)
89+
q.Push(key)
90+
q.Push(key)
91+
92+
// Verify it only stored it once
93+
if len(q.items) != 1 {
94+
t.Errorf("Expected length 1 due to O(1) deduplication, got %d", len(q.items))
95+
}
96+
97+
// Verify the set also only has 1 item
98+
if len(q.set) != 1 {
99+
t.Errorf("Expected set length 1, got %d", len(q.set))
100+
}
101+
}
102+
103+
func TestSimpleSandboxQueue_RemoveQueue_MemoryLeakFix(t *testing.T) {
104+
q := NewSimpleSandboxQueue()
105+
hash := "template-hash-to-delete"
106+
key1 := SandboxKey{Namespace: "default", Name: "sb-1"}
107+
108+
q.Add(hash, key1)
109+
110+
// Simulate SandboxTemplate deletion
111+
q.RemoveQueue(hash)
112+
113+
// Verify the entire queue was wiped from the sync.Map
114+
_, ok := q.Get(hash)
115+
if ok {
116+
t.Errorf("Expected queue to be completely removed, but it still existed")
117+
}
118+
}

0 commit comments

Comments
 (0)