Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/scheduler/plugins/deviceshare/device_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,18 @@ func (n *nodeDevice) tryAllocateDeviceByType(podRequest corev1.ResourceList, dev

// freeDevices is the rest of the whole machine, or is the rest of the reservation
freeDevices := n.deviceFree[deviceType]
deviceUsed := n.deviceUsed[deviceType]
// preemptible represent preemptible devices, which may be a complete device instance or part of an instance's resources
preemptible := preemptibleDevices[deviceType]
var mergedFreeDevices deviceResources
if len(preemptible) > 0 {
mergedFreeDevices = make(deviceResources)
for minor, v := range preemptible {
mergedFreeDevices[minor] = v.DeepCopy()
used := quotav1.SubtractWithNonNegativeResult(deviceUsed[minor], v)
remaining := quotav1.SubtractWithNonNegativeResult(nodeDeviceTotal[minor], used)
if !quotav1.IsZero(remaining) {
mergedFreeDevices[minor] = remaining
}
}
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/scheduler/plugins/deviceshare/device_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package deviceshare

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -282,6 +283,82 @@ func Test_nodeDevice_allocateGPU(t *testing.T) {
assert.True(t, equality.Semantic.DeepEqual(expectAllocations, allocateResult))
}

func Test_nodeDevice_failedPreemptGPUFromReservation(t *testing.T) {
nd := newNodeDevice()
nd.resetDeviceTotal(map[schedulingv1alpha1.DeviceType]deviceResources{
schedulingv1alpha1.GPU: {
1: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("100"),
apiext.ResourceGPUMemory: resource.MustParse("8Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
},
2: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("100"),
apiext.ResourceGPUMemory: resource.MustParse("8Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
},
},
schedulingv1alpha1.RDMA: {
1: corev1.ResourceList{
apiext.ResourceRDMA: resource.MustParse("100"),
},
2: corev1.ResourceList{
apiext.ResourceRDMA: resource.MustParse("100"),
},
},
})

allocations := apiext.DeviceAllocations{
schedulingv1alpha1.GPU: {
{
Minor: 1,
Resources: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("200"),
apiext.ResourceGPUMemory: resource.MustParse("16Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("200"),
},
},
{
Minor: 2,
Resources: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("200"),
apiext.ResourceGPUMemory: resource.MustParse("16Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("200"),
},
},
},
}
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-pod-1",
},
}
nd.updateCacheUsed(allocations, pod, true)

podRequests := corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("50"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("50"),
}
preemptible := map[schedulingv1alpha1.DeviceType]deviceResources{
schedulingv1alpha1.GPU: {
1: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("100"),
apiext.ResourceGPUMemory: resource.MustParse("8Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
},
2: corev1.ResourceList{
apiext.ResourceGPUCore: resource.MustParse("100"),
apiext.ResourceGPUMemory: resource.MustParse("8Gi"),
apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
},
},
}
allocateResult, err := nd.tryAllocateDevice(podRequests, preemptible)
assert.EqualError(t, err, fmt.Sprintf("node does not have enough %v", schedulingv1alpha1.GPU))
assert.Nil(t, allocateResult)
}

func Test_nodeDevice_allocateGPUWithUnhealthyInstance(t *testing.T) {
nd := newNodeDevice()
nd.resetDeviceTotal(map[schedulingv1alpha1.DeviceType]deviceResources{
Expand Down
37 changes: 37 additions & 0 deletions test/e2e/scheduling/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ package scheduling

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
imageutils "k8s.io/kubernetes/test/utils/image"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
"github.com/koordinator-sh/koordinator/test/e2e/framework"
e2epod "github.com/koordinator-sh/koordinator/test/e2e/framework/pod"
e2ereplicaset "github.com/koordinator-sh/koordinator/test/e2e/framework/replicaset"
Expand Down Expand Up @@ -204,3 +211,33 @@ func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
framework.ExpectNoError(e2ereplicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
return rs
}

// waitingForReservationScheduled polls the given reservation until it becomes
// available (successfully scheduled) and returns the name of the node it was
// placed on. Polling runs every second for up to 60 seconds; the enclosing
// ginkgo spec fails if the reservation does not become available in time or
// if a Get call returns an error.
func waitingForReservationScheduled(koordinatorClientSet koordclientset.Interface, reservation *schedulingv1alpha1.Reservation) string {
	var r *schedulingv1alpha1.Reservation
	gomega.Eventually(func() bool {
		var err error
		r, err = koordinatorClientSet.SchedulingV1alpha1().Reservations().Get(context.TODO(), reservation.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		return reservationutil.IsReservationAvailable(r)
	}, 60*time.Second, 1*time.Second).Should(gomega.BeTrue())
	// Defensive guard: r should be non-nil once Eventually succeeds, but avoid
	// a nil dereference if the assertion framework continued past a failure.
	if r == nil {
		return ""
	}
	return r.Status.NodeName
}

// expectPodBoundReservation asserts that the named pod landed on the same node
// as the named reservation and that the pod's reservation-allocated annotation
// points at exactly that reservation (matching name and UID).
func expectPodBoundReservation(clientSet clientset.Interface, koordinatorClientSet koordclientset.Interface, podNamespace, podName, reservationName string) {
	reservation, err := koordinatorClientSet.SchedulingV1alpha1().Reservations().Get(context.TODO(), reservationName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	pod, err := clientSet.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
	framework.ExpectNoError(err)

	nodeMismatchMsg := fmt.Sprintf("reservation is scheduled to node %v but pod is scheduled to node %v", reservation.Status.NodeName, pod.Spec.NodeName)
	gomega.Expect(pod.Spec.NodeName).Should(gomega.Equal(reservation.Status.NodeName), nodeMismatchMsg)

	reservationAllocated, err := apiext.GetReservationAllocated(pod)
	framework.ExpectNoError(err)
	expected := &apiext.ReservationAllocated{
		Name: reservation.Name,
		UID:  reservation.UID,
	}
	gomega.Expect(reservationAllocated).Should(gomega.Equal(expected), "pod is not using the expected reservation")
}
Loading