Skip to content

Add Job Cancelling and More Unit Tests #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/codeflare_sdk/job/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def submit(self, cluster: "Cluster" = None) -> "Job":


class DDPJob(Job):
def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster"):
def __init__(self, job_definition: "DDPJobDefinition", cluster: "Cluster" = None):
self.job_definition = job_definition
self.cluster = cluster
if self.cluster:
Expand All @@ -169,3 +169,6 @@ def status(self) -> str:

def logs(self) -> str:
    """Return the job's log output as a single string.

    Concatenates every item yielded by ``torchx_runner.log_lines`` for this
    job's app handle (``None`` is passed as the second argument).
    """
    log_lines = torchx_runner.log_lines(self._app_handle, None)
    return "".join(log_lines)

def cancel(self) -> None:
    """Cancel this job by handing its app handle to ``torchx_runner.cancel``."""
    app_handle = self._app_handle
    torchx_runner.cancel(app_handle)
127 changes: 127 additions & 0 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from torchx.specs import AppDryRunInfo, AppDef
from torchx.runner import get_runner, Runner
from torchx.schedulers.ray_scheduler import RayJob
from torchx.schedulers.kubernetes_mcad_scheduler import KubernetesMCADJob
import pytest


Expand Down Expand Up @@ -1686,6 +1687,40 @@ def test_DDPJobDefinition_dry_run():
assert ddp_job._scheduler == "ray"


def test_DDPJobDefinition_dry_run_no_cluster():
    """
    Test that a dry run without a cluster returns an AppDryRunInfo,
    that the attributes of the returned object are of the correct type,
    and that the values from the job definition are correctly passed.
    """
    ddp = test_DDPJobDefinition_creation()
    ddp.image = "fake-image"
    ddp_job = ddp._dry_run_no_cluster()

    # Exact-type checks: `is` compares the type objects directly.
    assert type(ddp_job) is AppDryRunInfo
    assert ddp_job._fmt is not None
    assert type(ddp_job.request) is KubernetesMCADJob
    assert type(ddp_job._app) is AppDef
    assert type(ddp_job._cfg) is dict
    assert type(ddp_job._scheduler) is str

    # The image set on the job definition must reach the first container of
    # the first generic item in the generated request.
    generic_item = ddp_job.request.resource["spec"]["resources"]["GenericItems"][0]
    container = generic_item["generictemplate"].spec.containers[0]
    assert container.image == "fake-image"

    resource = ddp_job._app.roles[0].resource
    assert resource.cpu == 1
    assert resource.gpu == 0
    assert resource.memMB == 1024

    assert ddp_job._cfg["requirements"] == "test"

    assert ddp_job._scheduler == "kubernetes_mcad"


def test_DDPJobDefinition_dry_run_no_resource_args():
"""
Test that the dry run correctly gets resources from the cluster object
Expand Down Expand Up @@ -1715,6 +1750,55 @@ def test_DDPJobDefinition_dry_run_no_resource_args():
)


def test_DDPJobDefinition_dry_run_no_cluster_no_resource_args():
    """
    Test that a dry run without a cluster raises a ValueError naming the
    missing argument when a required job definition field is not set.
    """
    ddp = test_DDPJobDefinition_creation()

    # Each step applies its attribute updates in order (restoring the field
    # cleared by the previous step, then clearing the next one) and expects
    # the dry run to fail with the given message. The first step applies no
    # updates: it relies on the definition returned by the creation helper
    # not having an image set.
    steps = [
        ({}, "Job definition missing arg: image"),
        (
            {"image": "fake-image", "name": None},
            "Job definition missing arg: name",
        ),
        (
            {"name": "fake", "cpu": None},
            "Job definition missing arg: cpu (# cpus per worker)",
        ),
        (
            {"cpu": 1, "gpu": None},
            "Job definition missing arg: gpu (# gpus per worker)",
        ),
        (
            {"gpu": 1, "memMB": None},
            "Job definition missing arg: memMB (memory in MB)",
        ),
        (
            {"memMB": 1, "j": None},
            "Job definition missing arg: j (`workers`x`procs`)",
        ),
    ]

    for updates, expected_message in steps:
        for attr, value in updates.items():
            setattr(ddp, attr, value)
        # pytest.raises fails the test with a clear "DID NOT RAISE" message
        # if the dry run unexpectedly succeeds.
        with pytest.raises(ValueError) as exc_info:
            ddp._dry_run_no_cluster()
        assert str(exc_info.value) == expected_message


def test_DDPJobDefinition_submit(mocker):
"""
Tests that the submit method returns the correct type: DDPJob
Expand All @@ -1733,6 +1817,14 @@ def test_DDPJobDefinition_submit(mocker):
assert type(ddp_job._app_handle) == str
assert ddp_job._app_handle == "fake-dashboard-url"

