Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
type SchedulerInformerMode string

const (
// SchedulerInformerDedicated makes the NodeResourceTopologyMatch plugin use the default framework informer.
// SchedulerInformerShared makes the NodeResourceTopologyMatch plugin use the default framework informer.
SchedulerInformerShared SchedulerInformerMode = "Shared"

// SchedulerInformerDedicated sets an additional separate informer just for the NodeResourceTopologyMatch plugin. Default.
Expand Down
75 changes: 66 additions & 9 deletions controllers/numaresourcesscheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -37,11 +38,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform"
k8swgmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
k8swgrbacupdate "github.com/k8stopologyawareschedwg/deployer/pkg/objectupdate/rbac"

nropv1 "github.com/openshift-kni/numaresources-operator/api/numaresourcesoperator/v1"
"github.com/openshift-kni/numaresources-operator/internal/api/annotations"
"github.com/openshift-kni/numaresources-operator/internal/platforminfo"
"github.com/openshift-kni/numaresources-operator/internal/relatedobjects"
"github.com/openshift-kni/numaresources-operator/pkg/apply"
"github.com/openshift-kni/numaresources-operator/pkg/hash"
Expand All @@ -61,6 +64,7 @@ type NUMAResourcesSchedulerReconciler struct {
SchedulerManifests schedmanifests.Manifests
Namespace string
AutodetectReplicas int
PlatformInfo platforminfo.PlatformInfo
}

//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles,verbs=*
Expand Down Expand Up @@ -98,9 +102,11 @@ func (r *NUMAResourcesSchedulerReconciler) Reconcile(ctx context.Context, req ct
return ctrl.Result{}, err
}

initialStatus := *instance.Status.DeepCopy()

if req.Name != objectnames.DefaultNUMAResourcesSchedulerCrName {
message := fmt.Sprintf("incorrect NUMAResourcesScheduler resource name: %s", instance.Name)
return ctrl.Result{}, r.updateStatus(ctx, instance, status.ConditionDegraded, status.ConditionTypeIncorrectNUMAResourcesSchedulerResourceName, message)
return ctrl.Result{}, r.updateStatus(ctx, initialStatus, instance, status.ConditionDegraded, status.ConditionTypeIncorrectNUMAResourcesSchedulerResourceName, message)
}

if annotations.IsPauseReconciliationEnabled(instance.Annotations) {
Expand All @@ -109,7 +115,7 @@ func (r *NUMAResourcesSchedulerReconciler) Reconcile(ctx context.Context, req ct
}

result, condition, err := r.reconcileResource(ctx, instance)
if err := r.updateStatus(ctx, instance, condition, status.ReasonFromError(err), status.MessageFromError(err)); err != nil {
if err := r.updateStatus(ctx, initialStatus, instance, condition, status.ReasonFromError(err), status.MessageFromError(err)); err != nil {
klog.InfoS("Failed to update numaresourcesscheduler status", "Desired condition", condition, "error", err)
}

Expand Down Expand Up @@ -184,6 +190,8 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex
klog.V(4).Info("SchedulerSync start")
defer klog.V(4).Info("SchedulerSync stop")

platformNormalize(&instance.Spec, r.PlatformInfo)

schedSpec := instance.Spec.Normalize()
cacheResyncPeriod := unpackAPIResyncPeriod(schedSpec.CacheResyncPeriod)
params := configParamsFromSchedSpec(schedSpec, cacheResyncPeriod, r.Namespace)
Expand All @@ -200,13 +208,15 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex
return nropv1.NUMAResourcesSchedulerStatus{}, err
}

schedStatus := nropv1.NUMAResourcesSchedulerStatus{
SchedulerName: schedSpec.SchedulerName,
CacheResyncPeriod: &metav1.Duration{
Duration: cacheResyncPeriod,
},
schedStatus := *instance.Status.DeepCopy()
schedStatus.SchedulerName = schedSpec.SchedulerName
schedStatus.CacheResyncPeriod = &metav1.Duration{
Duration: cacheResyncPeriod,
}

informerCondition := buildDedicatedInformerCondition(*instance, schedSpec)
schedStatus.Conditions = status.GetUpdatedSchedulerConditions(schedStatus.Conditions, informerCondition)

r.SchedulerManifests.Deployment.Spec.Replicas = r.computeSchedulerReplicas(schedSpec)
klog.V(4).InfoS("using scheduler replicas", "replicas", *r.SchedulerManifests.Deployment.Spec.Replicas)
// TODO: if replicas doesn't make sense (autodetect disabled and user set impossible value) then we
Expand Down Expand Up @@ -251,8 +261,55 @@ func (r *NUMAResourcesSchedulerReconciler) syncNUMASchedulerResources(ctx contex
return schedStatus, nil
}

func (r *NUMAResourcesSchedulerReconciler) updateStatus(ctx context.Context, sched *nropv1.NUMAResourcesScheduler, condition string, reason string, message string) error {
sched.Status.Conditions, _ = status.UpdateConditions(sched.Status.Conditions, condition, reason, message)
func platformNormalize(spec *nropv1.NUMAResourcesSchedulerSpec, platInfo platforminfo.PlatformInfo) {
if platInfo.Platform != platform.OpenShift && platInfo.Platform != platform.HyperShift {
return
}
if spec.SchedulerInformer != nil {
// assume user-provided value. Nothing to do.
klog.V(4).InfoS("SchedulerInformer explicit value", "Platform", platInfo.Platform, "PlatformVersion", platInfo.Version.String(), "SchedulerInformer", *spec.SchedulerInformer)
return
}
if !platInfo.Properties.PodResourcesListFilterActivePods {
// keep shared default for backward compatibility. TODO: review/switch default in 4.21
return
}
spec.SchedulerInformer = ptr.To(nropv1.SchedulerInformerShared)
klog.V(4).InfoS("SchedulerInformer default is overridden", "Platform", platInfo.Platform, "PlatformVersion", platInfo.Version.String(), "SchedulerInformer", *spec.SchedulerInformer)
}

func buildDedicatedInformerCondition(instance nropv1.NUMAResourcesScheduler, normalized nropv1.NUMAResourcesSchedulerSpec) metav1.Condition {
condition := metav1.Condition{
Type: status.ConditionDedicatedInformerActive,
Status: metav1.ConditionTrue,
ObservedGeneration: instance.ObjectMeta.Generation,
Reason: status.ConditionDedicatedInformerActive,
}

if *normalized.SchedulerInformer == nropv1.SchedulerInformerShared {
condition.Status = metav1.ConditionFalse
}

return condition
}

func (r *NUMAResourcesSchedulerReconciler) updateStatus(ctx context.Context, initialStatus nropv1.NUMAResourcesSchedulerStatus, sched *nropv1.NUMAResourcesScheduler, condition string, reason string, message string) error {
c := metav1.Condition{
Type: condition,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
}
sched.Status.Conditions = status.GetUpdatedSchedulerConditions(sched.Status.Conditions, c)

if !status.NUMAResourcesSchedulerNeedsUpdate(initialStatus, sched.Status) {
return nil
}

if status.EqualConditions(initialStatus.Conditions, sched.Status.Conditions) {
sched.Status.Conditions = initialStatus.Conditions
}

if err := r.Client.Status().Update(ctx, sched); err != nil {
return fmt.Errorf("could not update status for object %s: %w", client.ObjectKeyFromObject(sched), err)
}
Expand Down
179 changes: 179 additions & 0 deletions controllers/numaresourcesscheduler_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform"
depmanifests "github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
depobjupdate "github.com/k8stopologyawareschedwg/deployer/pkg/objectupdate"
nropv1 "github.com/openshift-kni/numaresources-operator/api/numaresourcesoperator/v1"
"github.com/openshift-kni/numaresources-operator/internal/api/annotations"
testobjs "github.com/openshift-kni/numaresources-operator/internal/objects"
"github.com/openshift-kni/numaresources-operator/internal/platforminfo"
"github.com/openshift-kni/numaresources-operator/pkg/hash"
nrosched "github.com/openshift-kni/numaresources-operator/pkg/numaresourcesscheduler"
schedmanifests "github.com/openshift-kni/numaresources-operator/pkg/numaresourcesscheduler/manifests/sched"
Expand Down Expand Up @@ -214,6 +217,67 @@ var _ = ginkgo.Describe("Test NUMAResourcesScheduler Reconcile", func() {
gomega.Expect(nrs.Status.CacheResyncPeriod.Seconds()).To(gomega.Equal(resyncPeriod.Seconds()))
})

ginkgo.Context("should reflect DedicatedInformerActive in status conditions", func() {
ginkgo.It("with default values", func() {
key := client.ObjectKeyFromObject(nrs)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

gomega.Expect(reconciler.Client.Get(context.TODO(), key, nrs)).To(gomega.Succeed())

c := getConditionByType(nrs.Status.Conditions, status.ConditionDedicatedInformerActive)

gomega.Expect(c).ToNot(gomega.BeNil())
gomega.Expect(c.Status).To(gomega.Equal(metav1.ConditionTrue))
})

ginkgo.It("with updated values - explicitly configured to Dedicated", func() {
nrs := nrs.DeepCopy()
nrs.Spec.SchedulerInformer = ptr.To(nropv1.SchedulerInformerDedicated)

gomega.Eventually(func() bool {
if err := reconciler.Client.Update(context.TODO(), nrs); err != nil {
klog.Warningf("failed to update the scheduler object; err: %v", err)
return false
}
return true
}, 30*time.Second, 5*time.Second).Should(gomega.BeTrue())

key := client.ObjectKeyFromObject(nrs)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(reconciler.Client.Get(context.TODO(), key, nrs)).To(gomega.Succeed())

c := getConditionByType(nrs.Status.Conditions, status.ConditionDedicatedInformerActive)

gomega.Expect(c).ToNot(gomega.BeNil())
gomega.Expect(c.Status).To(gomega.Equal(metav1.ConditionTrue))
})

ginkgo.It("with updated values - explicitly configured to Shared", func() {
nrs := nrs.DeepCopy()
nrs.Spec.SchedulerInformer = ptr.To(nropv1.SchedulerInformerShared)

gomega.Eventually(func() bool {
if err := reconciler.Client.Update(context.TODO(), nrs); err != nil {
klog.Warningf("failed to update the scheduler object; err: %v", err)
return false
}
return true
}, 30*time.Second, 5*time.Second).Should(gomega.BeTrue())

key := client.ObjectKeyFromObject(nrs)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
gomega.Expect(reconciler.Client.Get(context.TODO(), key, nrs)).To(gomega.Succeed())

c := getConditionByType(nrs.Status.Conditions, status.ConditionDedicatedInformerActive)

gomega.Expect(c).ToNot(gomega.BeNil())
gomega.Expect(c.Status).To(gomega.Equal(metav1.ConditionFalse))
})
})

ginkgo.It("should have the correct priority class", func() {
key := client.ObjectKeyFromObject(nrs)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
Expand Down Expand Up @@ -697,6 +761,121 @@ var _ = ginkgo.Describe("Test NUMAResourcesScheduler Reconcile", func() {
gomega.Expect(dp).To(HaveTheSameResourceRequirements(expectedRR))
})
})

ginkgo.Context("with kubelet PodResourcesAPI listing active pods by default", func() {
var nrs *nropv1.NUMAResourcesScheduler
var reconciler *NUMAResourcesSchedulerReconciler

ginkgo.When("kubelet fix is enabled", func() {
fixedVersion, _ := platform.ParseVersion("4.18.23")
unfixedVersion, _ := platform.ParseVersion("4.18.0") // can't (and we must not even if we can) rewrite history
futureFixedVersionZstream, _ := platform.ParseVersion("4.18.26") // we must never regress
futureFixedVersion, _ := platform.ParseVersion("4.21.0") // we must never regress

ginkgo.DescribeTable("should configure by default the informerMode to the expected when field is not set", func(reconcilerPlatInfo platforminfo.PlatformInfo, expectedInformer string) {
var err error
nrs = testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second)
reconciler, err = NewFakeNUMAResourcesSchedulerReconciler(nrs)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

reconciler.PlatformInfo = reconcilerPlatInfo

key := client.ObjectKeyFromObject(nrs)
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

expectCacheParams(reconciler.Client, depmanifests.CacheResyncAutodetect, depmanifests.CacheResyncOnlyExclusiveResources, expectedInformer)

expectedDedicatedActiveStatus := metav1.ConditionTrue
if expectedInformer == depmanifests.CacheInformerShared {
expectedDedicatedActiveStatus = metav1.ConditionFalse
}

gomega.Expect(reconciler.Client.Get(context.TODO(), key, nrs)).To(gomega.Succeed())
c := getConditionByType(nrs.Status.Conditions, status.ConditionDedicatedInformerActive)

gomega.Expect(c).ToNot(gomega.BeNil())
gomega.Expect(c.Status).To(gomega.Equal(expectedDedicatedActiveStatus))
},
ginkgo.Entry("with fixed Openshift the default informer is Shared", platforminfo.New(platform.OpenShift, fixedVersion), depmanifests.CacheInformerShared),
ginkgo.Entry("with fixed Hypershift the default informer is Shared", platforminfo.New(platform.HyperShift, fixedVersion), depmanifests.CacheInformerShared),
ginkgo.Entry("with unfixed platform the default informer is Dedicated (unchanged)", platforminfo.New(platform.OpenShift, unfixedVersion), depmanifests.CacheInformerDedicated),
ginkgo.Entry("with unfixed platform the default informer is Dedicated (unchanged)", platforminfo.New(platform.HyperShift, unfixedVersion), depmanifests.CacheInformerDedicated),
ginkgo.Entry("with fixed Openshift the default informer is Shared", platforminfo.New(platform.OpenShift, futureFixedVersion), depmanifests.CacheInformerShared),
ginkgo.Entry("with fixed Hypershift the default informer is Shared", platforminfo.New(platform.HyperShift, futureFixedVersion), depmanifests.CacheInformerShared),
ginkgo.Entry("with fixed Openshift the default informer is Shared", platforminfo.New(platform.OpenShift, futureFixedVersionZstream), depmanifests.CacheInformerShared),
ginkgo.Entry("with fixed Hypershift the default informer is Shared", platforminfo.New(platform.HyperShift, futureFixedVersionZstream), depmanifests.CacheInformerShared),
ginkgo.Entry("with unknown platform the default informer is Dedicated (unchanged)", platforminfo.PlatformInfo{}, depmanifests.CacheInformerDedicated),
)

ginkgo.DescribeTable("should preserve informerMode value if set", func(reconcilerPlatInfo platforminfo.PlatformInfo) {
var err error
nrs = testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second)
infMode := nropv1.SchedulerInformerDedicated
nrs.Spec.SchedulerInformer = &infMode
reconciler, err = NewFakeNUMAResourcesSchedulerReconciler(nrs)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

reconciler.PlatformInfo = reconcilerPlatInfo

key := client.ObjectKeyFromObject(nrs)
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
expectCacheParams(reconciler.Client, depmanifests.CacheResyncAutodetect, depmanifests.CacheResyncOnlyExclusiveResources, string(infMode))
},
ginkgo.Entry("with Openshift", platforminfo.PlatformInfo{
Platform: platform.OpenShift,
Version: fixedVersion,
}),
ginkgo.Entry("with Hypershift", platforminfo.PlatformInfo{
Platform: platform.HyperShift,
Version: fixedVersion,
}),
ginkgo.Entry("with unknown platform", platforminfo.PlatformInfo{}))

ginkgo.DescribeTable("should allow to update the informerMode to be Dedicated after an overridden default", func(reconcilerPlatInfo platforminfo.PlatformInfo) {
var err error
nrs = testobjs.NewNUMAResourcesScheduler("numaresourcesscheduler", "some/url:latest", testSchedulerName, 11*time.Second)
reconciler, err = NewFakeNUMAResourcesSchedulerReconciler(nrs)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

reconciler.PlatformInfo = reconcilerPlatInfo

key := client.ObjectKeyFromObject(nrs)
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// intentionally skip checking default value

// should query the object after reconcile because the defaults are overridden
gomega.Expect(reconciler.Client.Get(context.TODO(), key, nrs)).ToNot(gomega.HaveOccurred())

nrsUpdated := nrs.DeepCopy()
informerMode := nropv1.SchedulerInformerDedicated
nrsUpdated.Spec.SchedulerInformer = &informerMode
gomega.Eventually(func() bool {
if err := reconciler.Client.Update(context.TODO(), nrsUpdated); err != nil {
klog.Warningf("failed to update the scheduler object; err: %v", err)
return false
}
return true
}, 30*time.Second, 5*time.Second).Should(gomega.BeTrue())

_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: key})
gomega.Expect(err).ToNot(gomega.HaveOccurred())

expectCacheParams(reconciler.Client, depmanifests.CacheResyncAutodetect, depmanifests.CacheResyncOnlyExclusiveResources, string(informerMode))
},
ginkgo.Entry("with Openshift", platforminfo.PlatformInfo{
Platform: platform.OpenShift,
Version: fixedVersion,
}),
ginkgo.Entry("with Hypershift", platforminfo.PlatformInfo{
Platform: platform.HyperShift,
Version: fixedVersion,
}))
})
})
})

func HaveTheSameResourceRequirements(expectedRR corev1.ResourceRequirements) types.GomegaMatcher {
Expand Down
Loading