Skip to content

Commit 79790bc

Browse files
authored
bpo-33694: Fix race condition in asyncio proactor (GH-7498)
The cancellation of an overlapped WSARecv() has a race condition which causes data loss because of the current implementation of proactor in asyncio. No longer cancel overlapped WSARecv() in _ProactorReadPipeTransport to work around the race condition. Remove the optimized recv_into() implementation to get simple implementation of pause_reading() using the single _pending_data attribute. Move _feed_data_to_bufferred_proto() to protocols.py. Remove set_protocol() method which became useless.
1 parent d3ed67d commit 79790bc

File tree

6 files changed

+86
-151
lines changed

6 files changed

+86
-151
lines changed

Lib/asyncio/proactor_events.py

Lines changed: 53 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -159,27 +159,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
159159

160160
def __init__(self, loop, sock, protocol, waiter=None,
161161
extra=None, server=None):
162-
self._loop_reading_cb = None
162+
self._pending_data = None
163163
self._paused = True
164164
super().__init__(loop, sock, protocol, waiter, extra, server)
165165

166-
self._reschedule_on_resume = False
167166
self._loop.call_soon(self._loop_reading)
168167
self._paused = False
169168

170-
def set_protocol(self, protocol):
171-
if isinstance(protocol, protocols.BufferedProtocol):
172-
self._loop_reading_cb = self._loop_reading__get_buffer
173-
else:
174-
self._loop_reading_cb = self._loop_reading__data_received
175-
176-
super().set_protocol(protocol)
177-
178-
if self.is_reading():
179-
# reset reading callback / buffers / self._read_fut
180-
self.pause_reading()
181-
self.resume_reading()
182-
183169
def is_reading(self):
184170
return not self._paused and not self._closing
185171

@@ -188,32 +174,39 @@ def pause_reading(self):
188174
return
189175
self._paused = True
190176

191-
if self._read_fut is not None and not self._read_fut.done():
192-
# TODO: This is an ugly hack to cancel the current read future
193-
# *and* avoid potential race conditions, as read cancellation
194-
# goes through `future.cancel()` and `loop.call_soon()`.
195-
# We then use this special attribute in the reader callback to
196-
# exit *immediately* without doing any cleanup/rescheduling.
197-
self._read_fut.__asyncio_cancelled_on_pause__ = True
198-
199-
self._read_fut.cancel()
200-
self._read_fut = None
201-
self._reschedule_on_resume = True
177+
# bpo-33694: Don't cancel self._read_fut because cancelling an
178+
# overlapped WSASend() loss silently data with the current proactor
179+
# implementation.
180+
#
181+
# If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
182+
# completed (even if HasOverlappedIoCompleted() returns 0), but
183+
# Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
184+
# error. Once the overlapped is ignored, the IOCP loop will ignores the
185+
# completion I/O event and so not read the result of the overlapped
186+
# WSARecv().
202187

203188
if self._loop.get_debug():
204189
logger.debug("%r pauses reading", self)
205190

206191
def resume_reading(self):
207192
if self._closing or not self._paused:
208193
return
194+
209195
self._paused = False
210-
if self._reschedule_on_resume:
211-
self._loop.call_soon(self._loop_reading, self._read_fut)
212-
self._reschedule_on_resume = False
196+
if self._read_fut is None:
197+
self._loop.call_soon(self._loop_reading, None)
198+
199+
data = self._pending_data
200+
self._pending_data = None
201+
if data is not None:
202+
# Call the protocol methode after calling _loop_reading(),
203+
# since the protocol can decide to pause reading again.
204+
self._loop.call_soon(self._data_received, data)
205+
213206
if self._loop.get_debug():
214207
logger.debug("%r resumes reading", self)
215208

216-
def _loop_reading__on_eof(self):
209+
def _eof_received(self):
217210
if self._loop.get_debug():
218211
logger.debug("%r received EOF", self)
219212

@@ -227,18 +220,30 @@ def _loop_reading__on_eof(self):
227220
if not keep_open:
228221
self.close()
229222