ddp_def.image = "fake-image"
ddp_job = ddp_def.submit()
assert type(ddp_job) == DDPJob
assert type(ddp_job.job_definition) == DDPJobDefinition
assert ddp_job.cluster == None
assert type(ddp_job._app_handle) == str
assert ddp_job._app_handle == "fake-dashboard-url"


def test_DDPJob_creation(mocker):
ddp_def = test_DDPJobDefinition_creation()
Expand All @@ -1757,6 +1849,29 @@ def test_DDPJob_creation(mocker):
return ddp_job


def test_DDPJob_creation_no_cluster(mocker):
    """
    Test that a DDPJob can be created without a cluster, that the mocked
    scheduler's app handle is stored on the job, and that the scheduler is
    called with an AppDryRunInfo whose attributes have the expected types.

    Returns the created DDPJob so other tests can reuse it.
    """
    ddp_def = test_DDPJobDefinition_creation()
    ddp_def.image = "fake-image"
    mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.schedule",
        return_value="fake-app-handle",
    )  # a fake app_handle
    ddp_job = DDPJob(ddp_def, None)

    assert type(ddp_job) is DDPJob
    assert type(ddp_job.job_definition) is DDPJobDefinition
    assert ddp_job.cluster is None
    assert type(ddp_job._app_handle) is str
    assert ddp_job._app_handle == "fake-app-handle"

    # Inspect the positional arguments of the first call to the mocked
    # scheduler; the call name and keyword arguments are not needed.
    _, args, _ = torchx_runner.schedule.mock_calls[0]
    job_info = args[0]
    assert type(job_info) is AppDryRunInfo
    assert type(job_info.request) is KubernetesMCADJob
    assert type(job_info._app) is AppDef
    assert type(job_info._cfg) is dict
    assert type(job_info._scheduler) is str
    return ddp_job


def test_DDPJob_status(mocker):
ddp_job = test_DDPJob_creation(mocker)
mocker.patch(
Expand All @@ -1777,6 +1892,18 @@ def test_DDPJob_logs(mocker):
assert args[0] == "fake-dashboard-url"


def arg_check_side_effect(*args):
    """Mock side effect asserting the first positional argument is the fake app handle."""
    first_arg = args[0]
    assert first_arg == "fake-app-handle"


def test_DDPJob_cancel(mocker):
    """
    Test that cancelling a DDPJob calls the torchx runner's cancel exactly
    once with the job's app handle.
    """
    ddp_job = test_DDPJob_creation_no_cluster(mocker)
    mock_cancel = mocker.patch(
        "codeflare_sdk.job.jobs.torchx_runner.cancel", side_effect=arg_check_side_effect
    )
    ddp_job.cancel()
    # The side effect only asserts when it is invoked, so without this check
    # the test would still pass if cancel() never reached the runner.
    mock_cancel.assert_called_once_with("fake-app-handle")


def parse_j(cmd):

pattern = r"--nnodes\s+\d+\s+--nproc_per_node\s+\d+"
Expand Down