Skip to content

Commit 83fcf1d

Browse files
Merge pull request #5 from michealroberts/feature/handlers/ReadTimeoutHandler
feat: add ReadTimeoutHandler to handlers in samps module
2 parents c4de698 + 4f3850f commit 83fcf1d

File tree

2 files changed

+153
-0
lines changed

2 files changed

+153
-0
lines changed

src/samps/handlers.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# **************************************************************************************
2+
3+
# @package samps
4+
# @license MIT License Copyright (c) 2025 Michael J. Roberts
5+
6+
# **************************************************************************************
7+
8+
9+
from time import monotonic
10+
from typing import Optional
11+
12+
# **************************************************************************************
13+
14+
15+
class ReadTimeoutHandler:
16+
"""
17+
A handler for managing read timeouts in serial communication.
18+
19+
Tracks a timeout period in milliseconds, provides methods to check
20+
whether the timeout has expired, retrieve the remaining time,
21+
start or reset the timer, and a repr string.
22+
"""
23+
24+
def __init__(self, timeout: Optional[float]) -> None:
25+
"""
26+
Initialize with a timeout value in milliseconds.
27+
28+
Args:
29+
timeout: timeout duration in milliseconds, or None to disable.
30+
"""
31+
self._timeout = timeout
32+
33+
def start(self) -> None:
34+
"""
35+
Restart the timeout countdown from the current moment.
36+
"""
37+
self._start = monotonic()
38+
39+
def has_expired(self) -> bool:
40+
"""
41+
Return True if the elapsed time since start() has reached
42+
or exceeded the timeout value.
43+
"""
44+
# If no timeout is set, return None:
45+
if self._timeout is None:
46+
return False
47+
48+
# If the timer hasn't started, we can't calculate remaining time:
49+
if self._start is None:
50+
raise RuntimeError("Timeout not started. Call start() first.")
51+
52+
return ((monotonic() - self._start) * 1000) >= self._timeout
53+
54+
def remaining(self) -> Optional[float]:
55+
"""
56+
Return the number of milliseconds left before expiration,
57+
never negative.
58+
59+
Returns:
60+
The remaining time in milliseconds, or None if timeouts are disabled.
61+
"""
62+
# If no timeout is set, return None:
63+
if self._timeout is None:
64+
return None
65+
66+
# If the timer hasn't started, we can't calculate remaining time:
67+
if self._start is None:
68+
raise RuntimeError("Timeout not started. Call start() first.")
69+
70+
remaining = self._timeout - (monotonic() - self._start) * 1000
71+
return remaining if remaining > 0 else 0.0
72+
73+
def reset(self) -> None:
74+
"""
75+
Alias for start(): restart the timeout countdown.
76+
"""
77+
self.start()
78+
79+
def __repr__(self) -> str:
80+
# If the timeout is None, we can't calculate remaining time:
81+
if self._timeout is None:
82+
return "<ReadTimeoutHandler timeout=None>"
83+
84+
# If the timer hasn't started, we can't calculate remaining time:
85+
if self._start is None:
86+
return f"<ReadTimeoutHandler timeout={self._timeout:.0f}ms, remaining=NotStarted>"
87+
88+
# Otherwise, return back the remaining time:
89+
return (
90+
f"<ReadTimeoutHandler timeout={self._timeout:.0f}ms, "
91+
f"remaining={self.remaining():.0f}ms>"
92+
)
93+
94+
95+
# **************************************************************************************

test/test_handlers.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# **************************************************************************************
2+
3+
# @package samps
4+
# @license MIT License Copyright (c) 2025 Michael J. Roberts
5+
6+
# **************************************************************************************
7+
8+
import unittest
9+
from time import sleep
10+
11+
from samps.handlers import ReadTimeoutHandler
12+
13+
# **************************************************************************************
14+
15+
16+
class TestReadTimeoutHandler(unittest.TestCase):
17+
def test_no_timeout(self):
18+
handler = ReadTimeoutHandler(None)
19+
handler.start()
20+
self.assertFalse(handler.has_expired())
21+
self.assertIsNone(handler.remaining())
22+
self.assertIn("timeout=None", repr(handler))
23+
24+
def test_timeout_not_expired(self):
25+
handler = ReadTimeoutHandler(2.0)
26+
handler.start()
27+
self.assertFalse(handler.has_expired())
28+
remaining = handler.remaining()
29+
self.assertTrue(0.0 < remaining <= 2.0)
30+
31+
def test_timeout_expired(self):
32+
handler = ReadTimeoutHandler(0.5)
33+
handler.start()
34+
sleep(0.6)
35+
self.assertTrue(handler.has_expired())
36+
self.assertEqual(handler.remaining(), 0.0)
37+
38+
def test_reset(self):
39+
handler = ReadTimeoutHandler(0.1)
40+
handler.start()
41+
sleep(0.2)
42+
self.assertTrue(handler.has_expired())
43+
handler.reset()
44+
self.assertFalse(handler.has_expired())
45+
self.assertTrue(0.0 < handler.remaining() <= 0.1)
46+
47+
def test_start(self):
48+
handler = ReadTimeoutHandler(0.1)
49+
handler.start()
50+
self.assertFalse(handler.has_expired())
51+
52+
53+
# **************************************************************************************
54+
55+
if __name__ == "__main__":
56+
unittest.main()
57+
58+
# **************************************************************************************

0 commit comments

Comments
 (0)