Skip to content

Commit c095d38

Browse files
committed
Add a local and ssh scheduler definitions
1 parent ab9db51 commit c095d38

File tree

6 files changed

+122
-41
lines changed

6 files changed

+122
-41
lines changed

bioluigi/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ class bioluigi(luigi.Config):
88
scheduler_extra_args = luigi.ListParameter(default=[],
99
description='List of extra arguments to pass to the scheduler')
1010
slurm_srun_bin = luigi.Parameter(default='srun')
11+
ssh_bin = luigi.Parameter(default='ssh')
12+
ssh_remote = luigi.OptionalParameter(default=None)
13+
ssh_port = luigi.OptionalIntParameter(default=None)
14+
ssh_user = luigi.OptionalParameter(default=None)
15+
ssh_identity_file = luigi.OptionalParameter(default=None)
1116
prefetch_bin = luigi.Parameter(default='prefetch')
1217
fastqdump_bin = luigi.Parameter(default='fastq-dump')
1318
cutadapt_bin = luigi.Parameter(default='cutadapt')

bioluigi/scheduled_external_program.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,30 @@
22
This module provides a flavour of :class:`luigi.contrib.external_program.ExternalProgramTask`
33
designed to work with an external scheduler such as Slurm.
44
5-
It also extend the basic definition of an external task to declare basic
5+
It also extends the basic definition of an external task to declare basic
66
resource consumptions such as the number of CPU and the amount of memory
77
required to execute the task.
88
"""
99

1010
import datetime
11+
import os
1112
from datetime import timedelta
1213
from typing import Optional
1314

1415
import luigi
15-
from luigi.contrib.external_program import ExternalProgramTask
1616

1717
from .config import bioluigi
1818
from .schedulers import get_available_schedulers, get_scheduler, ScheduledTask
1919

2020
cfg = bioluigi()
2121

22-
class ScheduledExternalProgramTask(ExternalProgramTask, ScheduledTask):
22+
class ScheduledExternalProgramTask(luigi.Task, ScheduledTask):
2323
"""
2424
Variant of :class:`luigi.contrib.external_program.ExternalProgramTask` that
2525
executes the task with a :class:`Scheduler`.
2626
"""
2727
scheduler: str = luigi.ChoiceParameter(default=cfg.scheduler,
28-
choices=['local'] + [blurb for blurb in get_available_schedulers()],
28+
choices=[blurb for blurb in get_available_schedulers()],
2929
positional=False, significant=False,
3030
description='Scheduler to use for running the task')
3131
scheduler_partition: Optional[str] = luigi.OptionalParameter(default=cfg.scheduler_partition, positional=False,
@@ -42,21 +42,17 @@ class ScheduledExternalProgramTask(ExternalProgramTask, ScheduledTask):
4242
memory: float = luigi.FloatParameter(default=1, positional=False, significant=False,
4343
description='Amount of memory (in gigabyte) to allocate for the task')
4444

45+
working_directory = luigi.OptionalParameter(default=None, significant=False, positional=False)
46+
47+
capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)
48+
49+
def program_environment(self) -> dict[str, str]:
50+
env = os.environ.copy()
51+
return env
52+
4553
@property
4654
def resources(self):
47-
if self.scheduler == 'local':
48-
# local_jobs is actually constrained by the number of workers
49-
return {'cpus': self.cpus, 'memory': self.memory}
50-
else:
51-
return {'{}_jobs'.format(self.scheduler): 1, '{}_cpus'.format(self.scheduler): self.cpus}
55+
return get_scheduler(self.scheduler).get_resources_for_task(self)
5256

5357
def run(self):
54-
if self.scheduler == 'local':
55-
super().run()
56-
else:
57-
try:
58-
_scheduler = get_scheduler(self.scheduler)
59-
except KeyError:
60-
raise ValueError('Unsupported scheduler {}'.format(self.scheduler))
61-
_scheduler.run_task(self)
62-
58+
get_scheduler(self.scheduler).run_task(self)

bioluigi/schedulers.py

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import json
22
import logging
3+
import subprocess
34
from abc import abstractmethod, ABC
45
from datetime import timedelta
5-
from subprocess import Popen, PIPE
6+
from subprocess import Popen
67
from typing import Optional
78

8-
import luigi
99
from luigi.contrib.external_program import ExternalProgramRunError, ExternalProgramRunContext
1010

1111
from .config import bioluigi
@@ -16,7 +16,7 @@
1616

1717
_schedulers = {}
1818

19-
class ScheduledTask(luigi.Task, ABC):
19+
class ScheduledTask(ABC):
2020
"""Interface for Luigi tasks that can be scheduled."""
2121
walltime: timedelta
2222

@@ -28,6 +28,8 @@ class ScheduledTask(luigi.Task, ABC):
2828

2929
capture_output: bool
3030

31+
working_directory: Optional[str]
32+
3133
@abstractmethod
3234
def program_environment(self) -> dict[str, str]:
3335
pass
@@ -40,7 +42,10 @@ def get_available_schedulers():
4042
return _schedulers.keys()
4143

4244
def get_scheduler(blurb):
43-
return _schedulers[blurb]
45+
try:
46+
return _schedulers[blurb]
47+
except KeyError:
48+
raise ValueError('Unsupported scheduler {}'.format(self.scheduler))
4449

4550
def register_scheduler(blurb):
4651
"""
@@ -56,9 +61,34 @@ def wrapper(cls):
5661

