gh-96471: Add asyncio queue shutdown #104228

Merged
merged 28 commits into python:main
Apr 6, 2024

Conversation

EpicWink
Contributor

@EpicWink EpicWink commented May 6, 2023

asyncio-only changes from #102499 (which supersedes #96474), updated to match the API introduced by #104750


📚 Documentation preview 📚: https://cpython-previews--104228.org.readthedocs.build/

EpicWink and others added 2 commits May 6, 2023 14:50
* Queue state enum members are capitalised
* Termination state in str/repr
* Include raised exception in docstrings
* Factor out queue-state checks and updates to methods
* Logic fixes in get_nowait and shutdown
* Handle queue shutdown in task_done and join
* Updated tests
* Document feature added in 3.13
@EpicWink
Contributor Author

EpicWink commented May 6, 2023

@willingc ready for review

@gvanrossum
Member

> asyncio-only changes from #102499 (on top of #96474)

I assume the "on top of ..." part is obsolete, since that PR was closed without merging. I'll add @willingc as a reviewer per your request.

@gvanrossum gvanrossum requested a review from willingc May 9, 2023 17:56
@gvanrossum
Member

@EpicWink Could you merge main?

@EpicWink
Contributor Author

> Could you merge main?

Done, but like #104750, we may want to modify the implementation of immediate=True to simply consume the queue rather than having a separate state.

@gvanrossum
Member

Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it? (At least the feature wasn't reverted.)

@EpicWink
Contributor Author

> Eh, @EpicWink, did you see the hubbub about the hanging test in your previous PR? The test was disabled because it kept hanging in CI on various platforms. Could you look into what's wrong with it?

I did see the follow-on issue and pull-requests. Yves and I are working on fixing it (you can follow the changes here: main...EpicWink:cpython:fix-thread-queue-shutdown-test)

@gvanrossum
Member

Awesome! (An acknowledgement that you were working on it would have lessened my stress. :-)

@gvanrossum
Member

Looking at this with an eye towards review, I realized that the threaded-queue implementation doesn't appear to match its documentation: it appears that q.join() does not raise ShutDown when the queue is shut down immediately. The tests don't seem to be testing for this either. (Come to think of it, the test never seems to take the except self.queue.ShutDown path -- if I put a breakpoint there it never gets hit during any of the tests. The test logic is pretty convoluted, which is my only excuse for not having caught this in the review of gh-104750.)

@EpicWink
Contributor Author

EpicWink commented Feb 22, 2024 via email

@gvanrossum
Member

Now that the immediate-consume implementation of the threading queue shutdown has been accepted, I'm going to do the same here and for multiprocessing. I'll personally rewrite the tests to be more readable and obvious.

And what about queue.Queue.join()? Should it raise after shutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote above appears to apply to the asyncio and multiprocessing versions only.)

Member

@gvanrossum gvanrossum left a comment

I'll hold off on reviewing until you've rewritten shutdown, but here's one doc markup nit.

Also, please mark PRs that aren't ready for review as Draft.

@EpicWink EpicWink marked this pull request as draft February 22, 2024 01:14
@EpicWink
Contributor Author

> And what about queue.Queue.join()? Should it raise after shutdown(immediate=True)? Either way some changes are needed -- either in the implementation or in the docs. And the tests there too (what you wrote above appears to apply to the asyncio and multiprocessing versions only.)

The behaviour should (and I intend to implement it to) be the same between the threading, multiprocessing and asyncio queues
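
A minimal sketch of what that parity looks like from the caller's side, assuming Python 3.13 or later, where both `queue.Queue.shutdown()` and `asyncio.Queue.shutdown()` are available. The function names `threaded_demo` and `asyncio_demo` are hypothetical, and the multiprocessing queue is left out of the comparison.

```python
import asyncio
import queue


def threaded_demo() -> list[str]:
    """Shut down a queue.Queue and record what each call does."""
    events = []
    q = queue.Queue()
    q.put("item")
    q.shutdown()  # immediate=False: already-queued items stay retrievable
    try:
        q.put("late")
    except queue.ShutDown:
        events.append("put raised")
    events.append(q.get())  # the item queued before shutdown
    try:
        q.get_nowait()  # queue is now both empty and shut down
    except queue.ShutDown:
        events.append("get raised")
    return events


async def asyncio_demo() -> list[str]:
    """Same sequence of calls against asyncio.Queue."""
    events = []
    q = asyncio.Queue()
    q.put_nowait("item")
    q.shutdown()
    try:
        await q.put("late")
    except asyncio.QueueShutDown:
        events.append("put raised")
    events.append(await q.get())
    try:
        await q.get()
    except asyncio.QueueShutDown:
        events.append("get raised")
    return events
```

On 3.13, `threaded_demo()` and `asyncio.run(asyncio_demo())` should record the same sequence of events; the only visible difference is the exception name (`queue.ShutDown` versus `asyncio.QueueShutDown`).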

@YvesDup
Contributor

YvesDup commented Feb 22, 2024

By the way, should task_done() raise an exception after shutdown or not?
Update: see https://docs.python.org/3.13/library/queue.html#queue.Queue.task_done

@EpicWink
Contributor Author

EpicWink commented Feb 25, 2024

> By the way, should task_done() raise an exception after shutdown or not?

@YvesDup No (except the usual ValueError); see #115838
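
A short sketch of that answer for the asyncio queue, assuming Python 3.13 or later. The function name `task_done_after_shutdown` is hypothetical; it only illustrates that `task_done()` keeps its usual contract after `shutdown()`.

```python
import asyncio


async def task_done_after_shutdown() -> str:
    q = asyncio.Queue()
    q.put_nowait("job")
    await q.get()       # one item fetched, so one task is unfinished
    q.shutdown()
    q.task_done()       # allowed after shutdown: marks the fetched item done
    try:
        q.task_done()   # one call too many
    except ValueError:
        return "ValueError"
    return "no error"
```

The first `task_done()` succeeds even though the queue is shut down; only the surplus call raises, and it raises the usual `ValueError` rather than `QueueShutDown`.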

@EpicWink
Contributor Author

EpicWink commented Apr 4, 2024

> I want to trust that you two have done the right thing there

@gvanrossum please don't trust, have you seen xz-utils's backdoor?

> but not push until the big test run completes? (Buildbot tests can take many hours, sorry.)

Oops, sorry. I forgot to hold off my most recent push

> Also, why is the DO-NOT-MERGE label set? Is it because of #104228?

No, I think it was because I hadn't updated the implementation to match the new agreed-upon behaviour (this PR was in draft at that time anyway, so I don't think the label was needed).

> wondering why you always wake up all getters -- and came to the conclusion that it is needed

I should probably add a comment

@EpicWink
Contributor Author

EpicWink commented Apr 4, 2024

> there is the same bug on threading queue shutdown. I will open an issue.

@YvesDup I've created issue #117531, and fix in PR #117532

# Setup queue with 2 items (1 retrieved), and a join() task
q = self.q_class()
loop = asyncio.get_running_loop()
q.put_nowait("data")
Contributor

I suggest:

q.put_nowait("data1")
q.put_nowait("data2")

q.put_nowait("data")
q.put_nowait("data")
join_task = loop.create_task(q.join())
self.assertEqual(await q.get(), "data")
Contributor

and testing on "data1".

Contributor Author

@EpicWink EpicWink Apr 4, 2024

Except when q is an instance of LifoQueue, when the value should be "data2". Is it necessary to test queue ordering in the shutdown tests?

Contributor

I forgot the LifoQueue class. Is this worth commenting on?

Contributor Author

I have implemented your suggestion, but I have not committed. I am considering it. I don't think it's necessary.

Contributor

The test case (thus including LifoQueue) is implicit. A comment would be enough.
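
The ordering difference raised in this thread can be sketched as follows. `first_out` is a hypothetical helper, not part of the test suite; it simply shows why a shared test that asserts on "data1" would fail when `q_class` is `LifoQueue`.

```python
import asyncio


async def first_out(queue_class) -> str:
    """Put two distinct items and return whichever the queue hands back first."""
    q = queue_class()
    q.put_nowait("data1")
    q.put_nowait("data2")
    return await q.get()


if __name__ == "__main__":
    print(asyncio.run(first_out(asyncio.Queue)))      # FIFO: oldest item first
    print(asyncio.run(first_out(asyncio.LifoQueue)))  # LIFO: newest item first
```

Using the same value for both items, as the test does, keeps the shutdown tests independent of the queue's ordering.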

@YvesDup
Contributor

YvesDup commented Apr 4, 2024

> there is the same bug on threading queue shutdown. I will open an issue.

> @YvesDup I've created issue #117531, and fix in PR #117532

I have seen and commented

Member

@gvanrossum gvanrossum left a comment

Yup! This version looks great. Let me know if you agree -- I've removed the DO-NOT-MERGE label, but I'll wait until you and @YvesDup are happy too.

@EpicWink
Contributor Author

EpicWink commented Apr 6, 2024

I think it's good to go

> wondering why you always wake up all getters -- and came to the conclusion that it is needed

> I should probably add a comment

I could do this, but I don't think it's worth revoking the PR's approval
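
One observable consequence of waking every getter can be sketched like this, assuming Python 3.13 or later. The name `blocked_getters_see_shutdown` is hypothetical, and the sketch only shows the caller-visible behaviour, not the implementation's reasoning.

```python
import asyncio


async def blocked_getters_see_shutdown(n: int = 3) -> list[str]:
    q = asyncio.Queue()

    async def getter() -> str:
        try:
            await q.get()
        except asyncio.QueueShutDown:
            return "shut down"
        return "got item"

    tasks = [asyncio.create_task(getter()) for _ in range(n)]
    await asyncio.sleep(0)  # let every getter block on the empty queue
    q.shutdown()            # no immediate=True needed: the queue is already empty
    return await asyncio.gather(*tasks)
```

Each blocked getter is woken, re-checks the queue, finds it empty and shut down, and raises `QueueShutDown`. If only one getter were woken, the others would presumably stay blocked forever, since no further items can arrive.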

@gvanrossum gvanrossum merged commit df4d84c into python:main Apr 6, 2024
All blocked callers of :meth:`~Queue.put` will be unblocked. If
*immediate* is true, also unblock callers of :meth:`~Queue.get`
and :meth:`~Queue.join`.

Contributor

Sorry, but I have a doubt: shouldn't this documentation block rather be:

    All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`
    will be unblocked. If *immediate* is true, also unblock callers of
    :meth:`~Queue.join`.

If this changes, the docstring of the shutdown method must be updated as well.

Contributor Author

join callers aren't necessarily even unblocked anyway, if consumers are processing any items. I should probably say that a task is marked as done for each item in the queue if immediate shutdown.

Also, I think the threading queue docs are the same.
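
A sketch of that point, assuming Python 3.13 or later: `join()` stays blocked after an immediate shutdown while a consumer still holds an unfinished item, and is released only once that item is marked done. The function name `join_with_immediate_shutdown` is hypothetical.

```python
import asyncio


async def join_with_immediate_shutdown() -> list[bool]:
    q = asyncio.Queue()
    q.put_nowait("in progress")
    q.put_nowait("still queued")
    await q.get()                  # a consumer now holds one unfinished item
    join_task = asyncio.create_task(q.join())
    await asyncio.sleep(0)         # let join() start waiting

    q.shutdown(immediate=True)     # discards "still queued" and counts it as done
    await asyncio.sleep(0)
    blocked_after_shutdown = not join_task.done()

    q.task_done()                  # the consumer finishes its item
    await join_task                # join() is released only now
    return [blocked_after_shutdown, q.empty()]
```

Both returned flags should be true: the join was still pending right after the shutdown, and the queue was drained by it.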

Contributor

@YvesDup YvesDup Apr 7, 2024

> a task is marked as done for each item in the queue if immediate shutdown.

It's very precise, better.

> Also, I think the threading queue docs are the same.

Yes, I commented here so as not to forget (see #117532 (comment)).

English is your native language, so I think it is best if you update the documentation and docstrings.

Update: but I can create the follow-up PR.

Contributor Author

I've made a PR: #117621


All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
"""
Contributor

The docstring should be modified depending on whether you agree with my first remark about 'blocked callers'.
The change could be:

All blocked callers of put() and get() will be unblocked, and
also join() if 'immediate'.

* Add :meth:`asyncio.Queue.shutdown` (along with
:exc:`asyncio.QueueShutDown`) for queue termination.
(Contributed by Laurie Opperman in :gh:`104228`.)

Contributor

We've worked well together; it'd be nice if you'd mention me.
:-)

@gvanrossum
Member

Sorry for merging prematurely. @EpicWink Can you prepare a follow-up PR? I will merge it as soon as @YvesDup approves it. (Or, alternatively, @YvesDup can make a PR and I will merge when @EpicWink approves.)

@EpicWink EpicWink deleted the asyncio-queue-shutdown branch April 8, 2024 00:49
@EpicWink
Contributor Author

EpicWink commented Apr 8, 2024

@gvanrossum see #117621

diegorusso pushed a commit to diegorusso/cpython that referenced this pull request Apr 17, 2024
@john-parton

john-parton commented Aug 27, 2024

I'd like to add that the shutdown() method and task groups make certain patterns really easy to express. I was struggling with how to properly make a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:

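import asyncio
from asyncio import Queue, QueueShutDown
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable


async def async_map[T, R](
    func: Callable[[T], Awaitable[R]],
    iterable: AsyncIterable[T],
    *,
    limit: int,
    maxsize: int = -1,
) -> AsyncIterator[R]:
    # By default, bound both queues to the number of workers.
    if maxsize < 0:
        maxsize = limit

    arguments_queue = Queue[T](maxsize=maxsize)
    results_queue = Queue[R](maxsize=maxsize)

    async def drain():
        # Feed every argument to the workers, then signal that no more will come.
        async for argument in iterable:
            await arguments_queue.put(argument)
        arguments_queue.shutdown(immediate=False)

    async def worker():
        # Process arguments until the queue is both shut down and empty.
        while True:
            try:
                argument = await arguments_queue.get()
            except QueueShutDown:
                break
            await results_queue.put(await func(argument))

    async def background():
        # Once the producer and all workers have finished, close the results queue.
        async with asyncio.TaskGroup() as background_task_group:
            background_task_group.create_task(drain())
            for _ in range(limit):
                background_task_group.create_task(worker())
        results_queue.shutdown(immediate=False)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(background())

        # Yield results as they arrive until the results queue is shut down and empty.
        while True:
            try:
                yield await results_queue.get()
            except QueueShutDown:
                break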

I believe this shares an issue common to many async generators: if you don't consume the entire generator, it will keep processing and end up with a lot of futures never awaited.
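
One way to sidestep that concern, at the cost of losing streaming, is to collect results eagerly instead of yielding them. The following is a simplified sketch of the same shutdown-driven worker pattern, assuming Python 3.13 or later; `map_with_workers` and `double` are hypothetical names, and results come back in completion order rather than input order.

```python
import asyncio


async def map_with_workers(func, items, *, limit: int) -> list:
    """Apply func to every item using `limit` concurrent workers."""
    arguments = asyncio.Queue()
    results = []

    async def worker():
        # Keep pulling work until the queue is shut down and empty.
        while True:
            try:
                argument = await arguments.get()
            except asyncio.QueueShutDown:
                break
            results.append(await func(argument))

    async with asyncio.TaskGroup() as task_group:
        for _ in range(limit):
            task_group.create_task(worker())
        for item in items:
            await arguments.put(item)
        # No more work will arrive; workers exit after draining the queue.
        arguments.shutdown()
    return results


async def double(x: int) -> int:
    """Hypothetical stand-in for real asynchronous work."""
    await asyncio.sleep(0)
    return 2 * x
```

Because the task group exits only after every worker has seen `QueueShutDown`, nothing is left running once `map_with_workers` returns, for example `await map_with_workers(double, range(5), limit=2)`.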

I know there are some backports. Is there a backports package for asyncio stuff already?

6 participants