Skip to content

[3.11] gh-108416: Mark slow but not CPU bound test methods with requires_resource('walltime') (GH-108480) #108924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ def test_close(self):

close_queue(q)

@support.requires_resource('walltime')
def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
Expand Down Expand Up @@ -4919,6 +4920,7 @@ def test_wait_slow(self):
def test_wait_socket_slow(self):
self.test_wait_socket(True)

@support.requires_resource('walltime')
def test_wait_timeout(self):
from multiprocessing.connection import wait

Expand Down Expand Up @@ -4947,6 +4949,7 @@ def signal_and_sleep(cls, sem, period):
sem.release()
time.sleep(period)

@support.requires_resource('walltime')
def test_wait_integer(self):
from multiprocessing.connection import wait

Expand Down
4 changes: 3 additions & 1 deletion Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@

cpu - Used for certain CPU-heavy tests.

walltime - Long running but not CPU-bound tests.

subprocess Run all tests for the subprocess module.

urlfetch - It is okay to download files required on testing.
Expand All @@ -129,7 +131,7 @@


ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')

# Other resources excluded from --use=all:
#
Expand Down
108 changes: 108 additions & 0 deletions Lib/test/test_concurrent_futures/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import threading
import time
import weakref
from concurrent import futures
from test import support


def mul(x, y):
return x * y

def capture(*args, **kwargs):
return args, kwargs


class MyObject(object):
def my_method(self):
pass


def make_dummy_object(_):
return MyObject()


class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())

def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
future = self.executor.submit(capture, 1, self=2, fn=3)
self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
with self.assertRaises(TypeError):
self.executor.submit(fn=capture, arg=1)
with self.assertRaises(TypeError):
self.executor.submit(arg=1)

def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))

self.assertEqual(
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))

def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)

@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
try:
for i in self.executor.map(time.sleep,
[0, 0, 6],
timeout=5):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')

self.assertEqual([None, None], results)

def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
# have exited).
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()

@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
# references.
my_object = MyObject()
my_object_collected = threading.Event()
my_object_callback = weakref.ref(
my_object, lambda obj: my_object_collected.set())
# Deliberately discarding the future.
self.executor.submit(my_object.my_method)
del my_object

collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(collected,
"Stale reference not collected within timeout.")

def test_max_workers_negative(self):
for number in (0, -1):
with self.assertRaisesRegex(ValueError,
"max_workers must be greater "
"than 0"):
self.executor_type(max_workers=number)

def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
for obj in self.executor.map(make_dummy_object, range(10)):
wr = weakref.ref(obj)
del obj
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
164 changes: 164 additions & 0 deletions Lib/test/test_concurrent_futures/test_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import sys
import threading
import time
import unittest
from concurrent import futures
from test import support

from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
create_executor_tests, setup_module,
BaseTestCase, ThreadPoolMixin,
ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)


def mul(x, y):
return x * y

def sleep_and_raise(t):
time.sleep(t)
raise Exception('this is an exception')


class WaitTests:
def test_20369(self):
# See https://bugs.python.org/issue20369
future = self.executor.submit(time.sleep, 1.5)
done, not_done = futures.wait([future, future],
return_when=futures.ALL_COMPLETED)
self.assertEqual({future}, done)
self.assertEqual(set(), not_done)


def test_first_completed(self):
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(time.sleep, 1.5)

done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)

self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

def test_first_completed_some_already_completed(self):
future1 = self.executor.submit(time.sleep, 1.5)

finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)

self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)

@support.requires_resource('walltime')
def test_first_exception(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(sleep_and_raise, 1.5)
future3 = self.executor.submit(time.sleep, 3)

finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)

self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)

def test_first_exception_some_already_complete(self):
future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(time.sleep, 1.5)

finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)

self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

def test_first_exception_one_already_failed(self):
future1 = self.executor.submit(time.sleep, 2)

finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)

self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending)

def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)

finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)

self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)

@support.requires_resource('walltime')
def test_timeout(self):
future1 = self.executor.submit(mul, 6, 7)
future2 = self.executor.submit(time.sleep, 6)

finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=5,
return_when=futures.ALL_COMPLETED)

self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getswitchinterval()
sys.setswitchinterval(1e-6)
try:
fs = {self.executor.submit(future_func) for i in range(100)}
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setswitchinterval(oldswitchinterval)


create_executor_tests(globals(), WaitTests,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))


def setUpModule():
setup_module()


if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions Lib/test/test_eintr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class EINTRTests(unittest.TestCase):

@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
@support.requires_resource('walltime')
def test_all(self):
# Run the tester in a sub-process, to make sure there is only one
# thread (for reliable signal delivery).
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_faulthandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ def test_dump_traceback_later_fd(self):
with tempfile.TemporaryFile('wb+') as fp:
self.check_dump_traceback_later(fd=fp.fileno())

@support.requires_resource('walltime')
def test_dump_traceback_later_twice(self):
self.check_dump_traceback_later(loops=2)

Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_httplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,7 @@ def test_networked_good_cert(self):
h.close()
self.assertIn('nginx', server_string)

@support.requires_resource('walltime')
def test_networked_bad_cert(self):
# We feed a "CA" cert that is unrelated to the server's cert
import ssl
Expand Down
5 changes: 4 additions & 1 deletion Lib/test/test_imaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import socket

from test.support import (verbose,
run_with_tz, run_with_locale, cpython_only,
run_with_tz, run_with_locale, cpython_only, requires_resource,
requires_working_socket)
from test.support import hashlib_helper
from test.support import threading_helper
Expand Down Expand Up @@ -459,6 +459,7 @@ def test_simple_with_statement(self):
with self.imap_class(*server.server_address):
pass

@requires_resource('walltime')
def test_imaplib_timeout_test(self):
_, server = self._setup(SimpleIMAPHandler)
addr = server.server_address[1]
Expand Down Expand Up @@ -552,6 +553,7 @@ class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase):
imap_class = IMAP4_SSL
server_class = SecureTCPServer

@requires_resource('walltime')
def test_ssl_raises(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
Expand All @@ -566,6 +568,7 @@ def test_ssl_raises(self):
ssl_context=ssl_context)
client.shutdown()

@requires_resource('walltime')
def test_ssl_verified(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(CAFILE)
Expand Down
Loading