Skip to content

Commit 7bf5a72

Browse files
hkratztaiki-e
authored andcommitted
Fix issues with AsyncBufRead::read_line and AsyncBufReadExt::lines (#2884)
Fixes the following issues in `AsyncBufRead::read_line`: * When the future is dropped the previous string contents are not restored so the string is empty. * If invalid UTF-8 is encountered the previous string contents are not restored. * If an IO error occurs after `read_until_internal` already read a couple of bytes a debug assertion fails. * Performance: The whole string to which read contents are appended is check for UTF-8 validity instead of just the added bytes. Fixes the following issues in `AsyncBufRead::read_line`: * If an IO error occurs after `read_until_internal` already read a couple of bytes a debug assertion fails. (#2862) Fixes #2862
1 parent 87afaf3 commit 7bf5a72

File tree

5 files changed

+102
-18
lines changed

5 files changed

+102
-18
lines changed

futures-util/src/io/lines.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ impl<R: AsyncBufRead> Stream for Lines<R> {
3535
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3636
let this = self.project();
3737
let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?;
38+
*this.read = 0;
3839
if n == 0 && this.buf.is_empty() {
3940
return Poll::Ready(None);
4041
}

futures-util/src/io/read_line.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ pub struct ReadLine<'a, R: ?Sized> {
1818
buf: &'a mut String,
1919
bytes: Vec<u8>,
2020
read: usize,
21+
finished: bool,
2122
}
2223

2324
impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
2425

2526
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
2627
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
27-
Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 }
28+
Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0, finished: false }
2829
}
2930
}
3031

@@ -35,26 +36,42 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
3536
bytes: &mut Vec<u8>,
3637
read: &mut usize,
3738
) -> Poll<io::Result<usize>> {
38-
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
39-
if str::from_utf8(bytes).is_err() {
40-
bytes.clear();
41-
Poll::Ready(ret.and_then(|_| {
42-
Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
43-
}))
44-
} else {
45-
debug_assert!(buf.is_empty());
46-
debug_assert_eq!(*read, 0);
47-
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
48-
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
49-
Poll::Ready(ret)
39+
let mut ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
40+
if str::from_utf8(&bytes[bytes.len() - *read..bytes.len()]).is_err() {
41+
bytes.truncate(bytes.len() - *read);
42+
if ret.is_ok() {
43+
ret = Err(io::Error::new(
44+
io::ErrorKind::InvalidData,
45+
"stream did not contain valid UTF-8",
46+
));
47+
}
5048
}
49+
*read = 0;
50+
// Safety: `bytes` is valid UTF-8 because it was taken from a String
51+
// and the newly read bytes are either valid UTF-8 or have been removed.
52+
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
53+
Poll::Ready(ret)
5154
}
5255

5356
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
5457
type Output = io::Result<usize>;
5558

5659
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57-
let Self { reader, buf, bytes, read } = &mut *self;
58-
read_line_internal(Pin::new(reader), cx, buf, bytes, read)
60+
let Self { reader, buf, bytes, read, finished: _ } = &mut *self;
61+
let ret = ready!(read_line_internal(Pin::new(reader), cx, buf, bytes, read));
62+
self.finished = true;
63+
Poll::Ready(ret)
64+
}
65+
}
66+
67+
impl<R: ?Sized> Drop for ReadLine<'_, R> {
68+
fn drop(&mut self) {
69+
// restore old string contents
70+
if !self.finished {
71+
self.bytes.truncate(self.bytes.len() - self.read);
72+
// Safety: `bytes` is valid UTF-8 because it was taken from a String
73+
// and the newly read bytes have been removed.
74+
mem::swap(unsafe { self.buf.as_mut_vec() }, &mut self.bytes);
75+
}
5976
}
6077
}

futures-util/src/io/read_until.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use futures_core::ready;
33
use futures_core::task::{Context, Poll};
44
use futures_io::AsyncBufRead;
55
use std::io;
6-
use std::mem;
76
use std::pin::Pin;
87
use std::vec::Vec;
98