5762
return wrapper
5863

64+
def _run_command(args, env, cwd, capture_output):
65+
kwargs = {'env': env, 'universal_newlines': True}
66+
if capture_output:
67+
kwargs['stdout'] = subprocess.PIPE
68+
kwargs['stderr'] = subprocess.PIPE
69+
if cwd:
70+
kwargs['cwd'] = cwd
71+
logger.info('Running command %s%s', ' '.join(args), 'in ' + cwd if cwd else '')
72+
proc = Popen(args, **kwargs)
73+
with ExternalProgramRunContext(proc):
74+
if capture_output:
75+
stderr, stdout = proc.communicate()
76+
if stdout:
77+
logger.info('Program stdout:\n{}'.format(stdout))
78+
if stderr:
79+
logger.info('Program stderr:\n{}'.format(stderr))
80+
else:
81+
proc.wait()
82+
if proc.returncode != 0:
83+
raise ExternalProgramRunError('Program exited with non-zero return code.', tuple(args), env, stdout, stderr)
84+
5985
class Scheduler(ABC):
6086
@abstractmethod
61-
def run_task(self, task):
87+
def get_resources_for_task(self, task: ScheduledTask) -> dict[str, int]:
88+
pass
89+
90+
@abstractmethod
91+
def run_task(self, task: ScheduledTask):
6292
pass
6393

