Skip to content

Commit 8b7d801

Browse files
committed
Reduce transport_serial.
1 parent f06718d commit 8b7d801

File tree

1 file changed

+54
-91
lines changed

1 file changed

+54
-91
lines changed

pymodbus/transport/transport_serial.py

Lines changed: 54 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,49 @@ def __init__(self, loop, protocol, *args, **kwargs):
1818
self.async_loop = loop
1919
self._protocol: asyncio.BaseProtocol = protocol
2020
self.sync_serial = serial.serial_for_url(*args, **kwargs)
21-
self._closing = False
2221
self._write_buffer = []
23-
self.set_write_buffer_limits()
2422
self._has_reader = False
2523
self._has_writer = False
2624
self._poll_wait_time = 0.0005
27-
28-
# Asynchronous I/O requires non-blocking devices
2925
self.sync_serial.timeout = 0
3026
self.sync_serial.write_timeout = 0
31-
loop.call_soon(protocol.connection_made, self)
32-
loop.call_soon(self._ensure_reader)
27+
28+
def setup(self):
29+
"""Prepare to read/write"""
30+
self.async_loop.call_soon(self._protocol.connection_made, self)
31+
if os.name == "nt":
32+
self._has_reader = self.async_loop.call_later(
33+
self._poll_wait_time, self._poll_read
34+
)
35+
else:
36+
self.async_loop.add_reader(self.sync_serial.fileno(), self._read_ready)
37+
self._has_reader = True
38+
39+
def close(self, exc=None):
40+
"""Close the transport gracefully."""
41+
if self._has_reader:
42+
if os.name == "nt":
43+
self._has_reader.cancel()
44+
else:
45+
self.async_loop.remove_reader(self.sync_serial.fileno())
46+
self._has_reader = False
47+
self._remove_writer()
48+
self.async_loop.call_soon(self._call_connection_lost, exc)
49+
50+
def write(self, data):
51+
"""Write some data to the transport."""
52+
self._write_buffer.append(data)
53+
if not self._has_writer:
54+
if os.name == "nt":
55+
self._has_writer = self.async_loop.call_soon(self._poll_write)
56+
else:
57+
self.async_loop.add_writer(self.sync_serial.fileno(), self._write_ready)
58+
self._has_writer = True
59+
60+
def flush(self):
61+
"""Clear output buffer and stops any more data being written"""
62+
self._remove_writer()
63+
self._write_buffer.clear()
3364

3465
# ------------------------------------------------
3566
# Dummy methods needed to please asyncio.Transport.
@@ -75,80 +106,44 @@ def pause_reading(self):
75106
def resume_reading(self):
76107
"""Resume receiver."""
77108

78-
# ------------------------------------------------
79-
80109
def is_closing(self):
81110
"""Return True if the transport is closing or closed."""
82-
return self._closing
111+
return False
83112

84-
def close(self):
85-
"""Close the transport gracefully."""
86-
if self._closing:
87-
return
88-
self._closing = True
89-
self._remove_reader()
90-
self._remove_writer()
91-
self.async_loop.call_soon(self._call_connection_lost, None)
113+
def abort(self):
114+
"""Close the transport immediately."""
115+
self.close()
116+
117+
# ------------------------------------------------
92118

93119
def _read_ready(self):
94120
"""Test if there are data waiting."""
95121
try:
96122
data = self.sync_serial.read(1024)
97123
except serial.SerialException as exc:
98-
self.async_loop.call_soon(self._call_connection_lost, exc)
99-
self.close()
124+
self.close(exc)
100125
else:
101126
if data:
102127
self._protocol.data_received(data)
103128

