Skip to content

Fix call_soon_threadsafe thread safety #421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import fcntl
import logging
import os
import random
import sys
import threading
import time
Expand Down Expand Up @@ -702,6 +703,33 @@ async def foo():
self.loop.run_until_complete(
self.loop.shutdown_default_executor())

def test_call_soon_threadsafe_safety(self):
ITERATIONS = 4096
counter = [0]

def cb():
counter[0] += 1
if counter[0] < ITERATIONS - 512:
h = self.loop.call_later(0.01, lambda: None)
self.loop.call_later(
0.0005 + random.random() * 0.0005, h.cancel
)

def scheduler():
loop = self.loop
for i in range(ITERATIONS):
if loop.is_running():
loop.call_soon_threadsafe(cb)
time.sleep(0.001)
loop.call_soon_threadsafe(loop.stop)

thread = threading.Thread(target=scheduler)

self.loop.call_soon(thread.start)
self.loop.run_forever()
thread.join()
self.assertEqual(counter[0], ITERATIONS)


class TestBaseUV(_TestBase, UVTestCase):

Expand Down
1 change: 1 addition & 0 deletions uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ cdef class Loop:
cdef _exec_queued_writes(self)

cdef inline _call_soon(self, object callback, object args, object context)
cdef inline _append_ready_handle(self, Handle handle)
cdef inline _call_soon_handle(self, Handle handle)

cdef _call_later(self, uint64_t delay, object callback, object args,
Expand Down
13 changes: 10 additions & 3 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ cdef class Loop:
if handle._cancelled:
self.remove_signal_handler(sig) # Remove it properly.
else:
self._call_soon_handle(handle)
self._append_ready_handle(handle)
self.handler_async.send()

cdef _on_wake(self):
Expand Down Expand Up @@ -667,10 +667,13 @@ cdef class Loop:
self._call_soon_handle(handle)
return handle

cdef inline _call_soon_handle(self, Handle handle):
cdef inline _append_ready_handle(self, Handle handle):
self._check_closed()
self._ready.append(handle)
self._ready_len += 1

cdef inline _call_soon_handle(self, Handle handle):
self._append_ready_handle(handle)
if not self.handler_idle.running:
self.handler_idle.start()

Expand Down Expand Up @@ -1281,7 +1284,11 @@ cdef class Loop:
"""Like call_soon(), but thread-safe."""
if not args:
args = None
handle = self._call_soon(callback, args, context)
cdef Handle handle = new_Handle(self, callback, args, context)
self._append_ready_handle(handle) # deque append is atomic
# libuv async handler is thread-safe while the idle handler is not -
# we only set the async handler here, which will start the idle handler
# in _on_wake() from the loop and eventually call the callback.
self.handler_async.send()
return handle

Expand Down