@@ -46,7 +45,7 @@ pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
4645
reader.as_mut().consume(used);
4746
*read += used;
4847
if done || used == 0 {
49-
return Poll::Ready(Ok(mem::replace(read, 0)));
48+
return Poll::Ready(Ok(*read));
5049
}
5150
}
5251
}

futures/tests/io_lines.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::executor::block_on;
22
use futures::future::{Future, FutureExt};
3-
use futures::io::{AsyncBufReadExt, Cursor};
3+
use futures::io::{AsyncBufReadExt, AsyncRead, Cursor};
44
use futures::stream::{self, StreamExt, TryStreamExt};
55
use futures::task::Poll;
66
use futures_test::io::AsyncReadTestExt;
@@ -27,6 +27,24 @@ macro_rules! run_next {
2727
};
2828
}
2929

30+
struct IOErrorRead(bool);
31+
32+
impl AsyncRead for IOErrorRead {
33+
fn poll_read(
34+
mut self: std::pin::Pin<&mut Self>,
35+
_cx: &mut std::task::Context<'_>,
36+
b: &mut [u8],
37+
) -> Poll<std::io::Result<usize>> {
38+
if self.0 {
39+
Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into()))
40+
} else {
41+
self.0 = true;
42+
b[..16].fill(b'x');
43+
Ok(16).into()
44+
}
45+
}
46+
}
47+
3048
#[test]
3149
fn lines() {
3250
let buf = Cursor::new(&b"12\r"[..]);
@@ -58,3 +76,9 @@ fn maybe_pending() {
5876
assert_eq!(run_next!(s), "".to_string());
5977
assert!(run(s.next()).is_none());
6078
}
79+
80+
#[test]
81+
fn issue2862() {
82+
let mut lines = futures::io::BufReader::new(IOErrorRead(false)).lines();
83+
assert!(block_on(lines.next()).unwrap().is_err())
84+
}

futures/tests/io_read_line.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use futures::future::{Future, FutureExt};
33
use futures::io::{AsyncBufReadExt, Cursor};
44
use futures::stream::{self, StreamExt, TryStreamExt};
55
use futures::task::Poll;
6+
use futures::AsyncRead;
67
use futures_test::io::AsyncReadTestExt;
78
use futures_test::task::noop_context;
89

@@ -15,6 +16,24 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
1516
}
1617
}
1718

19+
struct IOErrorRead(bool);
20+
21+
impl AsyncRead for IOErrorRead {
22+
fn poll_read(
23+
mut self: std::pin::Pin<&mut Self>,
24+
_cx: &mut std::task::Context<'_>,
25+
b: &mut [u8],
26+
) -> Poll<std::io::Result<usize>> {
27+
if self.0 {
28+
Poll::Ready(Err(std::io::ErrorKind::InvalidInput.into()))
29+
} else {
30+
self.0 = true;
31+
b[..16].fill(b'x');
32+
Ok(16).into()
33+
}
34+
}
35+
}
36+
1837
#[test]
1938
fn read_line() {
2039
let mut buf = Cursor::new(b"12");
@@ -34,6 +53,30 @@ fn read_line() {
3453
assert_eq!(v, "");
3554
}
3655

56+
#[test]
57+
fn read_line_drop() {
58+
// string contents should be preserved if the future is dropped
59+
let mut buf = Cursor::new(b"12\n\n");
60+
let mut v = String::from("abc");
61+
drop(buf.read_line(&mut v));
62+
assert_eq!(v, "abc");
63+
}
64+
65+
#[test]
66+
fn read_line_io_error() {
67+
let mut r = futures::io::BufReader::new(IOErrorRead(false));
68+
let _ = block_on(r.read_line(&mut String::new()));
69+
}
70+
71+
#[test]
72+
fn read_line_utf8_error() {
73+
let mut buf = Cursor::new(b"12\xFF\n\n");
74+
let mut v = String::from("abc");
75+
let res = block_on(buf.read_line(&mut v));
76+
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidData);
77+
assert_eq!(v, "abc");
78+
}
79+
3780
#[test]
3881
fn maybe_pending() {
3982
let mut buf = b"12".interleave_pending();

0 commit comments

Comments
 (0)