Skip to content

Commit cdd5075

Browse files
committed
feat: implement Balanced consolidation policy
Adds a new consolidationPolicy value, Balanced, that scores each consolidation move and rejects moves where the disruption outweighs the savings. Gated behind --feature-gates BalancedConsolidation=true. The scoring formula compares savings and disruption as fractions of NodePool totals: score = savings_fraction / disruption_fraction. A move is approved when score >= 1/consolidationThreshold (default 2). The scoring step is a filter inserted after scheduling feasibility and price comparison. It can only reject moves, never create them. If scoring has a bug that incorrectly approves, the move was already feasible and cost-saving. If it incorrectly rejects, the cluster is less optimized but not disrupted. API: - consolidationPolicy: Balanced (new enum value) - consolidationThreshold: 1-3 (default 2, requires Balanced) Implementation: - balanced.go: scoring formula, NodePool totals, candidate pre-filter, cross-NodePool move evaluation - Feature gate, API validation (CEL + runtime), defaulting - ShouldDisrupt accepts Balanced, sets ConsolidationPolicyUnsupported status condition when gate is disabled - Score-based candidate ranking for single-node consolidation - Events (ConsolidationApproved/Rejected on Node+NodeClaim for single-node, NodePool for multi-node) - Metrics (consolidation_score histogram, consolidation_moves_total counter) Tests (31 new): - 15 unit tests covering all RFC worked examples - 9 integration tests (NodePool totals, cross-pool, candidate price) - 3 feature gate tests - 5 validation + 4 defaulting tests - 4 score-based ranking tests - 1 status condition test See designs/balanced-consolidation.md (PR #2942) for the full RFC.
1 parent 4e80a7e commit cdd5075

26 files changed

Lines changed: 2840 additions & 1099 deletions

karpenter-triage

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 28d417564c9e463343ff641f14d1ec1b0c51389a

pkg/apis/crds/karpenter.sh_nodeclaims.yaml

Lines changed: 348 additions & 359 deletions
Large diffs are not rendered by default.

pkg/apis/crds/karpenter.sh_nodeoverlays.yaml

Lines changed: 197 additions & 203 deletions
Large diffs are not rendered by default.

pkg/apis/crds/karpenter.sh_nodepools.yaml

Lines changed: 498 additions & 502 deletions
Large diffs are not rendered by default.

pkg/apis/v1/nodepool.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type NodePoolSpec struct {
8080
Replicas *int64 `json:"replicas,omitempty"`
8181
}
8282

83+
// +kubebuilder:validation:XValidation:rule="!has(self.consolidationThreshold) || self.consolidationPolicy == 'Balanced'",message="consolidationThreshold is only valid when consolidationPolicy is Balanced"
8384
type Disruption struct {
8485
//nolint:kubeapilinter
8586
// ConsolidateAfter is the duration the controller will wait
@@ -96,10 +97,19 @@ type Disruption struct {
9697
// algorithm. This policy defaults to "WhenEmptyOrUnderutilized" if not specified
9798
// When replicas is set, ConsolidationPolicy is simply ignored
9899
// +kubebuilder:default:="WhenEmptyOrUnderutilized"
99-
// +kubebuilder:validation:Enum:={WhenEmpty,WhenEmptyOrUnderutilized}
100+
// +kubebuilder:validation:Enum:={WhenEmpty,WhenEmptyOrUnderutilized,Balanced}
100101
// +optional
101102
ConsolidationPolicy ConsolidationPolicy `json:"consolidationPolicy,omitempty"`
102103
//nolint:kubeapilinter
104+
// ConsolidationThreshold controls how much disruption one unit of savings can buy
105+
// when using the Balanced consolidation policy. A move is approved when
106+
// score >= 1/consolidationThreshold. Higher values approve more aggressively.
107+
// Only valid when consolidationPolicy is "Balanced". Defaults to 2.
108+
// +kubebuilder:validation:Minimum:=1
109+
// +kubebuilder:validation:Maximum:=3
110+
// +optional
111+
ConsolidationThreshold *int32 `json:"consolidationThreshold,omitempty"`
112+
//nolint:kubeapilinter
103113
// Budgets is a list of Budgets.
104114
// If there are multiple active budgets, Karpenter uses
105115
// the most restrictive value. If left undefined,
@@ -158,8 +168,12 @@ type ConsolidationPolicy string
158168
const (
159169
ConsolidationPolicyWhenEmpty ConsolidationPolicy = "WhenEmpty"
160170
ConsolidationPolicyWhenEmptyOrUnderutilized ConsolidationPolicy = "WhenEmptyOrUnderutilized"
171+
ConsolidationPolicyBalanced ConsolidationPolicy = "Balanced"
161172
)
162173

174+
// DefaultConsolidationThreshold is the default value for ConsolidationThreshold when using the Balanced policy.
175+
const DefaultConsolidationThreshold int32 = 2
176+
163177
// DisruptionReason defines valid reasons for disruption budgets.
164178
// +kubebuilder:validation:Enum={Underutilized,Empty,Drifted}
165179
type DisruptionReason string
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1
18+
19+
import (
20+
"context"
21+
"strings"
22+
"testing"
23+
)
24+
25+
func int32Ptr(v int32) *int32 { return &v }
26+
27+
// --- Test 2: Validation ---
28+
29+
// TestValidateConsolidationThreshold_RejectsWithoutBalanced verifies that
30+
// RuntimeValidate returns an error when consolidationThreshold is set but
31+
// consolidationPolicy is not Balanced.
32+
func TestValidateConsolidationThreshold_RejectsWithoutBalanced(t *testing.T) {
33+
np := &NodePool{}
34+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmptyOrUnderutilized
35+
np.Spec.Disruption.ConsolidationThreshold = int32Ptr(2)
36+
37+
err := np.RuntimeValidate(context.Background())
38+
if err == nil {
39+
t.Fatal("expected error when consolidationThreshold is set with non-Balanced policy")
40+
}
41+
if !strings.Contains(err.Error(), "consolidationThreshold is only valid when consolidationPolicy is Balanced") {
42+
t.Errorf("unexpected error message: %s", err.Error())
43+
}
44+
}
45+
46+
// TestValidateConsolidationThreshold_RejectsWithWhenEmpty verifies rejection
47+
// when policy is WhenEmpty.
48+
func TestValidateConsolidationThreshold_RejectsWithWhenEmpty(t *testing.T) {
49+
np := &NodePool{}
50+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmpty
51+
np.Spec.Disruption.ConsolidationThreshold = int32Ptr(2)
52+
53+
err := np.RuntimeValidate(context.Background())
54+
if err == nil {
55+
t.Fatal("expected error when consolidationThreshold is set with WhenEmpty policy")
56+
}
57+
}
58+
59+
// TestValidateConsolidationThreshold_PassesWithBalanced verifies that validation
60+
// passes when consolidationThreshold is set with consolidationPolicy: Balanced.
61+
func TestValidateConsolidationThreshold_PassesWithBalanced(t *testing.T) {
62+
np := &NodePool{}
63+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyBalanced
64+
np.Spec.Disruption.ConsolidationThreshold = int32Ptr(2)
65+
66+
err := np.Spec.Disruption.validateConsolidationThreshold()
67+
if err != nil {
68+
t.Errorf("expected no error for Balanced policy with threshold, got: %s", err)
69+
}
70+
}
71+
72+
// TestValidateConsolidationThreshold_PassesWithNilThreshold verifies that
73+
// validation passes when consolidationThreshold is nil regardless of policy.
74+
func TestValidateConsolidationThreshold_PassesWithNilThreshold(t *testing.T) {
75+
np := &NodePool{}
76+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyWhenEmptyOrUnderutilized
77+
np.Spec.Disruption.ConsolidationThreshold = nil
78+
79+
err := np.Spec.Disruption.validateConsolidationThreshold()
80+
if err != nil {
81+
t.Errorf("expected no error when threshold is nil, got: %s", err)
82+
}
83+
}
84+
85+
// TestValidateConsolidationThreshold_BoundaryValues tests the valid range [1, 3].
86+
// Note: The range [1, 3] is enforced by kubebuilder CEL validation at the API level.
87+
// The RuntimeValidate only checks the policy association. These tests verify the
88+
// policy check works for each valid value.
89+
func TestValidateConsolidationThreshold_BoundaryValues(t *testing.T) {
90+
tests := []struct {
91+
name string
92+
threshold int32
93+
policy ConsolidationPolicy
94+
wantErr bool
95+
}{
96+
{"threshold 1 with Balanced", 1, ConsolidationPolicyBalanced, false},
97+
{"threshold 2 with Balanced", 2, ConsolidationPolicyBalanced, false},
98+
{"threshold 3 with Balanced", 3, ConsolidationPolicyBalanced, false},
99+
{"threshold 1 with WhenEmptyOrUnderutilized", 1, ConsolidationPolicyWhenEmptyOrUnderutilized, true},
100+
{"threshold 3 with WhenEmptyOrUnderutilized", 3, ConsolidationPolicyWhenEmptyOrUnderutilized, true},
101+
}
102+
103+
for _, tc := range tests {
104+
t.Run(tc.name, func(t *testing.T) {
105+
d := &Disruption{
106+
ConsolidationPolicy: tc.policy,
107+
ConsolidationThreshold: int32Ptr(tc.threshold),
108+
}
109+
err := d.validateConsolidationThreshold()
110+
if tc.wantErr && err == nil {
111+
t.Errorf("expected error, got nil")
112+
}
113+
if !tc.wantErr && err != nil {
114+
t.Errorf("expected no error, got: %s", err)
115+
}
116+
})
117+
}
118+
}
119+
120+
// --- Test 3: Defaulting ---
121+
122+
// TestSetDefaults_BalancedPolicyDefaultsThresholdTo2 verifies that SetDefaults
123+
// sets consolidationThreshold to 2 when consolidationPolicy is Balanced and
124+
// threshold is nil.
125+
func TestSetDefaults_BalancedPolicyDefaultsThresholdTo2(t *testing.T) {
126+
np := &NodePool{}
127+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyBalanced
128+
np.Spec.Disruption.ConsolidationThreshold = nil
129+
130+
np.SetDefaults(context.Background())
131+
132+
if np.Spec.Disruption.ConsolidationThreshold == nil {
133+
t.Fatal("expected consolidationThreshold to be set, got nil")
134+
}
135+
if *np.Spec.Disruption.ConsolidationThreshold != DefaultConsolidationThreshold {
136+
t.Errorf("expected threshold %d, got %d", DefaultConsolidationThreshold, *np.Spec.Disruption.ConsolidationThreshold)
137+
}
138+
if *np.Spec.Disruption.ConsolidationThreshold != 2 {
139+
t.Errorf("expected threshold 2, got %d", *np.Spec.Disruption.ConsolidationThreshold)
140+
}
141+
}
142+
143+
// TestSetDefaults_NonBalancedPolicyDoesNotSetThreshold verifies that SetDefaults
144+
// does NOT set consolidationThreshold when policy is not Balanced.
145+
func TestSetDefaults_NonBalancedPolicyDoesNotSetThreshold(t *testing.T) {
146+
policies := []ConsolidationPolicy{
147+
ConsolidationPolicyWhenEmpty,
148+
ConsolidationPolicyWhenEmptyOrUnderutilized,
149+
}
150+
151+
for _, policy := range policies {
152+
t.Run(string(policy), func(t *testing.T) {
153+
np := &NodePool{}
154+
np.Spec.Disruption.ConsolidationPolicy = policy
155+
np.Spec.Disruption.ConsolidationThreshold = nil
156+
157+
np.SetDefaults(context.Background())
158+
159+
if np.Spec.Disruption.ConsolidationThreshold != nil {
160+
t.Errorf("expected threshold to remain nil for %s policy, got %d", policy, *np.Spec.Disruption.ConsolidationThreshold)
161+
}
162+
})
163+
}
164+
}
165+
166+
// TestSetDefaults_DoesNotOverrideExplicitThreshold verifies that SetDefaults
167+
// does NOT override an explicitly set threshold.
168+
func TestSetDefaults_DoesNotOverrideExplicitThreshold(t *testing.T) {
169+
np := &NodePool{}
170+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyBalanced
171+
np.Spec.Disruption.ConsolidationThreshold = int32Ptr(3)
172+
173+
np.SetDefaults(context.Background())
174+
175+
if *np.Spec.Disruption.ConsolidationThreshold != 3 {
176+
t.Errorf("expected threshold to remain 3, got %d", *np.Spec.Disruption.ConsolidationThreshold)
177+
}
178+
}
179+
180+
// TestSetDefaults_ExplicitThreshold1Preserved verifies threshold=1 is preserved.
181+
func TestSetDefaults_ExplicitThreshold1Preserved(t *testing.T) {
182+
np := &NodePool{}
183+
np.Spec.Disruption.ConsolidationPolicy = ConsolidationPolicyBalanced
184+
np.Spec.Disruption.ConsolidationThreshold = int32Ptr(1)
185+
186+
np.SetDefaults(context.Background())
187+
188+
if *np.Spec.Disruption.ConsolidationThreshold != 1 {
189+
t.Errorf("expected threshold to remain 1, got %d", *np.Spec.Disruption.ConsolidationThreshold)
190+
}
191+
}

pkg/apis/v1/nodepool_defaults.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,9 @@ import (
2121
)
2222

2323
// SetDefaults for the NodePool
24-
func (in *NodePool) SetDefaults(_ context.Context) {}
24+
func (in *NodePool) SetDefaults(_ context.Context) {
25+
if in.Spec.Disruption.ConsolidationPolicy == ConsolidationPolicyBalanced && in.Spec.Disruption.ConsolidationThreshold == nil {
26+
threshold := DefaultConsolidationThreshold
27+
in.Spec.Disruption.ConsolidationThreshold = &threshold
28+
}
29+
}

pkg/apis/v1/nodepool_status.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ const (
2929
ConditionTypeNodeClassReady = "NodeClassReady"
3030
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates if a misconfiguration exists that is preventing successful node launch/registrations that requires manual investigation
3131
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
32+
// ConditionTypeConsolidationPolicyUnsupported indicates that the NodePool's consolidationPolicy
33+
// is set to Balanced but the BalancedConsolidation feature gate is disabled.
34+
ConditionTypeConsolidationPolicyUnsupported = "ConsolidationPolicyUnsupported"
3235
)
3336

3437
// NodePoolStatus defines the observed state of NodePool

pkg/apis/v1/nodepool_validation.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,17 @@ import (
2626

2727
// RuntimeValidate will be used to validate any part of the CRD that can not be validated at CRD creation
2828
func (in *NodePool) RuntimeValidate(ctx context.Context) (errs error) {
29-
errs = multierr.Combine(in.Spec.Template.validateLabels(), in.Spec.Template.Spec.validateTaints(), in.Spec.Template.Spec.validateRequirements(ctx), in.Spec.Template.validateRequirementsNodePoolKeyDoesNotExist())
29+
errs = multierr.Combine(in.Spec.Template.validateLabels(), in.Spec.Template.Spec.validateTaints(), in.Spec.Template.Spec.validateRequirements(ctx), in.Spec.Template.validateRequirementsNodePoolKeyDoesNotExist(), in.Spec.Disruption.validateConsolidationThreshold())
3030
return errs
3131
}
3232

33+
func (in *Disruption) validateConsolidationThreshold() error {
34+
if in.ConsolidationThreshold != nil && in.ConsolidationPolicy != ConsolidationPolicyBalanced {
35+
return fmt.Errorf("consolidationThreshold is only valid when consolidationPolicy is Balanced")
36+
}
37+
return nil
38+
}
39+
3340
func (in *NodeClaimTemplate) validateLabels() (errs error) {
3441
for key, value := range in.Labels {
3542
if key == NodePoolLabelKey {

pkg/apis/v1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)