diff --git a/internal/podlist/conv.go b/internal/podlist/conv.go new file mode 100644 index 000000000..aef928954 --- /dev/null +++ b/internal/podlist/conv.go @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package podlist + +import corev1 "k8s.io/api/core/v1" + +// ToPods bridges packages podlist and wait. +// podlist package reasons in terms of corev1.PodList{}.Items: []corev1.Pod +// wait package reasons in terms of []*corev1.Pod (note the pointer) +func ToPods(pods []corev1.Pod) []*corev1.Pod { + ret := make([]*corev1.Pod, 0, len(pods)) + for idx := range pods { + ret = append(ret, &pods[idx]) + } + return ret +} diff --git a/internal/wait/pod.go b/internal/wait/pod.go index d45e91394..1008fe272 100644 --- a/internal/wait/pod.go +++ b/internal/wait/pod.go @@ -83,7 +83,7 @@ func (wt Waiter) ForPodDeleted(ctx context.Context, podNamespace, podName string }) } -func (wt Waiter) ForPodListAllRunning(ctx context.Context, pods []*corev1.Pod) ([]*corev1.Pod, []*corev1.Pod) { +func (wt Waiter) ForPodsAllRunning(ctx context.Context, pods []*corev1.Pod) ([]*corev1.Pod, []*corev1.Pod) { var lock sync.Mutex var failed []*corev1.Pod var updated []*corev1.Pod @@ -112,21 +112,19 @@ func (wt Waiter) ForPodListAllRunning(ctx context.Context, pods []*corev1.Pod) ( return failed, updated } -func (wt Waiter) ForPodListAllDeleted(ctx context.Context, pods []corev1.Pod) error { +func (wt Waiter) ForPodsAllDeleted(ctx context.Context, pods []*corev1.Pod) error { if len(pods) == 0 { - klog.Infof("ForPodListAllDeleted called with an empty list of pods. Nothing to do.") + klog.Infof("ForPodsAllDeleted called with an empty slice of pods. Nothing to do.") return nil } klog.Infof("Waiting for %d pod(s) to be deleted.", len(pods)) var eg errgroup.Group - for idx := range pods { - pod := pods[idx] - podKey := client.ObjectKeyFromObject(&pod).String() - + for _, pod := range pods { + podKey := client.ObjectKeyFromObject(pod).String() eg.Go(func() error { - klog.Infof("Goroutine started: waiting for pod %s to be deleted.", podKey) + klog.Infof("Waiting for pod %s to be deleted.", podKey) err := wt.ForPodDeleted(ctx, pod.Namespace, pod.Name) if err != nil { klog.Warningf("Failed to confirm deletion for pod %s: %v", podKey, err) diff --git a/test/e2e/serial/tests/non_regression_fundamentals.go b/test/e2e/serial/tests/non_regression_fundamentals.go index 173a53dcd..29bb1ac8d 100644 --- a/test/e2e/serial/tests/non_regression_fundamentals.go +++ b/test/e2e/serial/tests/non_regression_fundamentals.go @@ -65,7 +65,11 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser }) Context("using the NUMA-aware scheduler with NRT data", func() { - var cpusPerPod int64 = 2 // must be even. Must be >= 2 + var cpusPerPod int64 + + BeforeEach(func() { + cpusPerPod = 2 // must be even. Must be >= 2 + }) DescribeTable("[node1] against a single node", Label("node1"), // the ourpose of this test is to send a burst of pods towards a node. 
Each pod must require resources in such a way @@ -116,7 +120,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser // CAUTION: still assuming all NUMA zones are equal across all nodes numPods := int(cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!) - klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusVal, maxAllocPerNUMAVal) + klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusPerPod, maxAllocPerNUMAVal) var testPods []*corev1.Pod for idx := 0; idx < numPods; idx++ { @@ -135,7 +139,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser testPods = append(testPods, testPod) } - failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(context.TODO(), testPods) for _, failedPod := range failedPods { _ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name) @@ -207,11 +211,13 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser resQty := e2enrt.GetMaxAllocatableResourceNumaLevel(*nrtInfo, corev1.ResourceCPU) resVal, ok := resQty.AsInt64() Expect(ok).To(BeTrue(), "cannot convert allocatable CPU resource as int") + klog.Infof("total available CPU: %d nodes x %d NUMA zones per node x %d CPUs per NUMA zone = %dx%dx%d = %d", + len(nrts), len(nrtInfo.Zones), resVal, len(nrts), len(nrtInfo.Zones), resVal, len(nrts)*len(nrtInfo.Zones)*int(resVal)) cpusVal := (10 * resVal) / 8 numPods := int(int64(len(nrts)) * cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!) - klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusVal, resVal) + klog.Infof("creating %d pods consuming %d cpus each (%d CPUs consumed in total, found %d per NUMA zone)", numPods, cpusPerPod, numPods*int(cpusPerPod), resVal) var testPods []*corev1.Pod for idx := 0; idx < numPods; idx++ { @@ -231,7 +237,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser testPods = append(testPods, testPod) } - failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(context.TODO(), testPods) for _, failedPod := range failedPods { _ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name) diff --git a/test/e2e/serial/tests/scheduler_cache.go b/test/e2e/serial/tests/scheduler_cache.go index 0c9b5e19d..2d4d3f3ec 100644 --- a/test/e2e/serial/tests/scheduler_cache.go +++ b/test/e2e/serial/tests/scheduler_cache.go @@ -200,7 +200,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac // very generous timeout here. It's hard and racy to check we had 2 pods pending (expected phased scheduling), // but that would be the most correct and stricter testing.
- failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodsAllRunning(context.TODO(), testPods) dumpFailedPodInfo(fxt, failedPods) Expect(failedPods).To(BeEmpty(), "unexpected failed pods: %q", accumulatePodNamespacedNames(failedPods)) @@ -295,7 +295,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac By("waiting for the test pods to go running") // even more generous timeout here. We need to tolerate more reconciliation time because of the interference startTime := time.Now() - failedPods, updatedPods := wait.With(fxt.Client).Interval(5*time.Second).Timeout(5*time.Minute).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Interval(5*time.Second).Timeout(5*time.Minute).ForPodsAllRunning(context.TODO(), testPods) dumpFailedPodInfo(fxt, failedPods) elapsed := time.Since(startTime) klog.Infof("test pods (payload + interference) gone running in %v", elapsed) @@ -429,7 +429,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac // this is a slight abuse. We want to wait for hostsRequired < desiredPods to be running. Other pod(s) must be pending. // So we wait a bit too much unnecessarily, but wetake this chance to ensure the pod(s) which are supposed to be pending // stay pending at least up until timeout - failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodsAllRunning(context.TODO(), testPods) Expect(updatedPods).To(HaveLen(hostsRequired)) Expect(failedPods).To(HaveLen(expectedPending)) Expect(len(updatedPods) + len(failedPods)).To(Equal(desiredPods)) @@ -549,7 +549,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac // this is a slight abuse. We want to wait for hostsRequired < desiredPods to be running. Other pod(s) must be pending. // So we wait a bit too much unnecessarily, but wetake this chance to ensure the pod(s) which are supposed to be pending // stay pending at least up until timeout - failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodListAllRunning(context.TODO(), testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodsAllRunning(context.TODO(), testPods) Expect(updatedPods).To(HaveLen(hostsRequired)) Expect(failedPods).To(HaveLen(expectedPending)) @@ -596,7 +596,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac // here we really need a quite long timeout. Still 300s is a bit of overshot (expected so). // The reason to be supercareful here is the potentially long interplay between // NRT updater, resync loop, scheduler retry loop. 
- failedPods, updatedPods = wait.With(fxt.Client).Timeout(5*time.Minute).ForPodListAllRunning(context.TODO(), expectedRunningPods) + failedPods, updatedPods = wait.With(fxt.Client).Timeout(5*time.Minute).ForPodsAllRunning(context.TODO(), expectedRunningPods) dumpFailedPodInfo(fxt, failedPods) Expect(updatedPods).To(HaveLen(hostsRequired)) Expect(failedPods).To(BeEmpty()) diff --git a/test/e2e/serial/tests/scheduler_cache_stall.go b/test/e2e/serial/tests/scheduler_cache_stall.go index ada6ba78b..7a855c7fa 100644 --- a/test/e2e/serial/tests/scheduler_cache_stall.go +++ b/test/e2e/serial/tests/scheduler_cache_stall.go @@ -19,6 +19,7 @@ package tests import ( "context" "fmt" + "strconv" "time" . "github.com/onsi/ginkgo/v2" @@ -47,6 +48,8 @@ import ( "github.com/openshift-kni/numaresources-operator/test/internal/objects" ) +type makePodFunc func(ns string, idx int) *corev1.Pod + type setupJobFunc func(job *batchv1.Job) var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("scheduler", "cache", "stall"), Label("feature:cache", "feature:stall"), func() { @@ -114,6 +117,335 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche klog.Infof("using MCP %q - refresh period %v", mcpName, refreshPeriod) }) + When("there are non-restartable pods in the cluster", Label("generic", "restartPolicy", label.Tier1), func() { + var expectedPodsPerNode int + var hostsRequired int + var NUMAZonesRequired int + var cpusPerPod int64 + var noisePods []*corev1.Pod + + BeforeEach(func(ctx context.Context) { + hostsRequired = 2 + NUMAZonesRequired = 2 + cpusPerPod = 2 // must be even. Must be >= 2 + + By(fmt.Sprintf("filtering available nodes with at least %d NUMA zones", NUMAZonesRequired)) + nrtCandidates = e2enrt.FilterZoneCountEqual(nrtList.Items, NUMAZonesRequired) + if len(nrtCandidates) < hostsRequired { + e2efixture.Skipf(fxt, "not enough nodes with %d NUMA Zones: found %d", NUMAZonesRequired, len(nrtCandidates)) + } + klog.Infof("Found %d nodes with %d NUMA zones", len(nrtCandidates), NUMAZonesRequired) + + noisePods = []*corev1.Pod{} + }) + + AfterEach(func(ctx context.Context) { + Expect(wait.With(fxt.Client).Interval(3*time.Second).Timeout(30*time.Second).ForPodsAllDeleted(ctx, noisePods)).To(Succeed()) + }) + + DescribeTable("should be able to schedule pods with no stalls", Label("nodeAll"), + // like the non-regression tests, but with non-restartable noise pods present + func(ctx context.Context, makeNoisePod makePodFunc) { + expectedPodsPerNode = 2 // anything >= 1 should be fine + for nidx, nrt := range nrtCandidates { + for idx := range expectedPodsPerNode { + nPod := makeNoisePod(fxt.Namespace.Name, nidx*1000+idx) + nPod.Spec.NodeName = nrt.Name + Expect(fxt.Client.Create(ctx, nPod)).To(Succeed()) // will be removed by the fixture + dumpPodResources(nPod) + noisePods = append(noisePods, nPod) + } + } + var failedNoisePods []*corev1.Pod + failedNoisePods, noisePods = wait.With(fxt.Client).Timeout(3*time.Minute).ForPodsAllRunning(ctx, noisePods) + Expect(failedNoisePods).To(BeEmpty(), "pods failed to go running: %s", accumulatePodNamespacedNames(failedNoisePods)) + // ensure foreign pods are reported + e2efixture.MustSettleNRT(fxt) + + timeout := nroSchedObj.Status.CacheResyncPeriod.Round(time.Second) * 10 + klog.Infof("pod running timeout: %v", timeout) + + nrts := e2enrt.FilterZoneCountEqual(nrtList.Items, 2) + if len(nrts) < 1 { + e2efixture.Skip(fxt, "Not enough nodes found with at least 2 NUMA zones") + } + + // CAUTION here: we assume
all worker nodes are identical, so to estimate + // the available resources we pick one at random and we use it as reference + nodesNames := e2enrt.AccumulateNames(nrts) + referenceNodeName, ok := e2efixture.PopNodeName(nodesNames) + Expect(ok).To(BeTrue()) + + klog.Infof("selected reference node name: %q", referenceNodeName) + + nrtInfo, err := e2enrt.FindFromList(nrts, referenceNodeName) + Expect(err).ToNot(HaveOccurred()) + + // we still are in the serial suite, so we assume: + // - even number of CPUs per NUMA zone + // - unloaded node - so available == allocatable + // - identical NUMA zones + // - at most 1/4 of the node resources taken by baseload (!!!) + // we use cpus as unit because it's the easiest thing to consider + resQty := e2enrt.GetMaxAllocatableResourceNumaLevel(*nrtInfo, corev1.ResourceCPU) + resVal, ok := resQty.AsInt64() + Expect(ok).To(BeTrue(), "cannot convert allocatable CPU resource as int") + + // this is "a little more" than the max allocatable quantity, to make sure we saturate a NUMA zone, + // triggering the pessimistic overallocation and making sure the scheduler will have to wait. + // the actual ratio is not that important (could have been 11/10 possibly) as long as it triggers + // this condition. + klog.Infof("total available CPU: %d nodes x %d NUMA zones per node x %d CPUs per NUMA zone = %dx%dx%d = %d", + len(nrts), len(nrtInfo.Zones), resVal, len(nrts), len(nrtInfo.Zones), resVal, len(nrts)*len(nrtInfo.Zones)*int(resVal)) + + cpusVal := (10 * resVal) / 8 + numPods := int(int64(len(nrts)) * cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!) + + By("running the test workload pods") + klog.Infof("creating %d pods consuming %d cpus each (%d CPUs consumed in total, found %d per NUMA zone)", numPods, cpusPerPod, numPods*int(cpusPerPod), resVal) + + var testPods []*corev1.Pod + for idx := 0; idx < numPods; idx++ { + testPod := objects.NewTestPodPause(fxt.Namespace.Name, fmt.Sprintf("testpod-schedstall-%d", idx)) + testPod.Spec.SchedulerName = serialconfig.Config.SchedulerName + testPod.Spec.Containers[0].Resources.Limits = corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(cpusPerPod, resource.DecimalSI), + corev1.ResourceMemory: resource.MustParse("64Mi"), + } + + testPod.Spec.NodeSelector = map[string]string{ + serialconfig.MultiNUMALabel: "2", + } + + klog.Infof("creating pod %s/%s", testPod.Namespace, testPod.Name) + err = fxt.Client.Create(ctx, testPod) + Expect(err).ToNot(HaveOccurred()) + + dumpPodResources(testPod) + testPods = append(testPods, testPod) + } + + By("ensuring the test workload pods are running") + failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(ctx, testPods) + + for _, failedPod := range failedPods { + _ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name) + } + Expect(failedPods).To(BeEmpty(), "pods failed to go running: %s", accumulatePodNamespacedNames(failedPods)) + + By("ensuring the test workload pods are scheduled as expected") + for _, updatedPod := range updatedPods { + schedOK, err := nrosched.CheckPODWasScheduledWith(fxt.K8sClient, updatedPod.Namespace, updatedPod.Name, serialconfig.Config.SchedulerName) + Expect(err).ToNot(HaveOccurred()) + Expect(schedOK).To(BeTrue(), "pod %s/%s not scheduled with expected scheduler %s", updatedPod.Namespace, updatedPod.Name, serialconfig.Config.SchedulerName) + } + }, + Entry("with non-restartable pods with guaranteed init and app containers", context.TODO(), func(podNamespace string, idx int)
*corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + GenerateName: "generic-pause-pod-", + Labels: map[string]string{ + "test-pod-index": strconv.FormatInt(int64(idx), 10), + }, + }, + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "generic-init-cnt", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "generic-main-cnt", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + }), + Entry("with non-restartable pods with guaranteed init and long running app containers", context.TODO(), func(podNamespace string, idx int) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + GenerateName: "generic-pause-pod-", + Labels: map[string]string{ + "test-pod-index": strconv.FormatInt(int64(idx), 10), + }, + }, + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "generic-init-cnt", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "generic-main-cnt", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"42d"}, // "forever" in our test timescale + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + }), + Entry("with non-restartable pods with guaranteed pod with partially terminated app containers", context.TODO(), func(podNamespace string, idx int) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + GenerateName: "generic-pause-pod-", + Labels: map[string]string{ + "test-pod-index": strconv.FormatInt(int64(idx), 10), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "generic-main-cnt-run-1", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"42d"}, // "forever" in our test timescale + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + { + Name: "generic-main-cnt-run-2", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + }), + Entry("with non-restartable pods with guaranteed pod with partially terminated app containers", context.TODO(), func(podNamespace string, idx int) 
*corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + GenerateName: "generic-pause-pod-", + Labels: map[string]string{ + "test-pod-index": strconv.FormatInt(int64(idx), 10), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "generic-main-cnt-run-1", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"42d"}, // "forever" in our test timescale + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + { + Name: "generic-main-cnt-run-2", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + }), + Entry("with restartable pods with guaranteed pod with partially terminated app containers", context.TODO(), func(podNamespace string, idx int) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: podNamespace, + GenerateName: "generic-pause-pod-", + Labels: map[string]string{ + "test-pod-index": strconv.FormatInt(int64(idx), 10), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "generic-main-cnt-run-1", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"42d"}, // "forever" in our test timescale + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + { + Name: "generic-main-cnt-run-2", + Image: images.GetPauseImage(), + Command: []string{"/bin/sleep"}, + Args: []string{"1s"}, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + }, + } + }), + ) + }) + When("there are jobs in the cluster", Label("job", "generic", label.Tier0), func() { var idleJob *batchv1.Job var hostsRequired int @@ -218,7 +550,7 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche // very generous timeout here. It's hard and racy to check we had 2 pods pending (expected phased scheduling), // but that would be the most correct and stricter testing. 
- failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodListAllRunning(ctx, testPods) + failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodsAllRunning(ctx, testPods) if len(failedPods) > 0 { nrtListFailed, _ := e2enrt.GetUpdated(fxt.Client, nrtv1alpha2.NodeResourceTopologyList{}, time.Minute) klog.Infof("%s", intnrt.ListToString(nrtListFailed.Items, "post failure")) @@ -235,10 +567,10 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche Expect(schedOK).To(BeTrue(), "pod %s/%s not scheduled with expected scheduler %s", updatedPod.Namespace, updatedPod.Name, serialconfig.Config.SchedulerName) } }, - Entry("vs best-effort pods", Label(label.Tier1), func(job *batchv1.Job) { + Entry("vs best-effort pods", func(job *batchv1.Job) { klog.Infof("Creating a job whose containers have requests=none") }), - Entry("vs burstable pods", Label(label.Tier1), func(job *batchv1.Job) { + Entry("vs burstable pods", func(job *batchv1.Job) { jobRequiredRes := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("100m"), corev1.ResourceMemory: resource.MustParse("256Mi"), } @@ -254,7 +586,7 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche // GAP: pinnable cpu (but not memory) // however with the recommended config, we can't have pinnable CPUs without pinnable memory; // we would need cpumanager policy=static and memorymanager policy=none, which we don't recommend. - Entry("vs guaranteed pods with pinnable memory", Label(label.Tier1), func(job *batchv1.Job) { + Entry("vs guaranteed pods with pinnable memory", func(job *batchv1.Job) { jobRequiredRes := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("100m"), corev1.ResourceMemory: resource.MustParse("256Mi"), @@ -267,7 +599,7 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche } klog.Infof("Creating a job whose containers have limits=%q", e2ereslist.ToString(jobRequiredRes)) }), - Entry("vs guaranteed pods with pinnable memory and CPU", Label(label.Tier1), func(job *batchv1.Job) { + Entry("vs guaranteed pods with pinnable memory and CPU", func(job *batchv1.Job) { jobRequiredRes := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("256Mi"), @@ -289,9 +621,12 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche idleJob = makeIdleJob(fxt.Namespace.Name, expectedJobPodsPerNode, len(nrtCandidates)) ctx := context.TODO() + By("creating the noisy job") Expect(fxt.Client.Create(ctx, idleJob)).To(Succeed()) // will be removed by the fixture _, err := wait.With(fxt.Client).Interval(3*time.Second).Timeout(30*time.Second).ForJobCompleted(ctx, namespacedname.FromObject(idleJob)) Expect(err).ToNot(HaveOccurred()) + + By("ensuring the foreign pods are noticed") // ensure foreign pods are reported e2efixture.MustSettleNRT(fxt) @@ -303,8 +638,9 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche e2efixture.Skip(fxt, "Not enough nodes found with at least 2 NUMA zones") } - // CAUTION here: we assume all worker node identicals, so to estimate the available - // resources we pick one at random and we use it as reference + By("computing the test workload resources") + // CAUTION here: we assume all worker nodes are identical, so to estimate + // the available resources we pick one at random and we use it as reference nodesNames := e2enrt.AccumulateNames(nrts) + referenceNodeName, ok :=
e2efixture.PopNodeName(nodesNames) Expect(ok).To(BeTrue()) @@ -328,10 +664,14 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche // triggering the pessimistic overallocation and making sure the scheduler will have to wait. // the actual ratio is not that important (could have been 11/10 possibly) as long as it triggers // this condition. + klog.Infof("total available CPU: %d nodes x %d NUMA zones per node x %d CPUs per NUMA zone = %dx%dx%d = %d", + len(nrts), len(nrtInfo.Zones), resVal, len(nrts), len(nrtInfo.Zones), resVal, len(nrts)*len(nrtInfo.Zones)*int(resVal)) + cpusVal := (10 * resVal) / 8 numPods := int(int64(len(nrts)) * cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!) - klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusVal, resVal) + By("running the test workload pods") + klog.Infof("creating %d pods consuming %d cpus each (%d CPUs consumed in total, found %d per NUMA zone)", numPods, cpusPerPod, numPods*int(cpusPerPod), resVal) var testPods []*corev1.Pod for idx := 0; idx < numPods; idx++ { @@ -348,10 +688,12 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche err = fxt.Client.Create(ctx, testPod) Expect(err).ToNot(HaveOccurred()) + dumpPodResources(testPod) testPods = append(testPods, testPod) } - failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodListAllRunning(ctx, testPods) + By("checking the test workload status") + failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(ctx, testPods) for _, failedPod := range failedPods { _ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name) @@ -364,13 +706,13 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche Expect(schedOK).To(BeTrue(), "pod %s/%s not scheduled with expected scheduler %s", updatedPod.Namespace, updatedPod.Name, serialconfig.Config.SchedulerName) } }, - Entry("should handle a burst of qos=guaranteed pods", Label(label.Tier0), func(pod *corev1.Pod) { + Entry("should handle a burst of qos=guaranteed pods", func(pod *corev1.Pod) { pod.Spec.Containers[0].Resources.Limits = corev1.ResourceList{ corev1.ResourceCPU: *resource.NewQuantity(cpusPerPod, resource.DecimalSI), corev1.ResourceMemory: resource.MustParse("64Mi"), } }), - Entry("should handle a burst of qos=burstable pods", Label(label.Tier0), func(pod *corev1.Pod) { + Entry("should handle a burst of qos=burstable pods", func(pod *corev1.Pod) { pod.Spec.Containers[0].Resources.Requests = corev1.ResourceList{ corev1.ResourceCPU: *resource.NewQuantity(cpusPerPod, resource.DecimalSI), corev1.ResourceMemory: resource.MustParse("64Mi"), @@ -384,6 +726,19 @@ var _ = Describe("[serial][scheduler][cache] scheduler cache stall", Label("sche }) }) +func dumpPodResources(pod *corev1.Pod) { + for _, ctr := range pod.Spec.InitContainers { + klog.Infof("pod %s/%s init container %s on %s requiring requests=%s/limits=%s", + pod.Namespace, pod.Name, ctr.Name, pod.Spec.NodeName, + e2ereslist.ToString(ctr.Resources.Requests), e2ereslist.ToString(ctr.Resources.Limits)) + } + for _, ctr := range pod.Spec.Containers { + klog.Infof("pod %s/%s app container %s on %s requiring requests=%s/limits=%s", + pod.Namespace, pod.Name, ctr.Name, pod.Spec.NodeName, + e2ereslist.ToString(ctr.Resources.Requests), e2ereslist.ToString(ctr.Resources.Limits)) + } +} + func makeIdleJob(jobNamespace string, expectedJobPodsPerNode, numWorkerNodes int) *batchv1.Job { idleJobParallelism
:= int32(numWorkerNodes * expectedJobPodsPerNode) klog.Infof("Using job parallelism=%d (with %d candidate nodes)", idleJobParallelism, numWorkerNodes) diff --git a/test/e2e/serial/tests/tolerations.go b/test/e2e/serial/tests/tolerations.go index 4b11d61df..e294279ee 100644 --- a/test/e2e/serial/tests/tolerations.go +++ b/test/e2e/serial/tests/tolerations.go @@ -532,9 +532,11 @@ var _ = Describe("[serial][disruptive][rtetols] numaresources RTE tolerations su When("RTE pods are not running yet", func() { var taintedNode *corev1.Node - customPolicySupportEnabled := isCustomPolicySupportEnabled(&nroOperObj) + var customPolicySupportEnabled bool BeforeEach(func(ctx context.Context) { + customPolicySupportEnabled = isCustomPolicySupportEnabled(&nroOperObj) + By("Get list of worker nodes") var err error workers, err = nodes.GetWorkers(fxt.DEnv()) @@ -575,7 +577,7 @@ var _ = Describe("[serial][disruptive][rtetols] numaresources RTE tolerations su Expect(err).ToNot(HaveOccurred()) By("wait for all the RTE pods owned by the daemonset to be deleted") - err = wait.With(fxt.Client).Interval(30*time.Second).Timeout(2*time.Minute).ForPodListAllDeleted(ctx, rtePods) + err = wait.With(fxt.Client).Interval(30*time.Second).Timeout(2*time.Minute).ForPodsAllDeleted(ctx, podlist.ToPods(rtePods)) Expect(err).ToNot(HaveOccurred(), "Expected all RTE pods owned by the DaemonSet to be deleted within the timeout") tnts, _, err = taints.ParseTaints([]string{testTaint()}) diff --git a/test/internal/fixture/wait.go b/test/internal/fixture/wait.go index ad3799423..4a6f7e132 100644 --- a/test/internal/fixture/wait.go +++ b/test/internal/fixture/wait.go @@ -33,7 +33,7 @@ import ( // WaitPodsRunning waits for all padding pods to be up and running ( or fail) func WaitForPaddingPodsRunning(fxt *Fixture, paddingPods []*corev1.Pod) []string { var failedPodIds []string - failedPods, _ := wait.With(fxt.Client).ForPodListAllRunning(context.TODO(), paddingPods) + failedPods, _ := wait.With(fxt.Client).ForPodsAllRunning(context.TODO(), paddingPods) for _, failedPod := range failedPods { _ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name) //note that this test does not use podOverhead thus pod req and lim would be the pod's resources as set upon creating diff --git a/test/internal/nrosched/nrosched.go b/test/internal/nrosched/nrosched.go index adf9125a7..5de93a21d 100644 --- a/test/internal/nrosched/nrosched.go +++ b/test/internal/nrosched/nrosched.go @@ -59,7 +59,7 @@ const ( type eventChecker func(ev corev1.Event) bool func checkPODEvents(k8sCli *kubernetes.Clientset, podNamespace, podName string, evCheck eventChecker) (bool, error) { - By(fmt.Sprintf("checking events for pod %s/%s", podNamespace, podName)) + klog.Infof("checking events for pod %s/%s", podNamespace, podName) opts := metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(map[string]string{ "involvedObject.name": podName, diff --git a/test/internal/padder/padder.go b/test/internal/padder/padder.go index 6c9a6b9b6..325ee8391 100644 --- a/test/internal/padder/padder.go +++ b/test/internal/padder/padder.go @@ -205,7 +205,7 @@ func (p *Padder) Pad(timeout time.Duration, options PaddingOptions) error { } } - if failedPods, _ := wait.With(p.Client).ForPodListAllRunning(context.TODO(), pods); len(failedPods) > 0 { + if failedPods, _ := wait.With(p.Client).ForPodsAllRunning(context.TODO(), pods); len(failedPods) > 0 { var asStrings []string for _, pod := range failedPods { asStrings = append(asStrings, fmt.Sprintf("%s/%s", pod.Namespace, 
pod.Name))
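For reference, a minimal sketch (not part of the patch) of how the reworked helpers compose: a caller that starts from corev1.PodList.Items ([]corev1.Pod) bridges through podlist.ToPods() into the []*corev1.Pod shape the wait package now expects, mirroring the tolerations.go call site above. The helper name waitForPodsGone, the exact internal import paths, and the client.Client parameter type are assumptions based on the module layout visible in this diff.

// Sketch only: assumes the internal/wait and internal/podlist packages live under
// the github.com/openshift-kni/numaresources-operator module as shown in this diff,
// and that wait.With accepts a controller-runtime client.
package example

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/openshift-kni/numaresources-operator/internal/podlist"
	"github.com/openshift-kni/numaresources-operator/internal/wait"
)

// waitForPodsGone (hypothetical helper) converts the PodList-style slice into the
// pointer slice used by the wait package, then blocks until every pod is deleted
// or the timeout expires.
func waitForPodsGone(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
	return wait.With(cli).
		Interval(30 * time.Second).
		Timeout(2 * time.Minute).
		ForPodsAllDeleted(ctx, podlist.ToPods(pods))
}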