1
- import base64
2
- import logging
3
- import os
4
- import sys
5
1
from kubernetes import client , config
6
- from kubernetes .client import V1Job , V1ObjectMeta , V1JobSpec , V1PodTemplateSpec , V1PodSpec , V1Container , V1VolumeMount , V1Volume , V1ConfigMapVolumeSource , V1EmptyDirVolumeSource , V1EnvVar , V1SecurityContext , V1SeccompProfile , V1Capabilities
7
- from kubernetes .client .rest import ApiException
8
- import time
2
+ import kubernetes .client
9
3
import subprocess
10
4
5
+ import sys
6
+ import os
7
+
8
+ from time import sleep
9
+
10
+ import ray
11
+
12
+ from torchx .specs .api import AppState , is_terminal
13
+
14
+ from codeflare_sdk .cluster .cluster import Cluster , ClusterConfiguration
15
+ from codeflare_sdk .job .jobs import DDPJobDefinition
16
+
11
17
import pytest
12
18
13
- from support import random_choice , read_file
19
+ from support import random_choice
20
+
21
+ # Creates a Ray cluster, and trains the MNIST dataset using the CodeFlare SDK.
22
+ # Asserts creation of AppWrapper, RayCluster, and successful completion of the training job.
23
+ # Covers successfull installation of CodeFlare-SDK
14
24
15
25
class TestMNISTRayClusterSDK :
16
26
def setup_method (self ):
@@ -19,104 +29,138 @@ def setup_method(self):
19
29
20
30
# Initialize Kubernetes client
21
31
self .api_instance = client .CoreV1Api ()
22
- self .batch_api = client .BatchV1Api ()
23
- self .cmap = client .V1ConfigMap ()
32
+ self .custom_api = kubernetes .client .CustomObjectsApi (self .api_instance .api_client )
24
33
25
34
def teardown_method (self ):
26
35
if hasattr (self , 'namespace' ):
27
36
self .api_instance .delete_namespace (self .namespace )
28
37
if hasattr (self , 'configmap' ):
29
38
self .api_instance .delete_namespaced_config_map (self .configmap .metadata .name , self .namespace )
30
39
31
-
32
40
def test_mnist_ray_cluster_sdk (self ):
33
- namespace = self .create_test_namespace ()
34
-
35
- file_paths = [
36
- "./tests/e2e/mnist_raycluster_sdk_test.py" ,
37
- "./tests/e2e/requirements.txt" ,
38
- "./tests/e2e/mnist.py" ,
39
- "./tests/e2e/install-codeflare-sdk.sh"
40
- ]
41
- self .create_config_map (namespace , file_paths )
42
-
41
+ self .create_test_namespace ()
43
42
self .run_mnist_raycluster_sdk ()
44
43
45
-
46
44
def create_test_namespace (self ):
47
45
self .namespace = f"test-ns-{ random_choice ()} "
48
46
namespace_body = client .V1Namespace (metadata = client .V1ObjectMeta (name = self .namespace ))
49
47
self .api_instance .create_namespace (namespace_body )
50
48
return self .namespace
51
49
52
- def create_config_map (self , namespace , file_paths ):
53
- data = {os .path .basename (path ): read_file (path ) for path in file_paths }
54
- binary_data = {key : base64 .b64encode (value ).decode ('utf-8' ) for key , value in data .items ()}
55
- config_map = client .V1ConfigMap (
56
- api_version = "v1" ,
57
- kind = "ConfigMap" ,
58
- metadata = client .V1ObjectMeta (
59
- generate_name = "config-" ,
60
- namespace = namespace ,
61
- ),
62
- binary_data = binary_data ,
63
- immutable = True ,
50
+ def run_mnist_raycluster_sdk (self ):
51
+ ray_image = "quay.io/project-codeflare/ray:latest-py39-cu118"
52
+ host = os .getenv ("CLUSTER_HOSTNAME" )
53
+
54
+ ingress_options = {}
55
+ if host is not None :
56
+ ingress_options = {
57
+ "ingresses" : [
58
+ {
59
+ "ingressName" : "ray-dashboard" ,
60
+ "port" : 8265 ,
61
+ "pathType" : "Prefix" ,
62
+ "path" : "/" ,
63
+ "host" : host ,
64
+ "annotations" : {
65
+ "nginx.ingress.kubernetes.io/proxy-body-size" : "10M" ,
66
+ }
67
+ },
68
+ ]
69
+ }
70
+
71
+ cluster = Cluster (
72
+ ClusterConfiguration (
73
+ name = "mnist" ,
74
+ namespace = self .namespace ,
75
+ num_workers = 1 ,
76
+ head_cpus = "500m" ,
77
+ head_memory = 2 ,
78
+ min_cpus = "500m" ,
79
+ max_cpus = 1 ,
80
+ min_memory = 1 ,
81
+ max_memory = 2 ,
82
+ num_gpus = 0 ,
83
+ instascale = False ,
84
+ image = ray_image ,
85
+ ingress_options = ingress_options ,
86
+ )
64
87
)
65
- # config_map = client.V1ConfigMap(data=data)
66
- self .api_instance .create_namespaced_config_map (namespace = namespace , body = config_map )
67
88
68
- def run_mnist_raycluster_sdk (self ):
69
- script_path = './tests/e2e/mnist_raycluster_sdk.py'
70
- result = subprocess .run (['python' , script_path , self .namespace ])
71
- output = result .stdout
72
- errors = result .stderr
73
- if result .returncode != 0 :
74
- raise subprocess .CalledProcessError (result .returncode , 'python' , output = output , stderr = errors )
75
- return output
76
-
77
- # # Specifically used on KinD clusters
78
- # def configure_pods(self):
79
- # hostname = os.getenv('CLUSTER_HOSTNAME')
80
- # node = self.get_first_node()
81
- # node_ip = self.get_node_internal_ip(node)
82
- # host_alias = client.V1HostAlias(ip=node_ip, hostnames=[hostname])
83
-
84
- # pods = self.find_mnist_head_pod(self.namespace)
85
- # for pod in pods:
86
- # container = pod.spec.containers[0]
87
- # if not pod.spec.host_aliases:
88
- # pod.spec.host_aliases = []
89
- # pod.spec.host_aliases.append(host_alias)
90
- # if not container.env:
91
- # container.env = []
92
- # container.env.append(hostname)
93
-
94
-
95
-
96
- # def get_node_internal_ip(node):
97
- # for address in node.status.addresses:
98
- # if address.type == "InternalIP":
99
- # ip = address.address
100
- # return ip
101
-
102
- # def get_first_node(self):
103
- # try:
104
- # # List all nodes in the cluster
105
- # nodes = self.api_instance.list_node()
106
- # except ApiException as e:
107
- # pytest.fail(f"Exception when calling CoreV1Api->list_node: {e}")
108
- # return nodes.items[0]
109
-
110
- # def find_mnist_head_pod(self, namespace):
111
- # try:
112
- # # List all pods in the specified namespace
113
- # pods = self.v1.list_namespaced_pod(namespace)
114
- # except ApiException as e:
115
- # print(f"Exception when calling CoreV1Api->list_namespaced_pod: {e}")
116
- # return None
117
-
118
- # for pod in pods.items:
119
- # if pod.metadata.name.startswith("mnist-head"):
120
- # return pod
121
- # print("No 'mnist-head' pod found in the namespace")
122
- # return None
89
+
90
+ cluster .up ()
91
+ self .assert_appwrapper_exists ()
92
+
93
+ cluster .status ()
94
+
95
+ cluster .wait_ready ()
96
+ self .assert_raycluster_exists ()
97
+
98
+ cluster .status ()
99
+
100
+ cluster .details ()
101
+
102
+ jobdef = DDPJobDefinition (
103
+ name = "mnist" ,
104
+ script = "./tests/e2e/mnist.py" ,
105
+ scheduler_args = {"requirements" : "./tests/e2e/mnist_pip_requirements.txt" },
106
+ )
107
+ job = jobdef .submit (cluster )
108
+
109
+ done = False
110
+ time = 0
111
+ timeout = 900
112
+ while not done :
113
+ status = job .status ()
114
+ if is_terminal (status .state ):
115
+ break
116
+ if not done :
117
+ print (status )
118
+ if timeout and time >= timeout :
119
+ raise TimeoutError (f"job has timed out after waiting { timeout } s" )
120
+ sleep (5 )
121
+ time += 5
122
+
123
+ print (job .status ())
124
+ self .assert_job_completion (status )
125
+
126
+ print (job .logs ())
127
+
128
+ cluster .down ()
129
+
130
+
131
+ # if not status.state == AppState.SUCCEEDED:
132
+
133
+ # script_path = './tests/e2e/mnist_raycluster_sdk.py'
134
+ # result = subprocess.run(['python', script_path, self.namespace])
135
+ # output = result.stdout
136
+ # errors = result.stderr
137
+ # if result.returncode != 0:
138
+ # raise subprocess.CalledProcessError(result.returncode, 'python', output=output, stderr=errors)
139
+ # return output
140
+
141
+
142
+ def assert_appwrapper_exists (self ):
143
+ try :
144
+ self .custom_api .get_namespaced_custom_object ("workload.codeflare.dev" , "v1beta1" , self .namespace , "appwrappers" , "mnist" )
145
+ print (f"AppWrapper 'mnist' has been created in the namespace: '{ self .namespace } '" )
146
+ assert True
147
+ except Exception as e :
148
+ print (f"AppWrapper 'mnist' has not been created. Error: { e } " )
149
+ assert False
150
+
151
+ def assert_raycluster_exists (self ):
152
+ try :
153
+ self .custom_api .get_namespaced_custom_object ("ray.io" , "v1" , self .namespace , "rayclusters" , "mnist" )
154
+ print (f"RayCluster 'mnist' created successfully in the namespace: '{ self .namespace } '" )
155
+ assert True
156
+ except Exception as e :
157
+ print (f"RayCluster 'mnist' has not been created. Error: { e } " )
158
+ assert False
159
+
160
+ def assert_job_completion (self , status ):
161
+ if status .state == AppState .SUCCEEDED :
162
+ print (f"Job has completed: '{ status .state } '" )
163
+ assert True
164
+ else :
165
+ print (f"Job has completed: '{ status .state } '" )
166
+ assert False
0 commit comments