230-
def _loop_reading(self, fut=None):
231-
self._loop_reading_cb(fut)
232-
233-
def _loop_reading__data_received(self, fut):
234-
if (fut is not None and
235-
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
223+
def _data_received(self, data):
224+
if self._paused:
225+
# Don't call any protocol method while reading is paused.
226+
# The protocol will be called on resume_reading().
227+
assert self._pending_data is None
228+
self._pending_data = data
236229
return
237230

238-
if self._paused:
239-
self._reschedule_on_resume = True
231+
if not data:
232+
self._eof_received()
240233
return
241234

235+
if isinstance(self._protocol, protocols.BufferedProtocol):
236+
try:
237+
protocols._feed_data_to_bufferred_proto(self._protocol, data)
238+
except Exception as exc:
239+
self._fatal_error(exc,
240+
'Fatal error: protocol.buffer_updated() '
241+
'call failed.')
242+
return
243+
else:
244+
self._protocol.data_received(data)
245+
246+
def _loop_reading(self, fut=None):
242247
data = None
243248
try:
244249
if fut is not None:
@@ -261,8 +266,12 @@ def _loop_reading__data_received(self, fut):
261266
# we got end-of-file so no need to reschedule a new read
262267
return
263268

264-
# reschedule a new read
265-
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
269+
# bpo-33694: buffer_updated() has currently no fast path because of
270+
# a data loss issue caused by overlapped WSASend() cancellation.
271+
272+
if not self._paused:
273+
# reschedule a new read
274+
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
266275
except ConnectionAbortedError as exc:
267276
if not self._closing:
268277
self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -277,92 +286,11 @@ def _loop_reading__data_received(self, fut):
277286
if not self._closing:
278287
raise
279288
else:
280-
self._read_fut.add_done_callback(self._loop_reading__data_received)
289+
if not self._paused:
290+
self._read_fut.add_done_callback(self._loop_reading)
281291
finally:
282-
if data:
283-
self._protocol.data_received(data)
284-
elif data == b'':
285-
self._loop_reading__on_eof()
286-
287-
def _loop_reading__get_buffer(self, fut):
288-
if (fut is not None and
289-
getattr(fut, '__asyncio_cancelled_on_pause__', False)):
290-
return
291-
292-
if self._paused:
293-
self._reschedule_on_resume = True
294-
return
295-
296-
nbytes = None
297-
if fut is not None:
298-
assert self._read_fut is fut or (self._read_fut is None and
299-
self._closing)
300-
self._read_fut = None
301-
try:
302-
if fut.done():
303-
nbytes = fut.result()
304-
else:
305-
# the future will be replaced by next proactor.recv call
306-
fut.cancel()
307-
except ConnectionAbortedError as exc:
308-
if not self._closing:
309-
self._fatal_error(
310-
exc, 'Fatal read error on pipe transport')
311-
elif self._loop.get_debug():
312-
logger.debug("Read error on pipe transport while closing",
313-
exc_info=True)
314-
except ConnectionResetError as exc:
315-
self._force_close(exc)
316-
except OSError as exc:
317-
self._fatal_error(exc, 'Fatal read error on pipe transport')
318-
except futures.CancelledError:
319-
if not self._closing:
320-
raise
321-
322-
if nbytes is not None:
323-
if nbytes == 0:
324-
# we got end-of-file so no need to reschedule a new read
325-
self._loop_reading__on_eof()
326-
else:
327-
try:
328-
self._protocol.buffer_updated(nbytes)
329-
except Exception as exc:
330-
self._fatal_error(
331-
exc,
332-
'Fatal error: '
333-
'protocol.buffer_updated() call failed.')
334-
return
335-
336-
if self._closing or nbytes == 0:
337-
# since close() has been called we ignore any read data
338-
return
339-
340-
try:
341-
buf = self._protocol.get_buffer(-1)
342-
if not len(buf):
343-
raise RuntimeError('get_buffer() returned an empty buffer')
344-
except Exception as exc:
345-
self._fatal_error(
346-
exc, 'Fatal error: protocol.get_buffer() call failed.')
347-
return
348-
349-
try:
350-
# schedule a new read
351-
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
352-
self._read_fut.add_done_callback(self._loop_reading__get_buffer)
353-
except ConnectionAbortedError as exc:
354-
if not self._closing:
355-
self._fatal_error(exc, 'Fatal read error on pipe transport')
356-
elif self._loop.get_debug():
357-
logger.debug("Read error on pipe transport while closing",
358-
exc_info=True)
359-
except ConnectionResetError as exc:
360-
self._force_close(exc)
361-
except OSError as exc:
362-
self._fatal_error(exc, 'Fatal read error on pipe transport')
363-
except futures.CancelledError:
364-
if not self._closing:
365-
raise
292+
if data is not None:
293+
self._data_received(data)
366294

367295

368296
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,

Lib/asyncio/protocols.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,22 @@ def pipe_connection_lost(self, fd, exc):
189189

190190
def process_exited(self):
191191
"""Called when subprocess has exited."""
192+
193+
194+
def _feed_data_to_bufferred_proto(proto, data):
195+
data_len = len(data)
196+
while data_len:
197+
buf = proto.get_buffer(data_len)
198+
buf_len = len(buf)
199+
if not buf_len:
200+
raise RuntimeError('get_buffer() returned an empty buffer')
201+
202+
if buf_len >= data_len:
203+
buf[:data_len] = data
204+
proto.buffer_updated(data_len)
205+
return
206+
else:
207+
buf[:buf_len] = data[:buf_len]
208+
proto.buffer_updated(buf_len)
209+
data = data[buf_len:]
210+
data_len = len(data)

Lib/asyncio/sslproto.py

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ def data_received(self, data):
535535
if chunk:
536536
try:
537537
if self._app_protocol_is_buffer:
538-
_feed_data_to_bufferred_proto(
538+
protocols._feed_data_to_bufferred_proto(
539539
self._app_protocol, chunk)
540540
else:
541541
self._app_protocol.data_received(chunk)
@@ -721,22 +721,3 @@ def _abort(self):
721721
self._transport.abort()
722722
finally:
723723
self._finalize()
724-
725-
726-
def _feed_data_to_bufferred_proto(proto, data):
727-
data_len = len(data)
728-
while data_len:
729-
buf = proto.get_buffer(data_len)
730-
buf_len = len(buf)
731-
if not buf_len:
732-
raise RuntimeError('get_buffer() returned an empty buffer')
733-
734-
if buf_len >= data_len:
735-
buf[:data_len] = data
736-
proto.buffer_updated(data_len)
737-
return
738-
else:
739-
buf[:buf_len] = data[:buf_len]
740-
proto.buffer_updated(buf_len)
741-
data = data[buf_len:]
742-
data_len = len(data)

Lib/test/test_asyncio/test_proactor_events.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,8 @@ def test_dont_pause_writing(self):
459459
self.assertFalse(self.protocol.pause_writing.called)
460460

461461

462+
@unittest.skip('FIXME: bpo-33694: these tests are too close '
463+
'to the implementation and should be refactored or removed')
462464
class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
463465

464466
def setUp(self):
@@ -551,6 +553,8 @@ def test_proto_type_switch(self):
551553
self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
552554
buf_proto.buffer_updated.assert_called_with(4)
553555

556+
@unittest.skip('FIXME: bpo-33694: this test is too close to the '
557+
'implementation and should be refactored or removed')
554558
def test_proto_buf_switch(self):
555559
tr = self.socket_transport()
556560
test_utils.run_briefly(self.loop)

Lib/test/test_asyncio/test_sslproto.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import asyncio
1313
from asyncio import log
14+
from asyncio import protocols
1415
from asyncio import sslproto
1516
from asyncio import tasks
1617
from test.test_asyncio import utils as test_utils
@@ -189,28 +190,28 @@ def buffer_updated(self, nsize):
189190

190191
for usemv in [False, True]:
191192
proto = Proto(1, usemv)
192-
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
193+
protocols._feed_data_to_bufferred_proto(proto, b'12345')
193194
self.assertEqual(proto.data, b'12345')
194195

195196
proto = Proto(2, usemv)
196-
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
197+
protocols._feed_data_to_bufferred_proto(proto, b'12345')
197198
self.assertEqual(proto.data, b'12345')
198199

199200
proto = Proto(2, usemv)
200-
sslproto._feed_data_to_bufferred_proto(proto, b'1234')
201+
protocols._feed_data_to_bufferred_proto(proto, b'1234')
201202
self.assertEqual(proto.data, b'1234')
202203

203204
proto = Proto(4, usemv)
204-
sslproto._feed_data_to_bufferred_proto(proto, b'1234')
205+
protocols._feed_data_to_bufferred_proto(proto, b'1234')
205206
self.assertEqual(proto.data, b'1234')
206207

207208
proto = Proto(100, usemv)
208-
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
209+
protocols._feed_data_to_bufferred_proto(proto, b'12345')
209210
self.assertEqual(proto.data, b'12345')
210211

211212
proto = Proto(0, usemv)
212213
with self.assertRaisesRegex(RuntimeError, 'empty buffer'):
213-
sslproto._feed_data_to_bufferred_proto(proto, b'12345')
214+
protocols._feed_data_to_bufferred_proto(proto, b'12345')
214215

215216
def test_start_tls_client_reg_proto_1(self):
216217
HELLO_MSG = b'1' * self.PAYLOAD_SIZE
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
asyncio: Fix a race condition causing data loss on
2+
pause_reading()/resume_reading() when using the ProactorEventLoop.

0 commit comments

Comments
 (0)