6494
@register_scheduler('slurm')
@@ -67,10 +97,13 @@ class SlurmScheduler(Scheduler):
6797
Scheduler based on Slurm https://slurm.schedmd.com/
6898
"""
6999

100+
def get_resources_for_task(self, task: ScheduledTask):
101+
return {'slurm_jobs': 1, 'slurm_cpus': task.cpus}
102+
70103
def run_task(self, task: ScheduledTask):
71104
secs = int(task.walltime.total_seconds())
72-
srun_args = [cfg.slurm_srun_bin]
73-
srun_args.extend([
105+
args = [cfg.slurm_srun_bin]
106+
args.extend([
74107
'--verbose',
75108
'--job-name', task.get_task_family(),
76109
'--comment', json.dumps({'task_id': task.task_id, 'priority': task.priority}),
@@ -79,19 +112,47 @@ def run_task(self, task: ScheduledTask):
79112
'--mem', '{}G'.format(int(task.memory)),
80113
'--cpus-per-task', str(task.cpus)])
81114
if task.scheduler_partition:
82-
srun_args.extend(['--partition', task.scheduler_partition])
115+
args.extend(['--partition', task.scheduler_partition])
116+
if task.working_directory:
117+
args.extend(['--chdir', task.working_directory])
83118
# FIXME: task.priority is not reliable and does not reflect what the
84119
# scheduler
85120
# TODO: srun_args.extend([--priority', str(max(0, cfg.scheduler_priority))])
86-
srun_args.extend(map(str, task.scheduler_extra_args))
87-
args = list(map(str, task.program_args()))
88-
env = task.program_environment()
89-
logger.info('Running Slurm command {}'.format(' '.join(srun_args + args)))
90-
proc = Popen(srun_args + args, env=env, stdout=PIPE, stderr=PIPE, universal_newlines=True)
91-
with ExternalProgramRunContext(proc):
92-
stdout, stderr = proc.communicate()
93-
if proc.returncode != 0:
94-
raise ExternalProgramRunError('Program exited with non-zero return code.', tuple(args), env, stdout, stderr)
95-
if task.capture_output:
96-
logger.info('Program stdout:\n{}'.format(stdout))
97-
logger.info('Program stderr:\n{}'.format(stderr))
121+
args.extend(map(str, task.scheduler_extra_args))
122+
args.extend(map(str, task.program_args()))
123+
_run_command(args, env=(task.program_environment()), cwd=task.working_directory,
124+
capture_output=task.capture_output)
125+
126+
@register_scheduler('local')
127+
class LocalScheduler(Scheduler):
128+
"""A scheduler that uses a local subprocess"""
129+
130+
def get_resources_for_task(self, task: ScheduledTask):
131+
return {'cpus': task.cpus, 'memory': task.memory}
132+
133+
def run_task(self, task: ScheduledTask):
134+
_run_command(list(map(str, task.program_args())), env=task.program_environment(), cwd=task.working_directory,
135+
capture_output=task.capture_output)
136+
137+
@register_scheduler('ssh')
138+
class SshScheduler(Scheduler):
139+
"""A scheduler that uses SSH to run a task remotely"""
140+
141+
def get_resources_for_task(self, task: ScheduledTask):
142+
return {'ssh_cpus': task.cpus, 'ssh_memory': task.memory}
143+
144+
def run_task(self, task: ScheduledTask):
145+
if not cfg.ssh_remote:
146+
raise ValueError('No SSH remote is configured.')
147+
args = [cfg.ssh_bin]
148+
if cfg.ssh_port:
149+
args.extend(['-p', str(cfg.ssh_port)])
150+
if cfg.ssh_user:
151+
args.extend(['-u', cfg.ssh_user])
152+
if cfg.ssh_identity_file:
153+
args.extend(['-i', cfg.ssh_identity_file])
154+
args.extend(task.scheduler_extra_args)
155+
args.append(cfg.ssh_remote)
156+
args.extend(map(str, task.program_args()))
157+
_run_command(args, env=task.program_environment(), cwd=task.working_directory,
158+
capture_output=task.capture_output)

example.luigi.cfg

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ slurm_jobs=1024
1515
# prevent workers from needlessly waiting on pending jobs if all the nodes are
1616
# busy.
1717
slurm_cpus=396
18+
# For the SSH scheduler, the number of jobs, CPUs and memory are managed per-host
19+
ssh_cpus=8
20+
# In gigabytes
21+
ssh_memory=32
1822
# Limit the number of I/O-bound jobs
1923
# The following are considered I/O-bound: prefetch, fastq-dump, fastqc,
2024
# bamtofastq
@@ -39,11 +43,13 @@ cellranger_bamtofastq_jobs=10
3943
[bioluigi]
4044
scheduler=local
4145
scheduler_extra_args=[]
46+
# Remote to use for the SSH scheduler
47+
ssh_remote=
4248
cutadapt_bin=cutadapt
4349
star_bin=STAR
4450
rsem_dir=
4551
bcftools_bin=bcftools
4652
vep_bin=vep
4753
# If this is not specified, it will default to $HOME/.vep
4854
vep_dir=
49-
cellranger_bin=cellranger
55+
cellranger_bin=cellranger

luigi.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
example.luigi.cfg

tests/test_scheduled_external_program.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pytest
66

77
from bioluigi.scheduled_external_program import ScheduledExternalProgramTask
8+
from bioluigi.schedulers import get_available_schedulers
89

910
class MyTask(ScheduledExternalProgramTask):
1011
cpus = 4
@@ -35,19 +36,30 @@ def test_default_scheduler():
3536
assert task.complete()
3637

3738
def test_local_scheduler():
39+
assert 'local' in get_available_schedulers()
3840
task = MyTask("2", scheduler='local')
41+
assert task.resources['cpus'] == 4
42+
assert task.resources['memory'] == 1
3943
assert not task.complete()
4044
luigi.build([task], local_scheduler=True)
4145
assert task.complete()
4246

4347
def test_slurm_scheduler():
44-
if which('srun') is None:
45-
pytest.skip('srun is needed to run Slurm tests.')
48+
assert 'slurm' in get_available_schedulers()
4649
task = MyTask("3", scheduler='slurm')
4750
assert 'slurm_jobs' in task.resources
4851
assert 'slurm_cpus' in task.resources
4952
assert task.resources['slurm_jobs'] == 1
5053
assert task.resources['slurm_cpus'] == 4
5154
assert not task.complete()
55+
if which('srun') is None:
56+
pytest.skip('srun is needed to run Slurm scheduler tests.')
5257
luigi.build([task], local_scheduler=True)
5358
assert task.complete()
59+
60+
def test_ssh_scheduler():
61+
task = MyTask('5', scheduler='ssh')
62+
assert task.resources == {'ssh_cpus': 4, 'ssh_memory': 1}
63+
# FIXME:
64+
# luigi.build([task], local_scheduler=True)
65+
# assert task.complete()

0 commit comments

Comments
 (0)