Skip to content

Commit 430df07

Browse files
authored
Prevent worker getting stuck in terminating state (#370)
* tests: add failing test that proves #369 bug, self.task is not updated correctly * fix: ensure updates of self.task are performed when self.allow_pick_jobs is set to False. close #369
1 parent b6dda0e commit 430df07

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

arq/worker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -382,14 +382,14 @@ async def _poll_iteration(self) -> None:
382382

383383
await self.start_jobs(job_ids)
384384

385-
if self.allow_abort_jobs:
386-
await self._cancel_aborted_jobs()
387-
388-
for job_id, t in list(self.tasks.items()):
389-
if t.done():
390-
del self.tasks[job_id]
391-
# required to make sure errors in run_job get propagated
392-
t.result()
385+
if self.allow_abort_jobs:
386+
await self._cancel_aborted_jobs()
387+
388+
for job_id, t in list(self.tasks.items()):
389+
if t.done():
390+
del self.tasks[job_id]
391+
# required to make sure errors in run_job get propagated
392+
t.result()
393393

394394
await self.heart_beat()
395395

tests/test_worker.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,23 @@ async def test_worker_signal_completes_job_before_shutting_down(caplog, arq_redi
116116
async def sleep_job(ctx, time):
117117
await asyncio.sleep(time)
118118

119-
await arq_redis.enqueue_job('sleep_job', 0.2, _job_id='short_sleep') # should be cancelled
119+
await arq_redis.enqueue_job('sleep_job', 0.2, _job_id='short_sleep') # should be completed
120120
await arq_redis.enqueue_job('sleep_job', 5, _job_id='long_sleep') # should be cancelled
121121
worker = worker(
122122
functions=[func(sleep_job, name='sleep_job', max_tries=1)],
123-
job_completion_wait=0.4,
123+
job_completion_wait=0.5,
124124
job_timeout=10,
125125
)
126126
assert worker.jobs_complete == 0
127127
asyncio.create_task(worker.main())
128128
await asyncio.sleep(0.1)
129129
worker.handle_sig_wait_for_completion(signal.SIGINT)
130+
assert len(worker.tasks) == 2 # should be two tasks when sigint is sent
130131
assert worker.allow_pick_jobs is False
131-
await asyncio.sleep(0.5)
132+
await asyncio.sleep(0.3)
133+
assert len(worker.tasks) == 1 # slept a bit, first job should now be complete and self.tasks should be updated
134+
await asyncio.sleep(0.3)
135+
assert len(worker.tasks) == 0 # slept longer than `job_completion_wait`, task should be cancelled and updated
132136
logs = [rec.message for rec in caplog.records]
133137
assert 'shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 0 retries ◆ 2 to be completed' in logs
134138
assert 'shutdown on SIGINT, wait complete ◆ 1 jobs complete ◆ 0 failed ◆ 0 retries ◆ 1 ongoing to cancel' in logs

0 commit comments

Comments
 (0)