|
1 | 1 | from test import support
|
2 | 2 | from test.support import import_helper
|
3 | 3 | from test.support import threading_helper
|
| 4 | +from test.test_unittest.support import BufferedWriter |
4 | 5 |
|
5 | 6 | # Skip tests if _multiprocessing wasn't built.
|
6 | 7 | import_helper.import_module('_multiprocessing')
|
@@ -285,11 +286,33 @@ def _assert_logged(self, msg):
|
285 | 286 | self.assertTrue(any(msg in line for line in output),
|
286 | 287 | output)
|
287 | 288 |
|
| 289 | + # def tearDown(self): |
| 290 | + # super().tearDown() |
| 291 | + # breakpoint() |
| 292 | + # text = self.stream.getvalue() |
| 293 | + # self.assertNotIn("leaked semaphore objects", text) |
| 294 | + |
288 | 295 |
|
289 | 296 | create_executor_tests(InitializerMixin)
|
290 | 297 | create_executor_tests(FailingInitializerMixin)
|
291 | 298 |
|
292 | 299 |
|
| 300 | +class FailingInitializerTextTest(unittest.TestCase): |
| 301 | + def test_spawn(self): |
| 302 | + stream = BufferedWriter() |
| 303 | + runner = unittest.TextTestRunner(stream=stream) |
| 304 | + result = runner.run(ProcessPoolSpawnFailingInitializerTest('test_initializer')) |
| 305 | + text = stream.getvalue() |
| 306 | + self.assertNotIn("leaked semaphore objects", text) |
| 307 | + |
| 308 | + def test_forkserver(self): |
| 309 | + stream = BufferedWriter() |
| 310 | + runner = unittest.TextTestRunner(stream=stream) |
| 311 | + result = runner.run(ProcessPoolForkserverFailingInitializerTest('test_initializer')) |
| 312 | + text = stream.getvalue() |
| 313 | + self.assertNotIn("leaked semaphore objects", text) |
| 314 | + |
| 315 | + |
293 | 316 | class ExecutorShutdownTest:
|
294 | 317 | def test_run_after_shutdown(self):
|
295 | 318 | self.executor.shutdown()
|
|
0 commit comments