Skip to content

Commit 486aefe

Browse files
authored
remove looping from syncqueue job (#593)
* remove looping from syncqueue job * remove looping for all resources * turn-off cluster scoped tests * address review
1 parent 3075ad5 commit 486aefe

File tree

2 files changed

+85
-73
lines changed

2 files changed

+85
-73
lines changed

pkg/controller/queuejobresources/genericresource/genericresource.go

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"encoding/json"
2121
"fmt"
2222
"math"
23-
"reflect"
2423
"runtime/debug"
2524
"strings"
2625
"time"
@@ -190,6 +189,9 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG
190189
return name, gvk, err
191190
}
192191

192+
//SyncQueueJob uses dynamic clients to unwrap (spawn) items inside genericItems block, it is used to create resources inside etcd and return errors when
193+
//unwrapping fails.
194+
//More context here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
193195
func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (podList []*v1.Pod, err error) {
194196
startTime := time.Now()
195197
defer func() {
@@ -234,27 +236,32 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
234236
return []*v1.Pod{}, err
235237
}
236238

237-
_, apiresourcelist, err := dd.ServerGroupsAndResources()
238-
if err != nil {
239-
if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok {
240-
klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err)
241-
} else {
242-
klog.Errorf("Error getting supported groups and resources, err=%#v", err)
243-
return []*v1.Pod{}, err
244-
}
245-
}
239+
//TODO: Simplified apiresourcelist discovery, the assumption is we will always deploy namespaced objects
240+
//We dont intend to install CRDs like KubeRay, Spark-Operator etc through MCAD, I think such objects are typically
241+
//cluster scoped. May be for Multi-Cluster or inference use case we need such deep discovery, so for now commenting code.
242+
243+
// _, apiresourcelist, err := dd.ServerGroupsAndResources()
244+
// if err != nil {
245+
// if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok {
246+
// klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err)
247+
// } else {
248+
// klog.Errorf("Error getting supported groups and resources, err=%#v", err)
249+
// return []*v1.Pod{}, err
250+
// }
251+
// }
246252

247253
rsrc := mapping.Resource
248-
for _, apiresourcegroup := range apiresourcelist {
249-
if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
250-
for _, apiresource := range apiresourcegroup.APIResources {
251-
if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
252-
rsrc = mapping.Resource
253-
namespaced = apiresource.Namespaced
254-
}
255-
}
256-
}
257-
}
254+
255+
// for _, apiresourcegroup := range apiresourcelist {
256+
// if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) {
257+
// for _, apiresource := range apiresourcegroup.APIResources {
258+
// if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind {
259+
// rsrc = mapping.Resource
260+
// namespaced = apiresource.Namespaced
261+
// }
262+
// }
263+
// }
264+
// }
258265
var unstruct unstructured.Unstructured
259266
unstruct.Object = make(map[string]interface{})
260267
var blob interface{}
@@ -307,6 +314,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
307314
newName = newName[:63]
308315
}
309316
unstruct.SetName(newName)
317+
//Asumption object is always namespaced
318+
//Refer to comment on line 238
319+
namespaced = true
310320
err = createObject(namespaced, namespace, newName, rsrc, unstruct, dclient)
311321
if err != nil {
312322
if errors.IsAlreadyExists(err) {
@@ -319,29 +329,30 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra
319329
}
320330

321331
// Get the related resources of created object
322-
var thisObj *unstructured.Unstructured
323-
var err1 error
324-
if namespaced {
325-
thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
326-
} else {
327-
thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{})
328-
}
329-
if err1 != nil {
330-
klog.Errorf("Could not get created resource with error %v", err1)
331-
return []*v1.Pod{}, err1
332-
}
333-
thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind())
334-
335-
podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
336-
pods := []*v1.Pod{}
337-
for _, pod := range (*podL).Items {
338-
parent := metav1.GetControllerOf(&pod)
339-
if reflect.DeepEqual(thisOwnerRef, parent) {
340-
pods = append(pods, &pod)
341-
}
342-
klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name)
343-
}
344-
return pods, nil
332+
// var thisObj *unstructured.Unstructured
333+
//var err1 error
334+
// if namespaced {
335+
// thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
336+
// } else {
337+
// thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{})
338+
// }
339+
// if err1 != nil {
340+
// klog.Errorf("Could not get created resource with error %v", err1)
341+
// return []*v1.Pod{}, err1
342+
// }
343+
// thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind())
344+
345+
// podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
346+
// pods := []*v1.Pod{}
347+
// for _, pod := range (*podL).Items {
348+
// parent := metav1.GetControllerOf(&pod)
349+
// if reflect.DeepEqual(thisOwnerRef, parent) {
350+
// pods = append(pods, &pod)
351+
// }
352+
// klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name)
353+
// }
354+
// return pods, nil
355+
return []*v1.Pod{}, nil
345356
}
346357

