Skip to content

Commit dc8a08e

Browse files
committed
feat: warmpool selection strategy
updated based on copilot added test to sandbox controller lint fixed fixed e2e tests addressed Janet's comments autogent fixed docs addressed Barni's question updated docs revert capitalization:
1 parent 9584360 commit dc8a08e

11 files changed

Lines changed: 380 additions & 11 deletions

File tree

api/v1beta1/sandbox_types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ type SandboxStatus struct {
230230
// A pod may have multiple IPs in dual-stack clusters.
231231
// +optional
232232
PodIPs []string `json:"podIPs,omitempty"`
233+
234+
// nodeName is the name of the node where the underlying pod is scheduled.
235+
// +optional
236+
NodeName string `json:"nodeName,omitempty"`
233237
}
234238

235239
// +genclient

controllers/sandbox_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,11 @@ func (r *SandboxReconciler) reconcileChildResources(ctx context.Context, sandbox
237237
allErrors = errors.Join(allErrors, err)
238238
if pod == nil {
239239
sandbox.Status.PodIPs = nil
240+
sandbox.Status.NodeName = ""
240241
} else {
241242
sandbox.Status.LabelSelector = fmt.Sprintf("%s=%s", sandboxLabel, NameHash(sandbox.Name))
242243
sandbox.Status.PodIPs = podIPsFromStatus(pod.Status.PodIPs)
244+
sandbox.Status.NodeName = pod.Spec.NodeName
243245
}
244246

245247
// Reconcile Service

controllers/sandbox_controller_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ func TestReconcile(t *testing.T) {
566566
},
567567
Spec: corev1.PodSpec{
568568
Containers: []corev1.Container{{Name: "test-container"}},
569+
NodeName: "node-1",
569570
},
570571
Status: corev1.PodStatus{
571572
PodIPs: []corev1.PodIP{{IP: "10.244.0.5"}, {IP: "fd00::5"}},
@@ -589,6 +590,7 @@ func TestReconcile(t *testing.T) {
589590
ServiceFQDN: "sandbox-name.sandbox-ns.svc.cluster.local",
590591
LabelSelector: "agents.x-k8s.io/sandbox-name-hash=" + nameHash,
591592
PodIPs: []string{"10.244.0.5", "fd00::5"},
593+
NodeName: "node-1",
592594
Conditions: []metav1.Condition{
593595
{
594596
Type: "Ready",
@@ -634,6 +636,7 @@ func TestReconcile(t *testing.T) {
634636
},
635637
Spec: corev1.PodSpec{
636638
Containers: []corev1.Container{{Name: "test-container"}},
639+
NodeName: "node-2",
637640
},
638641
Status: corev1.PodStatus{
639642
PodIPs: []corev1.PodIP{{IP: "10.244.0.5"}},
@@ -654,6 +657,7 @@ func TestReconcile(t *testing.T) {
654657
wantStatus: sandboxv1beta1.SandboxStatus{
655658
LabelSelector: "agents.x-k8s.io/sandbox-name-hash=" + nameHash,
656659
PodIPs: []string{"10.244.0.5"},
660+
NodeName: "node-2",
657661
Conditions: []metav1.Condition{
658662
{
659663
Type: "Ready",

docs/api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ _Appears in:_
185185
| `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v/#condition-v1-meta) array_ | conditions defines the status conditions array | | Optional: \{\} <br /> |
186186
| `selector` _string_ | selector is the label selector for pods. | | Optional: \{\} <br /> |
187187
| `podIPs` _string array_ | podIPs are the IP addresses of the underlying pod.<br />A pod may have multiple IPs in dual-stack clusters. | | Optional: \{\} <br /> |
188+
| `nodeName` _string_ | NodeName is the name of the node where the underlying pod is scheduled. | | Optional: \{\} <br /> |
188189

189190

190191
#### ShutdownPolicy

extensions/controllers/queue/simple_sandbox_queue.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type SandboxKey types.NamespacedName
2828
type SandboxQueue interface {
2929
Add(warmPoolName string, item SandboxKey)
3030
Get(warmPoolName string) (SandboxKey, bool)
31+
GetWithStrategy(warmPoolName string, pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool)
3132
RemoveQueue(warmPoolName string)
3233
RemoveItem(warmPoolName string, item SandboxKey)
3334
}
@@ -58,6 +59,15 @@ func (s *SimpleSandboxQueue) Get(warmPoolName string) (SandboxKey, bool) {
5859
return q.(*synchronizedQueue).Pop()
5960
}
6061

62+
// GetWithStrategy pops an item from the specific warm pool's queue using a custom strategy.
63+
func (s *SimpleSandboxQueue) GetWithStrategy(warmPoolName string, pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool) {
64+
q, ok := s.queues.Load(warmPoolName)
65+
if !ok {
66+
return SandboxKey{}, false
67+
}
68+
return q.(*synchronizedQueue).PopWithStrategy(pick)
69+
}
70+
6171
// RemoveItem deletes a specific sandbox from a warm pool's queue.
6272
func (s *SimpleSandboxQueue) RemoveItem(warmPoolName string, item SandboxKey) {
6373
if q, ok := s.queues.Load(warmPoolName); ok {
@@ -139,8 +149,55 @@ func (q *synchronizedQueue) Pop() (SandboxKey, bool) {
139149
return item, true
140150
}
141151

152+
// PopWithStrategy applies the strategy function to pick an item from the queue,
153+
// removes it thread-safely, and returns it.
154+
func (q *synchronizedQueue) PopWithStrategy(pick func([]SandboxKey) (SandboxKey, bool)) (SandboxKey, bool) {
155+
for {
156+
q.mu.Lock()
157+
if len(q.items) == 0 {
158+
q.mu.Unlock()
159+
return SandboxKey{}, false
160+
}
161+
162+
// Snapshot the queue items
163+
snapshot := make([]SandboxKey, len(q.items))
164+
copy(snapshot, q.items)
165+
q.mu.Unlock()
166+
167+
key, ok := pick(snapshot)
168+
if !ok {
169+
return SandboxKey{}, false
170+
}
171+
172+
q.mu.Lock()
173+
// Verify the key is still present in the queue
174+
if _, exists := q.set[key]; !exists {
175+
// The picked key was concurrently popped by another goroutine.
176+
// Unlock and retry snapshot and pick.
177+
q.mu.Unlock()
178+
continue
179+
}
180+
181+
// Find the picked key in q.items and remove it
182+
for i, k := range q.items {
183+
if k == key {
184+
// Shift left and clear the tail slot
185+
last := len(q.items) - 1
186+
copy(q.items[i:], q.items[i+1:])
187+
q.items[last] = SandboxKey{}
188+
q.items = q.items[:last]
189+
break
190+
}
191+
}
192+
delete(q.set, key)
193+
q.mu.Unlock()
194+
195+
return key, true
196+
}
197+
}
198+
142199
// RemoveQueue completely deletes a warm pool's queue from the sync.Map
143-
// to prevent memory leaks when WarmPools are deleted.
200+
// to prevent memory leaks when SandboxTemplates or WarmPools are deleted.
144201
func (s *SimpleSandboxQueue) RemoveQueue(warmPoolName string) {
145202
s.queues.Delete(warmPoolName)
146203
}

extensions/controllers/queue/simple_sandbox_queue_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,50 @@ func TestSimpleSandboxQueue_RemoveQueue_MemoryLeakFix(t *testing.T) {
131131
t.Errorf("Expected queue to be completely removed, but it still existed")
132132
}
133133
}
134+
135+
func TestSimpleSandboxQueue_GetWithStrategy(t *testing.T) {
136+
q := NewSimpleSandboxQueue()
137+
hash := "template-hash-1"
138+
139+
key1 := SandboxKey{Namespace: "default", Name: "sb-1"}
140+
key2 := SandboxKey{Namespace: "default", Name: "sb-2"}
141+
key3 := SandboxKey{Namespace: "default", Name: "sb-3"}
142+
143+
q.Add(hash, key1)
144+
q.Add(hash, key2)
145+
q.Add(hash, key3)
146+
147+
// Custom strategy to pick key2 specifically
148+
pickKey2 := func(items []SandboxKey) (SandboxKey, bool) {
149+
for _, item := range items {
150+
if item.Name == "sb-2" {
151+
return item, true
152+
}
153+
}
154+
return SandboxKey{}, false
155+
}
156+
157+
// Pop with strategy
158+
got, ok := q.GetWithStrategy(hash, pickKey2)
159+
if !ok || got != key2 {
160+
t.Errorf("Expected to pick %v, got %v (ok: %v)", key2, got, ok)
161+
}
162+
163+
// First standard pop should be key1 (since key2 was removed)
164+
got1, _ := q.Get(hash)
165+
if got1 != key1 {
166+
t.Errorf("Expected first remaining item to be %v, got %v", key1, got1)
167+
}
168+
169+
// Second standard pop should be key3
170+
got3, _ := q.Get(hash)
171+
if got3 != key3 {
172+
t.Errorf("Expected second remaining item to be %v, got %v", key3, got3)
173+
}
174+
175+
// Queue should now be empty
176+
_, ok3 := q.Get(hash)
177+
if ok3 {
178+
t.Errorf("Expected queue to be empty, but got an item")
179+
}
180+
}

extensions/controllers/sandboxclaim_controller.go

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -611,16 +611,102 @@ func (r *SandboxClaimReconciler) getCandidate(ctx context.Context, claim *extens
611611
}
612612
}()
613613

614+
var sandboxList v1beta1.SandboxList
615+
if err := r.List(ctx, &sandboxList, client.InNamespace(claim.Namespace)); err != nil {
616+
logger.Error(err, "Failed to list sandboxes for smart selection node counting")
617+
}
618+
619+
nodeCounts := make(map[string]int)
620+
sbMap := make(map[string]*v1beta1.Sandbox)
621+
for i := range sandboxList.Items {
622+
sb := &sandboxList.Items[i]
623+
sbMap[sb.Name] = sb
624+
if _, isWarm := sb.Labels[warmPoolSandboxLabel]; !isWarm {
625+
if sb.Status.NodeName != "" {
626+
nodeCounts[sb.Status.NodeName]++
627+
}
628+
}
629+
}
630+
631+
pickSmart := func(keys []queue.SandboxKey) (queue.SandboxKey, bool) {
632+
var bestKey queue.SandboxKey
633+
var bestSandbox *v1beta1.Sandbox
634+
found := false
635+
636+
for _, key := range keys {
637+
// Upfront filter mismatching namespaces to avoid map lookup
638+
if key.Namespace != claim.Namespace {
639+
continue
640+
}
641+
642+
sb, exists := sbMap[key.Name]
643+
if !exists {
644+
continue
645+
}
646+
647+
if err := verifySandboxCandidate(sb, claim); err != nil {
648+
continue
649+
}
650+
651+
if !found {
652+
bestKey = key
653+
bestSandbox = sb
654+
found = true
655+
continue
656+
}
657+
658+
sbReady := isSandboxReady(sb)
659+
bestReady := isSandboxReady(bestSandbox)
660+
661+
if sbReady != bestReady {
662+
if sbReady {
663+
bestKey = key
664+
bestSandbox = sb
665+
}
666+
continue
667+
}
668+
669+
sbNode := sb.Status.NodeName
670+
bestNode := bestSandbox.Status.NodeName
671+
672+
if sbNode == "" && bestNode != "" {
673+
continue
674+
}
675+
if sbNode != "" && bestNode == "" {
676+
bestKey = key
677+
bestSandbox = sb
678+
continue
679+
}
680+
681+
if sbNode != "" && bestNode != "" && sbNode != bestNode {
682+
sbCount := nodeCounts[sbNode]
683+
bestCount := nodeCounts[bestNode]
684+
if sbCount < bestCount {
685+
bestKey = key
686+
bestSandbox = sb
687+
continue
688+
}
689+
if sbCount > bestCount {
690+
continue
691+
}
692+
}
693+
694+
if sb.CreationTimestamp.Before(&bestSandbox.CreationTimestamp) {
695+
bestKey = key
696+
bestSandbox = sb
697+
}
698+
}
699+
700+
return bestKey, found
701+
}
702+
614703
for {
615-
adoptedKey, ok := r.WarmSandboxQueue.Get(claim.Spec.WarmPoolRef.Name)
704+
adoptedKey, ok := r.WarmSandboxQueue.GetWithStrategy(claim.Spec.WarmPoolRef.Name, pickSmart)
616705
if !ok {
617706
return nil, queue.SandboxKey{}, nil
618707
}
619708

620-
// 1. Hand the Kubernetes client the empty bucket
621709
adopted := &v1beta1.Sandbox{}
622-
623-
// 2. Fetch from the Informer Cache
624710
err := r.Get(ctx, client.ObjectKey{Namespace: adoptedKey.Namespace, Name: adoptedKey.Name}, adopted)
625711
if err != nil {
626712
if k8errors.IsNotFound(err) {
@@ -635,8 +721,6 @@ func (r *SandboxClaimReconciler) getCandidate(ctx context.Context, claim *extens
635721

636722
if err := verifySandboxCandidate(adopted, claim); err != nil {
637723
logger.V(1).Info("sandbox candidate can't be adopted", "sandbox", adopted.Name, "warmPool", claim.Spec.WarmPoolRef.Name, "reason", err.Error())
638-
// If it's a good sandbox just in the wrong namespace,
639-
// add it to the skipped list so the defer block puts it back.
640724
if errors.Is(err, ErrCrossNamespaceAdoption) {
641725
skipped = append(skipped, adoptedKey)
642726
}

0 commit comments

Comments
 (0)