From eff5d73b211673c0014f0e5da3a789e1c0347605 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Sat, 1 Mar 2025 09:44:53 -0500 Subject: [PATCH 1/3] gh-128364: Fix flaky `test_concurrent_futures.test_wait` tests Use events instead of relying on `time.sleep()`. The tests are also now about four times faster. --- Lib/test/test_concurrent_futures/test_wait.py | 59 +++++++++++++++---- Lib/test/test_concurrent_futures/util.py | 21 +++++++ 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py index 108cf54bf79e6f..b6341533c6741c 100644 --- a/Lib/test/test_concurrent_futures/test_wait.py +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -1,6 +1,5 @@ import sys import threading -import time import unittest from concurrent import futures from test import support @@ -16,15 +15,15 @@ def mul(x, y): return x * y -def sleep_and_raise(t): - time.sleep(t) +def wait_and_raise(e): + e.wait() raise Exception('this is an exception') class WaitTests: def test_20369(self): # See https://bugs.python.org/issue20369 - future = self.executor.submit(time.sleep, 1.5) + future = self.executor.submit(mul, 1, 2) done, not_done = futures.wait([future, future], return_when=futures.ALL_COMPLETED) self.assertEqual({future}, done) @@ -32,8 +31,9 @@ def test_20369(self): def test_first_completed(self): + event = self.create_event() future1 = self.executor.submit(mul, 21, 2) - future2 = self.executor.submit(time.sleep, 1.5) + future2 = self.executor.submit(event.wait) done, not_done = futures.wait( [CANCELLED_FUTURE, future1, future2], @@ -42,8 +42,12 @@ def test_first_completed(self): self.assertEqual(set([future1]), done) self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + event.set() + future2.result() # wait for job to finish + def test_first_completed_some_already_completed(self): - future1 = self.executor.submit(time.sleep, 1.5) + event = self.create_event() + future1 = self.executor.submit(event.wait) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], @@ -54,11 +58,24 @@ def test_first_completed_some_already_completed(self): finished) self.assertEqual(set([future1]), pending) - @support.requires_resource('walltime') + event.set() + future1.result() # wait for job to finish + def test_first_exception(self): + event1 = self.create_event() + event2 = self.create_event() + future1 = self.executor.submit(mul, 2, 21) - future2 = self.executor.submit(sleep_and_raise, 1.5) - future3 = self.executor.submit(time.sleep, 3) + future2 = self.executor.submit(wait_and_raise, event1) + future3 = self.executor.submit(event2.wait) + + # Ensure that future1 is completed before future2 finishes + def wait_for_future1(): + future1.result() + event1.set() + + t = threading.Thread(target=wait_for_future1) + t.start() finished, pending = futures.wait( [future1, future2, future3], @@ -67,9 +84,14 @@ def test_first_exception(self): self.assertEqual(set([future1, future2]), finished) self.assertEqual(set([future3]), pending) + t.join() + event2.set() + future3.result() # wait for job to finish + def test_first_exception_some_already_complete(self): + event = self.create_event() future1 = self.executor.submit(divmod, 21, 0) - future2 = self.executor.submit(time.sleep, 1.5) + future2 = self.executor.submit(event.wait) finished, pending = futures.wait( [SUCCESSFUL_FUTURE, @@ -83,8 +105,12 @@ def test_first_exception_some_already_complete(self): future1]), finished) self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + event.set() + future2.result() # wait for job to finish + def test_first_exception_one_already_failed(self): - future1 = self.executor.submit(time.sleep, 2) + event = self.create_event() + future1 = self.executor.submit(event.wait) finished, pending = futures.wait( [EXCEPTION_FUTURE, future1], @@ -93,6 +119,9 @@ def test_first_exception_one_already_failed(self): self.assertEqual(set([EXCEPTION_FUTURE]), finished) self.assertEqual(set([future1]), pending) + event.set() + future1.result() # wait for job to finish + def test_all_completed(self): future1 = self.executor.submit(divmod, 2, 0) future2 = self.executor.submit(mul, 2, 21) @@ -114,9 +143,9 @@ def test_all_completed(self): def test_timeout(self): short_timeout = 0.050 - long_timeout = short_timeout * 10 - future = self.executor.submit(time.sleep, long_timeout) + event = self.create_event() + future = self.executor.submit(event.wait) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, @@ -132,6 +161,10 @@ def test_timeout(self): finished) self.assertEqual(set([future]), pending) + # Set the event to allow the future to complete + event.set() + future.result() # wait for job to finish + class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index 44086217f9dbcb..efe96f110432e6 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -1,5 +1,6 @@ import multiprocessing import sys +import threading import time import unittest from concurrent import futures @@ -50,14 +51,19 @@ def setUp(self): max_workers=self.worker_count, mp_context=self.get_context(), **self.executor_kwargs) + self.manager = multiprocessing.Manager() else: self.executor = self.executor_type( max_workers=self.worker_count, **self.executor_kwargs) + self.manager = None def tearDown(self): self.executor.shutdown(wait=True) self.executor = None + if self.manager is not None: + self.manager.shutdown() + self.manager = None dt = time.monotonic() - self.t1 if support.verbose: @@ -73,11 +79,17 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor + def create_event(self): + return threading.Event() + @support.skip_if_sanitizer("gh-129824: data races in InterpreterPool tests", thread=True) class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor + def create_event(self): + self.skipTest("InterpreterPoolExecutor doesn't support events") + class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -94,6 +106,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_event(self): + return self.manager.Event() + class ProcessPoolSpawnMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -106,6 +121,9 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() + def create_event(self): + return self.manager.Event() + class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor @@ -122,6 +140,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_event(self): + return self.manager.Event() + def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), executor_mixins=(ThreadPoolMixin, From f3cc50fac7b1dd4753008f318ee876b190fce76a Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Sun, 2 Mar 2025 09:47:59 -0500 Subject: [PATCH 2/3] Use self.get_context().Manager() --- Lib/test/test_concurrent_futures/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index efe96f110432e6..b12940414d9142 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -51,7 +51,7 @@ def setUp(self): max_workers=self.worker_count, mp_context=self.get_context(), **self.executor_kwargs) - self.manager = multiprocessing.Manager() + self.manager = self.get_context().Manager() else: self.executor = self.executor_type( max_workers=self.worker_count, From 8eeaf636c47576893121cb900822ad363004e52e Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Wed, 5 Mar 2025 19:32:37 +0000 Subject: [PATCH 3/3] Use threading_helper.join() and try/finally --- Lib/test/test_concurrent_futures/test_wait.py | 147 +++++++++--------- 1 file changed, 77 insertions(+), 70 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_wait.py b/Lib/test/test_concurrent_futures/test_wait.py index b6341533c6741c..cc387883141b0e 100644 --- a/Lib/test/test_concurrent_futures/test_wait.py +++ b/Lib/test/test_concurrent_futures/test_wait.py @@ -3,6 +3,7 @@ import unittest from concurrent import futures from test import support +from test.support import threading_helper from .util import ( CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, @@ -35,57 +36,61 @@ def test_first_completed(self): future1 = self.executor.submit(mul, 21, 2) future2 = self.executor.submit(event.wait) - done, not_done = futures.wait( - [CANCELLED_FUTURE, future1, future2], - return_when=futures.FIRST_COMPLETED) - - self.assertEqual(set([future1]), done) - self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + try: + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) - event.set() + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + finally: + event.set() future2.result() # wait for job to finish def test_first_completed_some_already_completed(self): event = self.create_event() future1 = self.executor.submit(event.wait) - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], - return_when=futures.FIRST_COMPLETED) - - self.assertEqual( - set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), - finished) - self.assertEqual(set([future1]), pending) - - event.set() + try: + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) + finally: + event.set() future1.result() # wait for job to finish def test_first_exception(self): event1 = self.create_event() event2 = self.create_event() + try: + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(wait_and_raise, event1) + future3 = self.executor.submit(event2.wait) - future1 = self.executor.submit(mul, 2, 21) - future2 = self.executor.submit(wait_and_raise, event1) - future3 = self.executor.submit(event2.wait) + # Ensure that future1 is completed before future2 finishes + def wait_for_future1(): + future1.result() + event1.set() - # Ensure that future1 is completed before future2 finishes - def wait_for_future1(): - future1.result() - event1.set() + t = threading.Thread(target=wait_for_future1) + t.start() - t = threading.Thread(target=wait_for_future1) - t.start() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) - finished, pending = futures.wait( - [future1, future2, future3], - return_when=futures.FIRST_EXCEPTION) + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) - self.assertEqual(set([future1, future2]), finished) - self.assertEqual(set([future3]), pending) - - t.join() - event2.set() + threading_helper.join_thread(t) + finally: + event1.set() + event2.set() future3.result() # wait for job to finish def test_first_exception_some_already_complete(self): @@ -93,33 +98,35 @@ def test_first_exception_some_already_complete(self): future1 = self.executor.submit(divmod, 21, 0) future2 = self.executor.submit(event.wait) - finished, pending = futures.wait( - [SUCCESSFUL_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1, future2], - return_when=futures.FIRST_EXCEPTION) - - self.assertEqual(set([SUCCESSFUL_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - future1]), finished) - self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) - - event.set() + try: + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + finally: + event.set() future2.result() # wait for job to finish def test_first_exception_one_already_failed(self): event = self.create_event() future1 = self.executor.submit(event.wait) - finished, pending = futures.wait( - [EXCEPTION_FUTURE, future1], - return_when=futures.FIRST_EXCEPTION) - - self.assertEqual(set([EXCEPTION_FUTURE]), finished) - self.assertEqual(set([future1]), pending) + try: + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) - event.set() + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) + finally: + event.set() future1.result() # wait for job to finish def test_all_completed(self): @@ -147,22 +154,22 @@ def test_timeout(self): event = self.create_event() future = self.executor.submit(event.wait) - finished, pending = futures.wait( - [CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE, - future], - timeout=short_timeout, - return_when=futures.ALL_COMPLETED) - - self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]), - finished) - self.assertEqual(set([future]), pending) - - # Set the event to allow the future to complete - event.set() + try: + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future], + timeout=short_timeout, + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future]), pending) + finally: + event.set() future.result() # wait for job to finish