30 changes: 30 additions & 0 deletions internal/podlist/conv.go
@@ -0,0 +1,30 @@
/*
* Copyright 2023 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package podlist

import corev1 "k8s.io/api/core/v1"

// ToPods bridges the podlist and wait packages:
// the podlist package reasons in terms of corev1.PodList{}.Items ([]corev1.Pod),
// while the wait package reasons in terms of []*corev1.Pod (note the pointer).
func ToPods(pods []corev1.Pod) []*corev1.Pod {
ret := make([]*corev1.Pod, 0, len(pods))
for idx := range pods {
ret = append(ret, &pods[idx])
}
return ret
}
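
A note on the loop above: the conversion deliberately indexes the slice and takes &pods[idx], so the returned pointers alias the caller's PodList items and edits made through them stay visible to the caller; a pointer to a per-iteration loop copy would not behave this way. Below is a minimal, self-contained sketch of that behavior (illustrative names only; the real helper is ToPods in internal/podlist):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// toPods mirrors podlist.ToPods: take the address of each slice element,
// not of a loop-variable copy.
func toPods(pods []corev1.Pod) []*corev1.Pod {
	ret := make([]*corev1.Pod, 0, len(pods))
	for idx := range pods {
		ret = append(ret, &pods[idx])
	}
	return ret
}

func main() {
	podList := corev1.PodList{Items: make([]corev1.Pod, 2)}
	podList.Items[0].Name = "pod-a"
	podList.Items[1].Name = "pod-b"

	ptrs := toPods(podList.Items)
	ptrs[0].Name = "pod-a-renamed"

	// Prints "pod-a-renamed": the pointer aliases the PodList element,
	// so the change is visible through the original slice as well.
	fmt.Println(podList.Items[0].Name)
}
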
14 changes: 6 additions & 8 deletions internal/wait/pod.go
@@ -83,7 +83,7 @@ func (wt Waiter) ForPodDeleted(ctx context.Context, podNamespace, podName string
})
}

func (wt Waiter) ForPodListAllRunning(ctx context.Context, pods []*corev1.Pod) ([]*corev1.Pod, []*corev1.Pod) {
func (wt Waiter) ForPodsAllRunning(ctx context.Context, pods []*corev1.Pod) ([]*corev1.Pod, []*corev1.Pod) {
var lock sync.Mutex
var failed []*corev1.Pod
var updated []*corev1.Pod
@@ -112,21 +112,19 @@ func (wt Waiter) ForPodListAllRunning(ctx context.Context, pods []*corev1.Pod) (
return failed, updated
}

func (wt Waiter) ForPodListAllDeleted(ctx context.Context, pods []corev1.Pod) error {
func (wt Waiter) ForPodsAllDeleted(ctx context.Context, pods []*corev1.Pod) error {
if len(pods) == 0 {
klog.Infof("ForPodListAllDeleted called with an empty list of pods. Nothing to do.")
klog.Infof("ForPodsAllDeleted called with an empty slice of pods. Nothing to do.")
return nil
}

klog.Infof("Waiting for %d pod(s) to be deleted.", len(pods))

var eg errgroup.Group
for idx := range pods {
pod := pods[idx]
podKey := client.ObjectKeyFromObject(&pod).String()

for _, pod := range pods {
podKey := client.ObjectKeyFromObject(pod).String()
eg.Go(func() error {
klog.Infof("Goroutine started: waiting for pod %s to be deleted.", podKey)
klog.Infof("Waiting for pod %s to be deleted.", podKey)
err := wt.ForPodDeleted(ctx, pod.Namespace, pod.Name)
if err != nil {
klog.Warningf("Failed to confirm deletion for pod %s: %v", podKey, err)
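
For context on the loop rewrite in ForPodsAllDeleted above: once the input is []*corev1.Pod, each goroutine can capture the pod pointer directly, with no &pods[idx] indirection. A minimal sketch of that fan-out-and-wait shape, assuming a hypothetical waitForDeleted helper standing in for wt.ForPodDeleted:

package example

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	corev1 "k8s.io/api/core/v1"
)

// waitForDeleted is a placeholder for the real polling logic (wt.ForPodDeleted).
func waitForDeleted(ctx context.Context, namespace, name string) error {
	return nil
}

// allDeletedSketch waits for every pod concurrently and returns the first error, if any.
func allDeletedSketch(ctx context.Context, pods []*corev1.Pod) error {
	var eg errgroup.Group
	for _, pod := range pods {
		pod := pod // redundant on Go 1.22+, kept for older toolchains
		eg.Go(func() error {
			if err := waitForDeleted(ctx, pod.Namespace, pod.Name); err != nil {
				return fmt.Errorf("pod %s/%s not deleted: %w", pod.Namespace, pod.Name, err)
			}
			return nil
		})
	}
	return eg.Wait()
}

eg.Wait() returns the first non-nil error, so a single pod that fails to go away is enough to fail the whole call, matching the all-or-nothing semantics the function name implies.
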
16 changes: 11 additions & 5 deletions test/e2e/serial/tests/non_regression_fundamentals.go
@@ -65,7 +65,11 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser
})

Context("using the NUMA-aware scheduler with NRT data", func() {
var cpusPerPod int64 = 2 // must be even. Must be >= 2
var cpusPerPod int64

BeforeEach(func() {
cpusPerPod = 2 // must be even. Must be >= 2
})

DescribeTable("[node1] against a single node", Label("node1"),
// the purpose of this test is to send a burst of pods towards a node. Each pod must require resources in such a way
@@ -116,7 +120,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser
// CAUTION: still assuming all NUMA zones are equal across all nodes
numPods := int(cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!)

klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusVal, maxAllocPerNUMAVal)
klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusPerPod, maxAllocPerNUMAVal)

var testPods []*corev1.Pod
for idx := 0; idx < numPods; idx++ {
@@ -135,7 +139,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser
testPods = append(testPods, testPod)
}

failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(context.TODO(), testPods)

for _, failedPod := range failedPods {
_ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name)
@@ -207,11 +211,13 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser
resQty := e2enrt.GetMaxAllocatableResourceNumaLevel(*nrtInfo, corev1.ResourceCPU)
resVal, ok := resQty.AsInt64()
Expect(ok).To(BeTrue(), "cannot convert allocatable CPU resource as int")
klog.Infof("total available CPU: %d nodes x %d NUMA zone per node x %d CPUs per node = %dx%dx%d = %d",
len(nrts), len(nrtInfo.Zones), resVal, len(nrts), len(nrtInfo.Zones), resVal, len(nrts)*len(nrtInfo.Zones)*int(resVal))

cpusVal := (10 * resVal) / 8
numPods := int(int64(len(nrts)) * cpusVal / cpusPerPod) // unlikely we will need more than a billion pods (!!)

klog.Infof("creating %d pods consuming %d cpus each (found %d per NUMA zone)", numPods, cpusVal, resVal)
klog.Infof("creating %d pods consuming %d cpus each total %d CPUs consumed (found %d per NUMA zone)", numPods, cpusPerPod, numPods*int(cpusPerPod), resVal)

var testPods []*corev1.Pod
for idx := 0; idx < numPods; idx++ {
@@ -231,7 +237,7 @@ var _ = Describe("numaresources fundamentals non-regression", Serial, Label("ser
testPods = append(testPods, testPod)
}

failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Timeout(timeout).ForPodsAllRunning(context.TODO(), testPods)

for _, failedPod := range failedPods {
_ = objects.LogEventsForPod(fxt.K8sClient, failedPod.Namespace, failedPod.Name)
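
The two corrected klog lines above now report cpusPerPod (the per-pod CPU request) rather than cpusVal (the per-node budget). A quick worked example with hypothetical numbers (3 worker nodes, 2 NUMA zones per node, 16 allocatable CPUs per zone; none of these values come from the diff) shows the quantities the fixed messages print:

package main

import "fmt"

func main() {
	const (
		nodes      = 3         // assumed worker node count
		zones      = 2         // assumed NUMA zones per node
		resVal     = int64(16) // assumed allocatable CPUs per NUMA zone
		cpusPerPod = int64(2)  // as set in BeforeEach: must be even, >= 2
	)

	// Mirrors the test math: request 10/8 (125%) of one zone's CPU capacity per node.
	cpusVal := (10 * resVal) / 8                        // 20
	numPods := int(int64(nodes) * cpusVal / cpusPerPod) // 30

	fmt.Printf("total available CPU: %d nodes x %d NUMA zones per node x %d CPUs per NUMA zone = %d\n",
		nodes, zones, resVal, nodes*zones*int(resVal)) // 96 CPUs cluster-wide
	fmt.Printf("creating %d pods consuming %d cpus each, %d CPUs consumed in total (found %d per NUMA zone)\n",
		numPods, cpusPerPod, numPods*int(cpusPerPod), resVal) // 30 pods, 60 CPUs
}
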
10 changes: 5 additions & 5 deletions test/e2e/serial/tests/scheduler_cache.go
@@ -200,7 +200,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac

// very generous timeout here. It's hard and racy to check we had 2 pods pending (expected phased scheduling),
// but that would be the most correct and strictest testing.
failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Timeout(3*time.Minute).ForPodsAllRunning(context.TODO(), testPods)
dumpFailedPodInfo(fxt, failedPods)
Expect(failedPods).To(BeEmpty(), "unexpected failed pods: %q", accumulatePodNamespacedNames(failedPods))

@@ -295,7 +295,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac
By("waiting for the test pods to go running")
// even more generous timeout here. We need to tolerate more reconciliation time because of the interference
startTime := time.Now()
failedPods, updatedPods := wait.With(fxt.Client).Interval(5*time.Second).Timeout(5*time.Minute).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Interval(5*time.Second).Timeout(5*time.Minute).ForPodsAllRunning(context.TODO(), testPods)
dumpFailedPodInfo(fxt, failedPods)
elapsed := time.Since(startTime)
klog.Infof("test pods (payload + interference) gone running in %v", elapsed)
@@ -429,7 +429,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac
// this is a slight abuse. We want to wait for hostsRequired < desiredPods to be running. Other pod(s) must be pending.
// So we wait a bit too much unnecessarily, but we take this chance to ensure the pod(s) which are supposed to be pending
// stay pending at least up until timeout
failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodsAllRunning(context.TODO(), testPods)
Expect(updatedPods).To(HaveLen(hostsRequired))
Expect(failedPods).To(HaveLen(expectedPending))
Expect(len(updatedPods) + len(failedPods)).To(Equal(desiredPods))
@@ -549,7 +549,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac
// this is a slight abuse. We want to wait for hostsRequired < desiredPods to be running. Other pod(s) must be pending.
// So we wait a bit too much unnecessarily, but we take this chance to ensure the pod(s) which are supposed to be pending
// stay pending at least up until timeout
failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodListAllRunning(context.TODO(), testPods)
failedPods, updatedPods := wait.With(fxt.Client).Timeout(time.Minute).ForPodsAllRunning(context.TODO(), testPods)
Expect(updatedPods).To(HaveLen(hostsRequired))
Expect(failedPods).To(HaveLen(expectedPending))

@@ -596,7 +596,7 @@ var _ = Describe("scheduler cache", Serial, Label(label.Tier0, "scheduler", "cac
// here we really need quite a long timeout. Still, 300s is a bit of an overshoot (expectedly so).
// The reason to be supercareful here is the potentially long interplay between
// NRT updater, resync loop, scheduler retry loop.
failedPods, updatedPods = wait.With(fxt.Client).Timeout(5*time.Minute).ForPodListAllRunning(context.TODO(), expectedRunningPods)
failedPods, updatedPods = wait.With(fxt.Client).Timeout(5*time.Minute).ForPodsAllRunning(context.TODO(), expectedRunningPods)
dumpFailedPodInfo(fxt, failedPods)
Expect(updatedPods).To(HaveLen(hostsRequired))
Expect(failedPods).To(BeEmpty())