104-
def write(self, data):
105-
"""Write some data to the transport."""
106-
if self._closing:
107-
return
108-
109-
self._write_buffer.append(data)
110-
self._ensure_writer()
111-
112-
def abort(self):
113-
"""Close the transport immediately."""
114-
self.close()
115-
116-
def flush(self):
117-
"""Clear output buffer and stops any more data being written"""
118-
self._remove_writer()
119-
self._write_buffer.clear()
120-
121129
def _write_ready(self):
122130
"""Asynchronously write buffered data."""
123131
data = b"".join(self._write_buffer)
124-
assert data, "Write buffer should not be empty"
125-
126-
self._write_buffer.clear()
127-
128132
try:
129-
nlen = self.sync_serial.write(data)
133+
if nlen := self.sync_serial.write(data) < len(data):
134+
self._write_buffer = data[nlen:]
135+
return
136+
self._write_buffer.clear()
137+
self._remove_writer()
130138
except (BlockingIOError, InterruptedError):
131-
self._write_buffer.append(data)
139+
return
132140
except serial.SerialException as exc:
133-
self.async_loop.call_soon(self._call_connection_lost, exc)
134-
self.abort()
135-
else:
136-
if nlen == len(data):
137-
assert not self.get_write_buffer_size()
138-
self._remove_writer()
139-
if self._closing and not self.get_write_buffer_size():
140-
self.close()
141-
return
142-
143-
assert 0 <= nlen < len(data)
144-
data = data[nlen:]
145-
self._write_buffer.append(data) # Try again later
146-
assert self._has_writer
141+
self.close(exc)
147142

148143
if os.name == "nt":
149144

150145
def _poll_read(self):
151-
if self._has_reader and not self._closing:
146+
if self._has_reader:
152147
try:
153148
self._has_reader = self.async_loop.call_later(
154149
self._poll_wait_time, self._poll_read
@@ -159,60 +154,27 @@ def _poll_read(self):
159154
self.async_loop.call_soon(self._call_connection_lost, exc)
160155
self.abort()
161156

162-
def _ensure_reader(self):
163-
if not self._has_reader and not self._closing:
164-
self._has_reader = self.async_loop.call_later(
165-
self._poll_wait_time, self._poll_read
166-
)
167-
168-
def _remove_reader(self):
169-
if self._has_reader:
170-
self._has_reader.cancel()
171-
self._has_reader = False
172-
173157
def _poll_write(self):
174-
if self._has_writer and not self._closing:
158+
if self._has_writer:
175159
self._has_writer = self.async_loop.call_later(
176160
self._poll_wait_time, self._poll_write
177161
)
178162
self._write_ready()
179163

180-
def _ensure_writer(self):
181-
if not self._has_writer and not self._closing:
182-
self._has_writer = self.async_loop.call_soon(self._poll_write)
183-
184164
def _remove_writer(self):
185165
if self._has_writer:
186166
self._has_writer.cancel()
187167
self._has_writer = False
188168

189169
else:
190170

191-
def _ensure_reader(self):
192-
if (not self._has_reader) and (not self._closing):
193-
self.async_loop.add_reader(self.sync_serial.fileno(), self._read_ready)
194-
self._has_reader = True
195-
196-
def _remove_reader(self):
197-
if self._has_reader:
198-
self.async_loop.remove_reader(self.sync_serial.fileno())
199-
self._has_reader = False
200-
201-
def _ensure_writer(self):
202-
if (not self._has_writer) and (not self._closing):
203-
self.async_loop.add_writer(self.sync_serial.fileno(), self._write_ready)
204-
self._has_writer = True
205-
206171
def _remove_writer(self):
207172
if self._has_writer:
208173
self.async_loop.remove_writer(self.sync_serial.fileno())
209174
self._has_writer = False
210175

211176
def _call_connection_lost(self, exc):
212177
"""Close the connection."""
213-
assert self._closing
214-
assert not self._has_writer
215-
assert not self._has_reader
216178
if self.sync_serial:
217179
with contextlib.suppress(Exception):
218180
self.sync_serial.flush()
@@ -231,4 +193,5 @@ async def create_serial_connection(loop, protocol_factory, *args, **kwargs):
231193
"""Create a connection to a new serial port instance."""
232194
protocol = protocol_factory()
233195
transport = SerialTransport(loop, protocol, *args, **kwargs)
196+
loop.call_soon(transport.setup)
234197
return transport, protocol

0 commit comments

Comments
 (0)