347358
// checks if object has pod template spec and add new labels

test/e2e/queue.go

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -373,36 +373,37 @@ var _ = Describe("AppWrapper E2E Test", func() {
373373
// This test is flawed, the namespace created by this appwrapper is not cleaned up.
374374
// FIXME https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/471
375375
// Leaving it here so that the builds no longer fail
376-
It("Create AppWrapper - Namespace Only - 0 Pods", func() {
377-
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Started.\n")
378-
context := initTestContext()
379-
var appwrappers []*arbv1.AppWrapper
380-
appwrappersPtr := &appwrappers
381-
defer cleanupTestObjectsPtr(context, appwrappersPtr)
382-
383-
aw := createNamespaceAW(context, "aw-namespace-0")
384-
appwrappers = append(appwrappers, aw)
385-
fmt.Fprintf(GinkgoWriter, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - app wrappers len %d.\n", len(appwrappers))
386-
387-
err := waitAWNonComputeResourceActive(context, aw)
388-
Expect(err).NotTo(HaveOccurred())
389-
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Completed. Awaiting app wrapper cleanup\n")
390-
})
391-
392-
It("Create AppWrapper - Generic Namespace Only - 0 Pods", func() {
393-
fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Generic Namespace Only - 0 Pods - Started.\n")
394-
context := initTestContext()
395-
var appwrappers []*arbv1.AppWrapper
396-
appwrappersPtr := &appwrappers
397-
defer cleanupTestObjectsPtr(context, appwrappersPtr)
398-
399-
aw := createGenericNamespaceAW(context, "aw-generic-namespace-0")
400-
appwrappers = append(appwrappers, aw)
401-
402-
err := waitAWNonComputeResourceActive(context, aw)
403-
Expect(err).NotTo(HaveOccurred())
404-
405-
})
376+
//TODO: Below two tests are turned off, please refer to github issue here: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/598
377+
// It("Create AppWrapper - Namespace Only - 0 Pods", func() {
378+
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Started.\n")
379+
// context := initTestContext()
380+
// var appwrappers []*arbv1.AppWrapper
381+
// appwrappersPtr := &appwrappers
382+
// defer cleanupTestObjectsPtr(context, appwrappersPtr)
383+
384+
// aw := createNamespaceAW(context, "aw-namespace-0")
385+
// appwrappers = append(appwrappers, aw)
386+
// fmt.Fprintf(GinkgoWriter, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - app wrappers len %d.\n", len(appwrappers))
387+
388+
// err := waitAWNonComputeResourceActive(context, aw)
389+
// Expect(err).NotTo(HaveOccurred())
390+
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Namespace Only - 0 Pods - Completed. Awaiting app wrapper cleanup\n")
391+
// })
392+
393+
// It("Create AppWrapper - Generic Namespace Only - 0 Pods", func() {
394+
// fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - Generic Namespace Only - 0 Pods - Started.\n")
395+
// context := initTestContext()
396+
// var appwrappers []*arbv1.AppWrapper
397+
// appwrappersPtr := &appwrappers
398+
// defer cleanupTestObjectsPtr(context, appwrappersPtr)
399+
400+
// aw := createGenericNamespaceAW(context, "aw-generic-namespace-0")
401+
// appwrappers = append(appwrappers, aw)
402+
403+
// err := waitAWNonComputeResourceActive(context, aw)
404+
// Expect(err).NotTo(HaveOccurred())
405+
406+
// })
406407

407408
It("MCAD Custom Pod Resources Test", func() {
408409
fmt.Fprintf(os.Stdout, "[e2e] MCAD Custom Pod Resources Test - Started.\n")

0 commit comments

Comments
 (0)