diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py index 108cf54bf79e6f..cc387883141b0e 100644 --- a/Lib/test/test_concurrent_futures/test_wait.py +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -1,9 +1,9 @@ import sys import threading -import time import unittest from concurrent import futures from test import support +from test.support import threading_helper from .util import ( CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, @@ -16,15 +16,15 @@ def mul(x, y): return x * y -def sleep_and_raise(t): - time.sleep(t) +def wait_and_raise(e): + e.wait() raise Exception('this is an exception') class WaitTests: def test_20369(self): # See https://bugs.python.org/issue20369 - future = self.executor.submit(time.sleep, 1.5) + future = self.executor.submit(mul, 1, 2) done, not_done = futures.wait([future, future], return_when=futures.ALL_COMPLETED) self.assertEqual({future}, done) @@ -32,66 +32,102 @@ def test_20369(self): def test_first_completed(self): + event = self.create_event() future1 = self.executor.submit(mul, 21, 2) - future2 = self.executor.submit(time.sleep, 1.5) + future2 = self.executor.submit(event.wait) - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) + try: + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) - self.assertEqual(set([future1]), done) - self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + finally: + event.set() + future2.result() # wait for job to finish def test_first_completed_some_already_completed(self): - future1 = self.executor.submit(time.sleep, 1.5) - - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) + event = self.create_event() + future1 = self.executor.submit(event.wait) - self.assertEqual( - set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), - finished) - self.assertEqual(set([future1]), pending) + try: + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) + finally: + event.set() + future1.result() # wait for job to finish - @support.requires_resource('walltime') def test_first_exception(self): - future1 = self.executor.submit(mul, 2, 21) - future2 = self.executor.submit(sleep_and_raise, 1.5) - future3 = self.executor.submit(time.sleep, 3) + event1 = self.create_event() + event2 = self.create_event() + try: + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(wait_and_raise, event1) + future3 = self.executor.submit(event2.wait) - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) + # Ensure that future1 is completed before future2 finishes + def wait_for_future1(): + future1.result() + event1.set() + + t = threading.Thread(target=wait_for_future1) + t.start() + + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) - self.assertEqual(set([future1, future2]), finished) - self.assertEqual(set([future3]), pending) + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) + + threading_helper.join_thread(t) + finally: + event1.set() + event2.set() + future3.result() # wait for job to finish def test_first_exception_some_already_complete(self): + event = self.create_event() future1 = self.executor.submit(divmod, 21, 0) - future2 = self.executor.submit(time.sleep, 1.5) - - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) + future2 = self.executor.submit(event.wait) - self.assertEqual(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + try: + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + finally: + event.set() + future2.result() # wait for job to finish def test_first_exception_one_already_failed(self): - future1 = self.executor.submit(time.sleep, 2) + event = self.create_event() + future1 = self.executor.submit(event.wait) - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) + try: + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) - self.assertEqual(set([EXCEPTION_FUTURE]), finished) - self.assertEqual(set([future1]), pending) + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) + finally: + event.set() + future1.result() # wait for job to finish def test_all_completed(self): future1 = self.executor.submit(divmod, 2, 0) @@ -114,23 +150,27 @@ def test_all_completed(self): def test_timeout(self): short_timeout = 0.050 - long_timeout = short_timeout * 10 - future = self.executor.submit(time.sleep, long_timeout) + event = self.create_event() + future = self.executor.submit(event.wait) - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future], - timeout=short_timeout, - return_when=futures.ALL_COMPLETED) - - self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - finished) - self.assertEqual(set([future]), pending) + try: + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future], + timeout=short_timeout, + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future]), pending) + finally: + event.set() + future.result() # wait for job to finish class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index 3b8ec3e205d5aa..e85ef3b1c91681 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -1,5 +1,6 @@ import multiprocessing import sys +import threading import time import unittest from concurrent import futures @@ -50,14 +51,19 @@ def setUp(self): max_workers=self.worker_count, mp_context=self.get_context(), **self.executor_kwargs) + self.manager = self.get_context().Manager() else: self.executor = self.executor_type( max_workers=self.worker_count, **self.executor_kwargs) + self.manager = None def tearDown(self): self.executor.shutdown(wait=True) self.executor = None + if self.manager is not None: + self.manager.shutdown() + self.manager = None dt = time.monotonic() - self.t1 if support.verbose: @@ -73,6 +79,9 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor + def create_event(self): + return threading.Event() + class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -89,6 +98,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_event(self): + return self.manager.Event() + class ProcessPoolSpawnMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -101,6 +113,9 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() + def create_event(self): + return self.manager.Event() + class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -117,6 +132,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_event(self): + return self.manager.Event() + def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), executor_mixins=(ThreadPoolMixin,