Skip to content

Commit b646e43

Browse files
committed
PYTHON-5784 Address Noah's review: move async tests to test/asynchronous/ via synchro
- Create test/asynchronous/test_periodic_executor.py as the single source of truth for all periodic executor tests, using AsyncUnitTest with asyncSetUp/ asyncTearDown base class for executor lifecycle management - Register test_periodic_executor.py in synchro's converted_tests so the sync variant is auto-generated - Replace the manually-maintained test/test_periodic_executor.py with the synchro-generated equivalent, eliminating duplicated async/sync test code - Use _IS_SYNC branching for the small number of tests that differ between threading (PeriodicExecutor) and asyncio (AsyncPeriodicExecutor) behavior
1 parent 613e815 commit b646e43

3 files changed

Lines changed: 457 additions & 303 deletions

File tree

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
# Copyright 2026-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for periodic_executor.py."""
16+
17+
from __future__ import annotations
18+
19+
import asyncio
20+
import gc
21+
import sys
22+
import threading
23+
import time
24+
25+
sys.path[0:0] = [""]
26+
27+
from test.asynchronous import AsyncUnitTest, unittest
28+
29+
import pymongo.periodic_executor as pe_module
30+
from pymongo.periodic_executor import (
31+
AsyncPeriodicExecutor,
32+
_register_executor,
33+
_shutdown_executors,
34+
)
35+
36+
_IS_SYNC = False
37+
38+
39+
def _make_executor(interval=30.0, min_interval=0.01, target=None, name="test"):
40+
if target is None:
41+
42+
async def target():
43+
return True
44+
45+
return AsyncPeriodicExecutor(
46+
interval=interval, min_interval=min_interval, target=target, name=name
47+
)
48+
49+
50+
class _AsyncPeriodicExecutorTestBase(AsyncUnitTest):
51+
async def asyncSetUp(self):
52+
self.ex = _make_executor()
53+
54+
async def asyncTearDown(self):
55+
self.ex.close()
56+
await self.ex.join(timeout=2)
57+
58+
59+
class TestAsyncPeriodicExecutorRepr(AsyncUnitTest):
60+
async def test_repr_contains_class_and_name(self):
61+
ex = _make_executor(name="exec")
62+
r = repr(ex)
63+
self.assertIn("AsyncPeriodicExecutor", r)
64+
self.assertIn("exec", r)
65+
66+
67+
class TestAsyncPeriodicExecutorBasic(_AsyncPeriodicExecutorTestBase):
68+
async def test_wake_sets_event(self):
69+
self.assertFalse(self.ex._event)
70+
self.ex.wake()
71+
self.assertTrue(self.ex._event)
72+
73+
async def test_update_interval(self):
74+
self.ex.update_interval(60)
75+
self.assertEqual(self.ex._interval, 60)
76+
77+
async def test_skip_sleep(self):
78+
self.assertFalse(self.ex._skip_sleep)
79+
self.ex.skip_sleep()
80+
self.assertTrue(self.ex._skip_sleep)
81+
82+
83+
class TestAsyncPeriodicExecutorLifecycle(_AsyncPeriodicExecutorTestBase):
84+
async def test_open_starts_worker(self):
85+
self.ex.open()
86+
if _IS_SYNC:
87+
self.assertIsNotNone(self.ex._thread)
88+
self.assertTrue(self.ex._thread.is_alive())
89+
else:
90+
self.assertIsNotNone(self.ex._task)
91+
92+
async def test_close_sets_stopped(self):
93+
self.ex.open()
94+
self.ex.close()
95+
self.assertTrue(self.ex._stopped)
96+
await self.ex.join(timeout=1)
97+
98+
async def test_join_without_open_is_safe(self):
99+
await self.ex.join(timeout=0.01)
100+
101+
async def test_multiple_open_calls_have_no_effect(self):
102+
self.ex.open()
103+
if _IS_SYNC:
104+
worker_id = id(self.ex._thread)
105+
else:
106+
worker_id = id(self.ex._task)
107+
self.ex.open()
108+
if _IS_SYNC:
109+
self.assertEqual(worker_id, id(self.ex._thread))
110+
else:
111+
self.assertEqual(worker_id, id(self.ex._task))
112+
113+
114+
class TestAsyncPeriodicExecutorTarget(_AsyncPeriodicExecutorTestBase):
115+
async def test_target_returning_false_stops_executor(self):
116+
if _IS_SYNC:
117+
ran = threading.Event()
118+
else:
119+
ran = asyncio.Event()
120+
121+
async def target():
122+
ran.set()
123+
return False
124+
125+
self.ex = _make_executor(target=target)
126+
self.ex.open()
127+
if _IS_SYNC:
128+
self.assertTrue(ran.wait(timeout=2), "target never ran")
129+
else:
130+
await asyncio.wait_for(ran.wait(), timeout=2)
131+
await self.ex.join(timeout=2)
132+
self.assertTrue(self.ex._stopped)
133+
134+
async def test_target_exception_stops_executor(self):
135+
if _IS_SYNC:
136+
ran = threading.Event()
137+
captured_exc: list = []
138+
orig_excepthook = threading.excepthook
139+
140+
def _capture_excepthook(args):
141+
captured_exc.append(args.exc_value)
142+
143+
threading.excepthook = _capture_excepthook
144+
try:
145+
146+
def target():
147+
ran.set()
148+
raise RuntimeError("boom")
149+
150+
self.ex = _make_executor(target=target)
151+
self.ex.open()
152+
self.assertTrue(ran.wait(timeout=2), "target never ran")
153+
self.ex.join(timeout=2)
154+
finally:
155+
threading.excepthook = orig_excepthook
156+
self.assertTrue(self.ex._stopped)
157+
self.assertEqual(len(captured_exc), 1)
158+
self.assertIsInstance(captured_exc[0], RuntimeError)
159+
else:
160+
ran = asyncio.Event()
161+
162+
async def target():
163+
ran.set()
164+
raise RuntimeError("async boom")
165+
166+
self.ex = _make_executor(target=target)
167+
self.ex.open()
168+
await asyncio.wait_for(ran.wait(), timeout=2)
169+
await self.ex.join(timeout=2)
170+
self.assertTrue(self.ex._stopped)
171+
if self.ex._task is not None and self.ex._task.done():
172+
self.ex._task.exception()
173+
174+
async def test_skip_sleep_flag_skips_interval(self):
175+
call_times = []
176+
177+
async def target():
178+
call_times.append(time.monotonic() if _IS_SYNC else asyncio.get_running_loop().time())
179+
if len(call_times) >= 2:
180+
return False
181+
return True
182+
183+
self.ex = _make_executor(interval=30.0, min_interval=0.001, target=target)
184+
self.ex.skip_sleep()
185+
self.ex.open()
186+
await self.ex.join(timeout=3)
187+
self.assertGreaterEqual(len(call_times), 2)
188+
self.assertLess(call_times[1] - call_times[0], 5.0)
189+
190+
async def test_wake_causes_early_run(self):
191+
call_count = [0]
192+
if _IS_SYNC:
193+
woken = threading.Event()
194+
else:
195+
woken = asyncio.Event()
196+
197+
async def target():
198+
call_count[0] += 1
199+
if call_count[0] == 1:
200+
woken.set()
201+
if call_count[0] >= 2:
202+
return False
203+
return True
204+
205+
self.ex = _make_executor(interval=30.0, min_interval=0.01, target=target)
206+
self.ex.open()
207+
if _IS_SYNC:
208+
woken.wait(timeout=2)
209+
else:
210+
await asyncio.wait_for(woken.wait(), timeout=2)
211+
self.ex.wake()
212+
await self.ex.join(timeout=3)
213+
self.assertGreaterEqual(call_count[0], 2)
214+
215+
async def test_open_after_target_returns_false(self):
216+
called = [0]
217+
218+
async def target():
219+
called[0] += 1
220+
return False
221+
222+
self.ex = _make_executor(target=target)
223+
self.ex.open()
224+
await self.ex.join(timeout=2)
225+
self.assertTrue(self.ex._stopped)
226+
if not _IS_SYNC:
227+
first_task = self.ex._task
228+
self.ex.open()
229+
await self.ex.join(timeout=2)
230+
self.assertGreaterEqual(called[0], 2)
231+
if not _IS_SYNC:
232+
self.assertIsNot(self.ex._task, first_task)
233+
234+
235+
class TestShouldStop(AsyncUnitTest):
236+
if _IS_SYNC:
237+
238+
def test_returns_false_when_not_stopped(self):
239+
ex = _make_executor()
240+
self.assertFalse(ex._should_stop())
241+
self.assertFalse(ex._thread_will_exit)
242+
243+
def test_returns_true_and_sets_thread_will_exit(self):
244+
ex = _make_executor()
245+
ex._stopped = True
246+
self.assertTrue(ex._should_stop())
247+
self.assertTrue(ex._thread_will_exit)
248+
249+
250+
class TestRegisterExecutor(AsyncUnitTest):
251+
if _IS_SYNC:
252+
253+
def setUp(self):
254+
self._orig = set(pe_module._EXECUTORS)
255+
256+
def tearDown(self):
257+
pe_module._EXECUTORS.clear()
258+
pe_module._EXECUTORS.update(self._orig)
259+
260+
def test_register_adds_weakref(self):
261+
ex = _make_executor()
262+
before = len(pe_module._EXECUTORS)
263+
_register_executor(ex)
264+
self.assertEqual(len(pe_module._EXECUTORS), before + 1)
265+
ref = next(r for r in pe_module._EXECUTORS if r() is ex)
266+
del ex
267+
gc.collect()
268+
self.assertNotIn(ref, pe_module._EXECUTORS)
269+
270+
def test_shutdown_executors_stops_running_executors(self):
271+
ran = threading.Event()
272+
273+
def target():
274+
ran.set()
275+
return True
276+
277+
ex = _make_executor(target=target)
278+
ex.open()
279+
self.assertTrue(ran.wait(timeout=2), "target never ran")
280+
_shutdown_executors()
281+
ex.join(timeout=2)
282+
self.assertTrue(ex._stopped)
283+
284+
def test_shutdown_executors_safe_when_empty(self):
285+
pe_module._EXECUTORS.clear()
286+
_shutdown_executors()
287+
288+
289+
if __name__ == "__main__":
290+
unittest.main()

0 commit comments

Comments
 (0)