Fix massive performance issue in read_to_end #23820

Merged · 1 commit merged on Mar 29, 2015
71 changes: 43 additions & 28 deletions src/libstd/io/buffered.rs
@@ -18,8 +18,9 @@ use io::prelude::*;
use cmp;
use error::{self, FromError};
use fmt;
use io::{self, Cursor, DEFAULT_BUF_SIZE, Error, ErrorKind};
use io::{self, DEFAULT_BUF_SIZE, Error, ErrorKind};
use ptr;
use iter;

/// Wraps a `Read` and buffers input from it
///
@@ -30,7 +31,9 @@ use ptr;
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BufReader<R> {
inner: R,
buf: Cursor<Vec<u8>>,
buf: Vec<u8>,
pos: usize,
cap: usize,
Member:

I personally tried to keep this as a Cursor<Vec<u8>> because the separate pos/cap fields felt like they were just duplicating its functionality, but I think keeping the Cursor would require unsafe calls to set_len, which is somewhat worrisome.

I think that the buf could be represented as Box<[u8]> as well (as the capacity isn't necessary, just the length), but I also think it's fine for now.

Member Author:

Yeah I tried to think of ways to keep it as a Cursor but nothing really seemed right, especially since Cursor<Vec<u8>> extends the vec on its own.

Member:

Yeah I'm fine by this :)

}
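
For illustration only, a minimal sketch of the Box<[u8]> variant mentioned above (not part of this patch; the struct and constructor shape are assumptions). The buffer is allocated and zeroed once, and `pos`/`cap` delimit the unread window, with the invariant `pos <= cap <= buf.len()`:

```rust
use std::io::Read;

// Hypothetical alternative to the patch: a fixed-length boxed slice instead
// of a Vec<u8>, since only the length matters, not the spare capacity.
pub struct BoxedBufReader<R> {
    inner: R,
    buf: Box<[u8]>, // zero-initialized storage of fixed length
    pos: usize,     // index of the next unread byte
    cap: usize,     // number of valid bytes currently read from `inner`
}

impl<R: Read> BoxedBufReader<R> {
    pub fn with_capacity(cap: usize, inner: R) -> BoxedBufReader<R> {
        BoxedBufReader {
            inner: inner,
            buf: vec![0; cap].into_boxed_slice(),
            pos: 0,
            cap: 0,
        }
    }
}

fn main() {
    let r = BoxedBufReader::with_capacity(8, &b"hello"[..]);
    assert_eq!((r.pos, r.cap, r.buf.len()), (0, 0, 8));
}
```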

impl<R: Read> BufReader<R> {
@@ -43,9 +46,13 @@ impl<R: Read> BufReader<R> {
/// Creates a new `BufReader` with the specified buffer capacity
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(cap: usize, inner: R) -> BufReader<R> {
let mut buf = Vec::with_capacity(cap);
buf.extend(iter::repeat(0).take(cap));
BufReader {
inner: inner,
buf: Cursor::new(Vec::with_capacity(cap)),
buf: buf,
pos: 0,
cap: 0,
}
}

@@ -74,12 +81,15 @@ impl<R: Read> Read for BufReader<R> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.buf.get_ref().len() == self.buf.position() as usize &&
buf.len() >= self.buf.get_ref().capacity() {
if self.pos == self.cap && buf.len() >= self.buf.len() {
return self.inner.read(buf);
}
try!(self.fill_buf());
self.buf.read(buf)
let nread = {
let mut rem = try!(self.fill_buf());
try!(rem.read(buf))
};
self.consume(nread);
Ok(nread)
}
}
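
As a usage-level illustration of the bypass above (a hedged sketch, not from the patch): when nothing is buffered and the caller's buffer is at least as large as the internal one, the read is forwarded straight to the underlying reader and the internal buffer is never filled.

```rust
use std::io::{BufReader, Read};

fn main() {
    let data = vec![7u8; 4096];
    // A deliberately tiny internal buffer: the 4096-byte read below is
    // "massive" relative to it, so BufReader hands the call straight to
    // the inner reader instead of copying through its own buffer.
    let mut reader = BufReader::with_capacity(16, &data[..]);
    let mut out = vec![0u8; 4096];
    let n = reader.read(&mut out).unwrap();
    assert_eq!(n, 4096);
    assert_eq!(out, data);
}
```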

@@ -88,26 +98,25 @@ impl<R: Read> BufRead for BufReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
if self.buf.position() as usize == self.buf.get_ref().len() {
self.buf.set_position(0);
let v = self.buf.get_mut();
v.truncate(0);
let inner = &mut self.inner;
try!(super::with_end_to_cap(v, |b| inner.read(b)));
if self.pos == self.cap {
self.cap = try!(self.inner.read(&mut self.buf));
self.pos = 0;
}
self.buf.fill_buf()
Ok(&self.buf[self.pos..self.cap])
}

fn consume(&mut self, amt: usize) {
self.buf.consume(amt)
self.pos = cmp::min(self.pos + amt, self.cap);
}
}
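
To show how the fill_buf/consume pair is meant to be driven (a hedged sketch, not part of the patch): the caller borrows the buffered window, inspects as much of it as it likes, and then reports back how many bytes it used so that `pos` advances.

```rust
use std::io::{self, BufRead};

// Count newlines by scanning the buffered window in place rather than
// copying it out, consuming exactly the bytes that were inspected.
fn count_newlines<R: BufRead>(mut r: R) -> io::Result<usize> {
    let mut count = 0;
    loop {
        let consumed = {
            let chunk = try!(r.fill_buf());   // refills only when pos == cap
            if chunk.is_empty() {
                break;                        // an empty window means EOF
            }
            count += chunk.iter().filter(|&&b| b == b'\n').count();
            chunk.len()
        };
        r.consume(consumed);                  // advances pos (capped at cap)
    }
    Ok(count)
}

fn main() {
    let text = &b"one\ntwo\nthree\n"[..];     // &[u8] implements BufRead
    assert_eq!(count_newlines(text).unwrap(), 3);
}
```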

#[stable(feature = "rust1", since = "1.0.0")]
impl<R> fmt::Debug for BufReader<R> where R: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufReader {{ reader: {:?}, buffer: {}/{} }}",
self.inner, self.buf.position(), self.buf.get_ref().len())
fmt.debug_struct("BufReader")
.field("reader", &self.inner)
.field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
.finish()
}
}
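
The switch to `debug_struct` (the unstable debug_builders feature enabled in lib.rs below) produces the builder-style output; a hedged example of roughly what it looks like for a freshly constructed reader:

```rust
use std::io::BufReader;

fn main() {
    let data = [0u8; 4];
    let reader = BufReader::with_capacity(8, &data[..]);
    // Prints something like:
    //   BufReader { reader: [0, 0, 0, 0], buffer: 0/8 }
    // where "buffer: 0/8" is the unread-bytes / buffer-length pair.
    println!("{:?}", reader);
}
```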

@@ -222,8 +231,10 @@ impl<W: Write> Write for BufWriter<W> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for BufWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.as_ref().unwrap(), self.buf.len(), self.buf.capacity())
fmt.debug_struct("BufWriter")
.field("writer", &self.inner.as_ref().unwrap())
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
.finish()
}
}

@@ -337,9 +348,11 @@ impl<W: Write> Write for LineWriter<W> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for LineWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "LineWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.inner, self.inner.buf.len(),
self.inner.buf.capacity())
fmt.debug_struct("LineWriter")
.field("writer", &self.inner.inner)
.field("buffer",
&format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()))
.finish()
}
}

@@ -415,10 +428,10 @@ impl<S: Read + Write> BufStream<S> {
/// Any leftover data in the read buffer is lost.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn into_inner(self) -> Result<S, IntoInnerError<BufStream<S>>> {
let BufReader { inner: InternalBufWriter(w), buf } = self.inner;
let BufReader { inner: InternalBufWriter(w), buf, pos, cap } = self.inner;
w.into_inner().map_err(|IntoInnerError(w, e)| {
IntoInnerError(BufStream {
inner: BufReader { inner: InternalBufWriter(w), buf: buf },
inner: BufReader { inner: InternalBufWriter(w), buf: buf, pos: pos, cap: cap },
}, e)
})
}
@@ -452,10 +465,12 @@ impl<S: Write> fmt::Debug for BufStream<S> where S: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let reader = &self.inner;
let writer = &self.inner.inner.0;
write!(fmt, "BufStream {{ stream: {:?}, write_buffer: {}/{}, read_buffer: {}/{} }}",
writer.inner,
writer.buf.len(), writer.buf.capacity(),
reader.buf.position(), reader.buf.get_ref().len())
fmt.debug_struct("BufStream")
.field("stream", &writer.inner)
.field("write_buffer", &format_args!("{}/{}", writer.buf.len(), writer.buf.capacity()))
.field("read_buffer",
&format_args!("{}/{}", reader.cap - reader.pos, reader.buf.len()))
.finish()
}
}

64 changes: 33 additions & 31 deletions src/libstd/io/mod.rs
@@ -48,30 +48,6 @@ mod stdio;

const DEFAULT_BUF_SIZE: usize = 64 * 1024;

// Acquires a slice of the vector `v` from its length to its capacity
// (after initializing the data), reads into it, and then updates the length.
//
// This function is leveraged to efficiently read some bytes into a destination
// vector without extra copying and taking advantage of the space that's already
// in `v`.
fn with_end_to_cap<F>(v: &mut Vec<u8>, f: F) -> Result<usize>
where F: FnOnce(&mut [u8]) -> Result<usize>
{
let len = v.len();
let new_area = v.capacity() - len;
v.extend(iter::repeat(0).take(new_area));
match f(&mut v[len..]) {
Ok(n) => {
v.truncate(len + n);
Ok(n)
}
Err(e) => {
v.truncate(len);
Err(e)
}
}
}

// A few methods below (read_to_string, read_line) will append data into a
// `String` buffer, but we need to be pretty careful when doing this. The
// implementation will just call `.as_mut_vec()` and then delegate to a
@@ -116,19 +92,45 @@ fn append_to_string<F>(buf: &mut String, f: F) -> Result<usize>
}
}

// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
fn read_to_end<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
let mut read = 0;
let start_len = buf.len();
let mut len = start_len;
let mut cap_bump = 16;
Reviewer:

Seems like this should be buf.capacity() rather than a constant. Otherwise, if the vector is large, you are going to be reallocating it a lot unnecessarily once the read reaches its capacity.
I think the capacity increase behavior in this function should be identical to what would happen if we used buf.push() on every byte in r (i.e. it simply doubles the buf's capacity every time it is filled to capacity). This is probably what the user will be expecting, and allows them to be more flexible by reading in data of varying sizes from multiple sources and having it be efficient in every case.

let ret;
loop {
if buf.capacity() == buf.len() {
buf.reserve(DEFAULT_BUF_SIZE);
if len == buf.len() {
if buf.capacity() == buf.len() {
if cap_bump < DEFAULT_BUF_SIZE {
Reviewer:

Why have a maximum capacity increase at all? This makes the function O(number of bytes squared) instead of O(number of bytes). As the memory that needs to be allocated gets larger than 64 KB, this is going to be doing a lot of unnecessary reallocs.

Member Author:

All reserve does is ensure that the Vec has at least that much space. It's free to reallocate with whatever size it feels is best internally: http://doc.rust-lang.org/collections/vec/struct.Vec.html#method.reserve

Member Author:

There's a cap because the memset is relatively expensive. We expect about half of the last vec extension to be unused after the last part of the stream is read. As is, that's 32k of wasted work, but if there was no cap it could be 10s or 100s of megabytes.

Reviewer:

Looking at the implementation of reserve(), I think there are two separate issues.

  1. It looks like reserve() actually always rounds up to the nearest power of two. Because of that, the algorithm is basically equivalent to my suggestion of always doubling the capacity of the vector, and so is O(number of bytes). However, this is not guaranteed by the reserve() API, and thus a change to reserve, holding the API consistent, could make this algorithm O(n^2) (or maybe worse).

- If the exact behavior of reserve is meant to be part of the API, and will be included in the API, then this algorithm as written is more complex than necessary, and can be made equivalent by always reserving 16 when the capacity equals the length.

- However, I expect this behavior is not meant to be a guarantee of the API. Therefore, I think this should probably be coded here directly with reserve_exact to make sure the algorithm is always O(number of bytes).

> As is, that's 32k of wasted work, but if there was no cap it could be 10s or 100s of megabytes

Not so. Let's pretend we start with a vector that has 32k capacity and size, and need to read in 128 megabytes + 1 byte (so we are doing the maximum wasted work for the doubling algorithm). Let's assume that copying (i.e. a realloc) and memsetting are equally expensive.
For the doubling algorithm, we need to copy and memset (64k + 128k + 256k + 512k + ... + 64 MB + 128 MB + 256 MB) = 256 * (1 + 1/2 + 1/4 + ...) MB ~= 512 MB.

For the increase-by-32k algorithm, we need to copy and memset (64k + 96k + 128k + 160k + ... + (128 MB - 32k) + 128 MB + (128 MB + 32k)) = 64 * (1 + 2 + 3 + ... + 2000 + 2001) kilobytes = 64 * (2001 * 2002) / 2 kilobytes ~= 128e6 kilobytes = 128 gigabytes(!)

Being O(n^2), this behavior gets even worse for larger amounts of data, and 128 MB is not at all an unusual amount of data to be working with.

Member Author:

I'm not sure if I fully understand what you mean. I'm not able to reproduce the numbers I would expect to see if there was indeed a 250x difference in memory writes: https://gist.github.com/sfackler/81f8a7ac614e19f96cc8

Reviewer:

See point 1. Due to the not-necessarily apparent and undocumented way in which reserve() decides how to actually reserve memory, the current implementation of reserve() and read_to_end() just happen to sync in such a way that the behavior is equivalent to doubling the size of the vector at each step. If you replace reserve() here with reserve_exact() you should see the behavior I am talking about.

Reviewer:

You can see it with this example:

use std::io::Read;
fn main() {
    let buf = [1u8; 1024 * 1024 + 2];
    let mut src = &buf[..];
    let mut dst = Vec::with_capacity(32);
    src.read_to_end(&mut dst);
    println!("{}",dst.capacity());
}

If it were allocating 64k at a time, it would print 1024 * 1024 + 64 * 1024 = 1114112. Instead it prints 1024 * 1024 * 2 = 2097152.

This is why I think the doubling behavior should be made explicit with reserve_exact(); this is not at all obvious behavior and is not mentioned or guaranteed by the API.

cap_bump *= 2;
}
buf.reserve(cap_bump);
}
let new_area = buf.capacity() - buf.len();
buf.extend(iter::repeat(0).take(new_area));
}
match with_end_to_cap(buf, |b| r.read(b)) {
Ok(0) => return Ok(read),
Ok(n) => read += n,

match r.read(&mut buf[len..]) {
Member:

I realize now that if this panics the length of buf is probably "wrong", but I also think that's ok because with a &mut Vec<u8> we're guaranteed no one else is reading it (unless it's somehow visible to another thread), so it's not really the end of the world.

Member:

Note that with_end_to_cap also suffered this problem.

Member Author:

Yeah, I thought about having a guard fix the length but it seems not worth it. Worst case you see some extra 0 bytes.

Member:

agreed

Ok(0) => {
ret = Ok(len - start_len);
break;
}
Ok(n) => len += n,
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => {
ret = Err(e);
break;
}
}
}

buf.truncate(len);
ret
}
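
The thread above mentions the idea of "having a guard fix the length" if the inner read fails or panics; the patch deliberately skips it (worst case the caller sees some extra zero bytes). Purely as an illustration of what such a guard could look like — hypothetical, not in this PR, and with the adaptive cap_bump growth replaced by a fixed-size step to keep the sketch short:

```rust
use std::io::{self, ErrorKind, Read};

// Restores the vector's length on drop, so an early return or a panic in
// `read` never leaves the caller looking at zero padding past the bytes
// that were actually read.
struct LenGuard<'a> {
    buf: &'a mut Vec<u8>,
    len: usize, // committed length: everything before this is real data
}

impl<'a> Drop for LenGuard<'a> {
    fn drop(&mut self) {
        let len = self.len;
        self.buf.truncate(len);
    }
}

fn read_to_end_guarded<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<usize> {
    let start_len = buf.len();
    let mut g = LenGuard { len: buf.len(), buf: buf };
    loop {
        if g.len == g.buf.len() {
            // Grow and zero-fill more space for the next read (fixed step
            // here; the real patch grows adaptively via cap_bump/reserve).
            let new_len = g.buf.len() + 64;
            g.buf.resize(new_len, 0);
        }
        match r.read(&mut g.buf[g.len..]) {
            Ok(0) => return Ok(g.len - start_len), // guard trims the padding
            Ok(n) => g.len += n,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut src = &b"guarded read_to_end"[..];
    let mut dst = Vec::new();
    assert_eq!(read_to_end_guarded(&mut src, &mut dst).unwrap(), 19);
    assert_eq!(dst.len(), 19); // no zero padding left behind
}
```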

/// A trait for objects which are byte-oriented sources.
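Tying off the reserve vs. reserve_exact discussion above: if the doubling is meant to be relied on, it can be written out explicitly rather than depending on reserve() happening to round capacities up. A hedged sketch of what that growth step might look like (the function name and the 16-byte floor are assumptions, not from the PR):

```rust
use std::cmp;

// Hypothetical explicit-doubling growth step: reserve_exact makes the
// amortized-doubling policy part of this code instead of an accident of
// how reserve() rounds its requests.
fn grow_for_read(buf: &mut Vec<u8>) {
    if buf.len() == buf.capacity() {
        let additional = cmp::max(buf.capacity(), 16); // double, with a small floor
        buf.reserve_exact(additional);
    }
    // Zero-fill out to the new capacity so the spare room can be handed to read().
    let cap = buf.capacity();
    buf.resize(cap, 0);
}

fn main() {
    let mut buf = vec![0u8; 32]; // len == capacity == 32
    grow_for_read(&mut buf);
    assert!(buf.capacity() >= 64);         // capacity roughly doubles
    assert_eq!(buf.len(), buf.capacity()); // and the new area is zeroed
    println!("capacity after growth: {}", buf.capacity());
}
```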
1 change: 1 addition & 0 deletions src/libstd/lib.rs
@@ -128,6 +128,7 @@
#![feature(into_cow)]
#![feature(slice_patterns)]
#![feature(std_misc)]
#![feature(debug_builders)]
#![cfg_attr(test, feature(test, rustc_private, std_misc))]

// Don't link to std. We are std.