diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5c3148ef..03806770 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,23 @@ 2.1.0 (UNRELEASED) +------------------ + +* `#243 <https://github.com/pytest-dev/execnet/pull/243>`__: Added ``main_thread_only`` + execmodel which is derived from the thread execmodel and only executes ``remote_exec`` + calls in the main thread. + Callers of ``remote_exec`` must use the returned channel to wait for a task to complete + before they call ``remote_exec`` again, otherwise the ``remote_exec`` call will fail with a + ``concurrent remote_exec would cause deadlock`` error. The ``main_thread_only`` execmodel + provides solutions for `#96 <https://github.com/pytest-dev/execnet/issues/96>`__ and + `pytest-dev/pytest-xdist#620 <https://github.com/pytest-dev/pytest-xdist/issues/620>`__ + (pending a new ``pytest-xdist`` release). + + Also fixed ``init_popen_io`` to use ``closefd=False`` for shared stdin and stdout file + descriptors, preventing ``Bad file descriptor`` errors triggered by ``test_stdouterrin_setnull``. * Removed support for Python 3.7. * Added official support for Python 3.12. + 2.0.2 (2023-07-09) ------------------ diff --git a/doc/basics.rst b/doc/basics.rst index aa6dabaf..f0eebd85 100644 --- a/doc/basics.rst +++ b/doc/basics.rst @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()`` yourself and specify a larger or not timeout. -threading models: gevent, eventlet, thread -=========================================== +threading models: gevent, eventlet, thread, main_thread_only +==================================================================== .. versionadded:: 1.2 (status: experimental!) -execnet supports "thread", "eventlet" and "gevent" as thread models -on each of the two sides. You need to decide which model to use -before you create any gateways:: +execnet supports "main_thread_only", "thread", "eventlet" and "gevent" +as thread models on each of the two sides. 
You need to decide which +model to use before you create any gateways:: # content of threadmodel.py import execnet diff --git a/src/execnet/gateway_base.py b/src/execnet/gateway_base.py index c055d64f..2e2859f9 100644 --- a/src/execnet/gateway_base.py +++ b/src/execnet/gateway_base.py @@ -61,7 +61,7 @@ def sleep(self, delay): raise NotImplementedError() @abc.abstractmethod - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): raise NotImplementedError() @abc.abstractmethod @@ -113,10 +113,10 @@ def start(self, func, args=()): return _thread.start_new_thread(func, args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): import os - return os.fdopen(fd, mode, bufsize, encoding="utf-8") + return os.fdopen(fd, mode, bufsize, encoding="utf-8", closefd=closefd) def Lock(self): import threading @@ -134,6 +134,10 @@ def Event(self): return threading.Event() +class MainThreadOnlyExecModel(ThreadExecModel): + backend = "main_thread_only" + + class EventletExecModel(ExecModel): backend = "eventlet" @@ -170,10 +174,10 @@ def start(self, func, args=()): return eventlet.spawn_n(func, *args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): import eventlet.green.os - return eventlet.green.os.fdopen(fd, mode, bufsize) + return eventlet.green.os.fdopen(fd, mode, bufsize, closefd=closefd) def Lock(self): import eventlet.green.threading @@ -227,11 +231,11 @@ def start(self, func, args=()): return gevent.spawn(func, *args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): # XXX import gevent.fileobject - return gevent.fileobject.FileObjectThread(fd, mode, bufsize) + return gevent.fileobject.FileObjectThread(fd, mode, bufsize, closefd=closefd) def Lock(self): import gevent.lock @@ -254,6 +258,8 @@ def get_execmodel(backend): return backend if backend == "thread": return ThreadExecModel() + elif backend == 
"main_thread_only": + return MainThreadOnlyExecModel() elif backend == "eventlet": return EventletExecModel() elif backend == "gevent": @@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False): self._shuttingdown = False self._waitall_events = [] if hasprimary: - if self.execmodel.backend != "thread": + if self.execmodel.backend not in ("thread", "main_thread_only"): raise ValueError("hasprimary=True requires thread model") self._primary_thread_task_ready = self.execmodel.Event() else: @@ -332,7 +338,7 @@ def integrate_as_primary_thread(self): """integrate the thread with which we are called as a primary thread for executing functions triggered with spawn(). """ - assert self.execmodel.backend == "thread", self.execmodel + assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel primary_thread_task_ready = self._primary_thread_task_ready # interacts with code at REF1 while 1: @@ -345,7 +351,11 @@ def integrate_as_primary_thread(self): with self._running_lock: if self._shuttingdown: break - primary_thread_task_ready.clear() + # Only clear if _try_send_to_primary_thread has not + # yet set the next self._primary_thread_task reply + # after waiting for this one to complete. 
+ if reply is self._primary_thread_task: + primary_thread_task_ready.clear() def trigger_shutdown(self): with self._running_lock: @@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply): # wake up primary thread primary_thread_task_ready.set() return True + elif ( + self.execmodel.backend == "main_thread_only" + and self._primary_thread_task is not None + ): + self._primary_thread_task.waitfinish() + self._primary_thread_task = reply + # wake up primary thread (it's okay if this is already set + # because we waited for the previous task to finish above + # and integrate_as_primary_thread will not clear it when + # it enters self._running_lock if it detects that a new + # task is available) + primary_thread_task_ready.set() + return True return False def spawn(self, func, *args, **kwargs): @@ -857,6 +880,9 @@ def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): ENDMARKER = object() INTERRUPT_TEXT = "keyboard-interrupted" +MAIN_THREAD_ONLY_DEADLOCK_TEXT = ( + "concurrent remote_exec would cause deadlock for main_thread_only execmodel" +) class ChannelFactory: @@ -1105,6 +1131,20 @@ def join(self, timeout=None): class WorkerGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): + if self._execpool.execmodel.backend == "main_thread_only": + # It's necessary to wait for a short time in order to ensure + # that we do not report a false-positive deadlock error, since + # channel close does not elicit a response that would provide + # a guarantee to remote_exec callers that the previous task + # has released the main thread. If the timeout expires then it + # should be practically impossible to report a false-positive. + if not self._executetask_complete.wait(timeout=1): + channel.close(MAIN_THREAD_ONLY_DEADLOCK_TEXT) + return + # It's only safe to clear here because the above wait proves + # that there is not a previous task about to set it again. 
+ self._executetask_complete.clear() + sourcetask = loads_internal(sourcetask) self._execpool.spawn(self.executetask, (channel, sourcetask)) @@ -1132,8 +1172,14 @@ def serve(self): def trace(msg): self._trace("[serve] " + msg) - hasprimary = self.execmodel.backend == "thread" + hasprimary = self.execmodel.backend in ("thread", "main_thread_only") self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary) + self._executetask_complete = None + if self.execmodel.backend == "main_thread_only": + self._executetask_complete = self.execmodel.Event() + # Initialize state to indicate that there is no previous task + # executing so that we don't need a separate flag to track this. + self._executetask_complete.set() trace("spawning receiver thread") self._initreceive() try: @@ -1176,6 +1222,11 @@ def executetask(self, item): return self._trace("ignoring EOFError because receiving finished") channel.close() + if self._executetask_complete is not None: + # Indicate that this task has finished executing, meaning + # that there is no possibility of it triggering a deadlock + # for the next spawn call. + self._executetask_complete.set() # @@ -1631,8 +1682,10 @@ def init_popen_io(execmodel): os.dup2(fd, 2) os.close(fd) io = Popen2IO(stdout, stdin, execmodel) - sys.stdin = execmodel.fdopen(0, "r", 1) - sys.stdout = execmodel.fdopen(1, "w", 1) + # Use closefd=False since 0 and 1 are shared with + # sys.__stdin__ and sys.__stdout__. 
+ sys.stdin = execmodel.fdopen(0, "r", 1, closefd=False) + sys.stdout = execmodel.fdopen(1, "w", 1, closefd=False) return io diff --git a/src/execnet/multi.py b/src/execnet/multi.py index 7629ddb3..c63a57e8 100644 --- a/src/execnet/multi.py +++ b/src/execnet/multi.py @@ -107,7 +107,7 @@ def makegateway(self, spec=None): id= specifies the gateway id python= specifies which python interpreter to execute - execmodel=model 'thread', 'eventlet', 'gevent' model for execution + execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution chdir= specifies to which directory to change nice= specifies process priority of new process env:NAME=value specifies a remote environment variable setting. diff --git a/testing/conftest.py b/testing/conftest.py index 86bd93e0..2ee21202 100644 --- a/testing/conftest.py +++ b/testing/conftest.py @@ -124,7 +124,7 @@ def anypython(request): pytest.skip(f"no {name} found") if "execmodel" in request.fixturenames and name != "sys.executable": backend = request.getfixturevalue("execmodel").backend - if backend != "thread": + if backend not in ("thread", "main_thread_only"): pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}") return executable @@ -173,9 +173,11 @@ def gw(request, execmodel, group): return gw -@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session") +@pytest.fixture( + params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session" +) def execmodel(request): - if request.param != "thread": + if request.param not in ("thread", "main_thread_only"): pytest.importorskip(request.param) if request.param in ("eventlet", "gevent") and sys.platform == "win32": pytest.xfail(request.param + " does not work on win32") diff --git a/testing/test_basics.py b/testing/test_basics.py index 321e18f1..26816f3e 100644 --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -249,12 +249,25 @@ class Arg: @pytest.mark.skipif("not hasattr(os, 'dup')") def 
test_stdouterrin_setnull(execmodel, capfd): - gateway_base.init_popen_io(execmodel) - os.write(1, b"hello") - os.read(0, 1) - out, err = capfd.readouterr() - assert not out - assert not err + # Backup and restore stdin state, and rely on capfd to handle + # this for stdout and stderr. + orig_stdin = sys.stdin + orig_stdin_fd = os.dup(0) + try: + # The returned Popen2IO instance can be garbage collected + # prematurely since we don't hold a reference here, but we + # tolerate this because it is intended to leave behind a + # sane state afterwards. + gateway_base.init_popen_io(execmodel) + os.write(1, b"hello") + os.read(0, 1) + out, err = capfd.readouterr() + assert not out + assert not err + finally: + sys.stdin = orig_stdin + os.dup2(orig_stdin_fd, 0) + os.close(orig_stdin_fd) class PseudoChannel: diff --git a/testing/test_gateway.py b/testing/test_gateway.py index 809a13d9..ee4ce375 100644 --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -525,3 +525,82 @@ def sendback(channel): if interleave_getstatus: print(gw.remote_status()) assert ch.receive(timeout=0.5) == 1234 + + +def test_assert_main_thread_only(execmodel, makegateway): + if execmodel.backend != "main_thread_only": + pytest.skip("can only run with main_thread_only") + + gw = makegateway(spec=f"execmodel={execmodel.backend}//popen") + + try: + # Submit multiple remote_exec requests in quick succession and + # assert that all tasks execute in the main thread. It is + # necessary to call receive on each channel before the next + # remote_exec call, since the channel will raise an error if + # concurrent remote_exec requests are submitted as in + # test_main_thread_only_concurrent_remote_exec_deadlock. 
+ for i in range(10): + ch = gw.remote_exec( + """ + import time, threading + time.sleep(0.02) + channel.send(threading.current_thread() is threading.main_thread()) + """ + ) + + try: + res = ch.receive() + finally: + ch.close() + # This doesn't actually block because we closed + # the channel already, but it does check for remote + # errors and raise them. + ch.waitclose() + if res is not True: + pytest.fail("remote raised\n%s" % res) + finally: + gw.exit() + gw.join() + + +def test_main_thread_only_concurrent_remote_exec_deadlock(execmodel, makegateway): + if execmodel.backend != "main_thread_only": + pytest.skip("can only run with main_thread_only") + + gw = makegateway(spec=f"execmodel={execmodel.backend}//popen") + channels = [] + try: + # Submit multiple remote_exec requests in quick succession and + # assert that MAIN_THREAD_ONLY_DEADLOCK_TEXT is raised if + # concurrent remote_exec requests are submitted for the + # main_thread_only execmodel (as compensation for the lack of + # back pressure in remote_exec calls which do not attempt to + # block until the remote main thread is idle). + for i in range(2): + channels.append( + gw.remote_exec( + """ + import threading + channel.send(threading.current_thread() is threading.main_thread()) + # Wait forever, ensuring that the deadlock case triggers. 
+ channel.gateway.execmodel.Event().wait() + """ + ) + ) + + expected_results = ( + True, + execnet.gateway_base.MAIN_THREAD_ONLY_DEADLOCK_TEXT, + ) + for expected, ch in zip(expected_results, channels): + try: + res = ch.receive() + except execnet.RemoteError as e: + res = e.formatted + assert res == expected + finally: + for ch in channels: + ch.close() + gw.exit() + gw.join() diff --git a/testing/test_multi.py b/testing/test_multi.py index 79861b11..3996a964 100644 --- a/testing/test_multi.py +++ b/testing/test_multi.py @@ -223,8 +223,9 @@ def test_terminate_with_proxying(self): group.terminate(1.0) +@pytest.mark.xfail(reason="active_count() has been broken for some time") def test_safe_terminate(execmodel): - if execmodel.backend != "threading": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail( "execution model %r does not support task count" % execmodel.backend ) @@ -246,8 +247,9 @@ def kill(): assert execmodel.active_count() == active +@pytest.mark.xfail(reason="active_count() has been broken for some time") def test_safe_terminate2(execmodel): - if execmodel.backend != "threading": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail( "execution model %r does not support task count" % execmodel.backend ) diff --git a/testing/test_termination.py b/testing/test_termination.py index 282c1979..5da516c4 100644 --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -36,7 +36,7 @@ def doit(): def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail("test and execnet not compatible to greenlets yet") gw = makegateway("popen") q = execmodel.queue.Queue() @@ -97,8 +97,12 @@ def test_close_initiating_remote_no_error(testdir, anypython): def test_terminate_implicit_does_trykill(testdir, anypython, capfd, pool): - if pool.execmodel != "thread": + if pool.execmodel.backend not in 
("thread", "main_thread_only"): pytest.xfail("only os threading model supported") + if sys.version_info >= (3, 12): + pytest.xfail( + "since python3.12 this test triggers RuntimeError: can't create new thread at interpreter shutdown" + ) p = testdir.makepyfile( """ import sys diff --git a/testing/test_threadpool.py b/testing/test_threadpool.py index 4d1edd8c..0162e2ea 100644 --- a/testing/test_threadpool.py +++ b/testing/test_threadpool.py @@ -164,7 +164,7 @@ def wait_then_put(): def test_primary_thread_integration(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): with pytest.raises(ValueError): WorkerPool(execmodel=execmodel, hasprimary=True) return @@ -188,7 +188,7 @@ def func(): def test_primary_thread_integration_shutdown(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.skip("can only run with threading") pool = WorkerPool(execmodel=execmodel, hasprimary=True) queue = execmodel.queue.Queue()