Skip to content

Dynamically adjust slack quota #212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- kueue.x-k8s.io
resources:
- clusterqueues
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- kueue.x-k8s.io
resources:
Expand Down
97 changes: 76 additions & 21 deletions internal/controller/appwrapper/node_health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package appwrapper

import (
"context"
"reflect"
"sync"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

"github.com/project-codeflare/appwrapper/pkg/config"
)
Expand All @@ -40,30 +43,26 @@ type NodeHealthMonitor struct {
}

var (
// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
unhealthyNodes = make(map[string]sets.Set[string])
// unhealthyNodes is a mapping from Node names to a set of resource quantities that Autopilot has labeled as unhealthy on that Node
unhealthyNodes = make(map[string]map[string]*resource.Quantity)
unhealthyNodesMutex sync.RWMutex
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;create;update;patch;delete

func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &metav1.PartialObjectMetadata{}
node.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "Node",
})
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
}

flaggedResources := make(sets.Set[string])
flaggedResources := make(map[string]*resource.Quantity)
for key, value := range node.GetLabels() {
for resource, apLabels := range r.Config.Autopilot.ResourceUnhealthyConfig {
for resourceName, apLabels := range r.Config.Autopilot.ResourceUnhealthyConfig {
if apValue, ok := apLabels[key]; ok && apValue == value {
flaggedResources.Insert(resource)
flaggedResources[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
}
}
}
Expand All @@ -74,30 +73,86 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
if len(flaggedResources) == 0 {
delete(unhealthyNodes, node.GetName())
nodeChanged = true
} else if !priorEntry.Equal(flaggedResources) {
} else if !reflect.DeepEqual(priorEntry, flaggedResources) {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
} else if len(flaggedResources) > 0 {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
numUnhealthy := len(unhealthyNodes)
unhealthyNodesMutex.Unlock() // END CRITICAL SECTION

// Unsynchronized reads of unhealthyNodes below are safe because this method
// is the only writer to the map and the controller runtime is configured to
// not allow concurrent execution of this method.

if nodeChanged {
// This unsynchronized read of unhealthyNodes for logging purposes is safe because this method
// is the only writer to the map and the controller runtime is configured to not allow concurrent execution of this method.
log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", numUnhealthy, "Unhealthy Resource Details", unhealthyNodes)
log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", len(unhealthyNodes), "Unhealthy Resource Details", unhealthyNodes)
}

// update lending limits on slack quota if configured

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
}

// get slack quota
cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil // give up if slack quota is not defined
}
return ctrl.Result{}, err
}

// compute unhealthy resource totals
missingQuantities := map[string]*resource.Quantity{}
for _, quantities := range unhealthyNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if missingQuantities[resourceName] == nil {
missingQuantities[resourceName] = ptr.To(*quantity)
} else {
missingQuantities[resourceName].Add(*quantity)
}
}
}
}

return ctrl.Result{}, nil
// enforce lending limits on 1st flavor of 1st resource group
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
limitsChanged := false
for i, quota := range resources {
var lendingLimit *resource.Quantity
if missingQuantity := missingQuantities[quota.Name.String()]; missingQuantity != nil {
if quota.NominalQuota.Cmp(*missingQuantity) > 0 {
lendingLimit = ptr.To(quota.NominalQuota)
lendingLimit.Sub(*missingQuantity)
} else {
lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
}
}
if quota.LendingLimit == nil && lendingLimit != nil ||
quota.LendingLimit != nil && lendingLimit == nil ||
quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
limitsChanged = true
resources[i].LendingLimit = lendingLimit
}
}

// update lending limits
var err error
if limitsChanged {
err = r.Update(ctx, cq)
}
return ctrl.Result{}, err
}

// SetupWithManager sets up the controller with the Manager.
//
// The monitor watches full Node objects (not metadata-only) because
// Reconcile reads node.Status.Capacity to compute the unhealthy resource
// quantities; a metadata-only watch would not deliver the status fields.
// The diff artifact that listed both WatchesMetadata and Watches is
// collapsed to the single post-change Watches registration.
func (r *NodeHealthMonitor) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Watches(&v1.Node{}, &handler.EnqueueRequestForObject{}).
		Named("NodeMonitor").
		Complete(r)
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type AppWrapperConfig struct {
FaultTolerance *FaultToleranceConfig `json:"faultTolerance,omitempty"`
SchedulerName string `json:"schedulerName,omitempty"`
DefaultQueueName string `json:"defaultQueueName,omitempty"`
SlackQueueName string `json:"slackQueueName,omitempty"`
}

type KueueJobReconcillerConfig struct {
Expand Down