Skip to content

Fix canceling of a gather operation #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ def __next__(self):
self.state = None
return None
else:
self.exc.__traceback__ = None
raise self.exc
self.exc.__traceback__ = None
raise self.exc

_never = _Never()

_never = _Never()

################################################################################
# Queue and poller for stream IO
Expand Down Expand Up @@ -260,6 +260,11 @@ def run_until_complete(main_task=None):
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
Expand All @@ -281,7 +286,6 @@ def run_until_complete(main_task=None):
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)


# Create a new task from a coroutine and run it until it finishes
def run(coro):
"""Create a new task from the given coroutine and run it until it completes.
Expand Down
88 changes: 67 additions & 21 deletions asyncio/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,76 @@ def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)


async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
class _Remove:
@staticmethod
def remove(t):
pass

Returns a list of return values of all *aws*

This is a coroutine.
"""
async def gather(*aws, return_exceptions=False):
if not aws:
return []

def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)

ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except (core.CancelledError, Exception) as er:
if return_exceptions:
ts[i] = er
else:
raise er
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done

# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False

# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
core._never.state = False
await core._never
except core.CancelledError as er:
cancel_all = True
state = er

# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data

# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state

# Return the list of return values of each sub-task.
return ts