From 6a6a8301fd3cd552a5d867140233181292e742b5 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Thu, 29 Oct 2020 21:08:53 -0400 Subject: [PATCH 01/15] Implementation changes to BufWriter Changes to some BufWriter Write methods, with a focus on reducing total device write calls by fully filling the buffer in some cases --- library/std/src/io/buffered/bufwriter.rs | 101 ++++++++++++++++------- 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 3ec272fea6668..0ca527a31a00a 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -291,15 +291,14 @@ impl BufWriter { #[stable(feature = "rust1", since = "1.0.0")] impl Write for BufWriter { fn write(&mut self, buf: &[u8]) -> io::Result { + // We assume that callers of `write` prefer to avoid split writes where + // possible, so we pre-flush the buffer rather than doing a partial + // write to fill it. if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } - // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if buf.len() >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write(buf); - self.panicked = false; - r + self.get_mut().write(buf) } else { self.buf.extend_from_slice(buf); Ok(buf.len()) @@ -307,39 +306,79 @@ impl Write for BufWriter { } fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - // Normally, `write_all` just calls `write` in a loop. We can do better - // by calling `self.get_mut().write_all()` directly, which avoids - // round trips through the buffer in the event of a series of partial - // writes in some circumstances. - if self.buf.len() + buf.len() > self.buf.capacity() { + // Unlike with `write`, we assume that a caller of `write_all` is + // interested in minimizing system calls even if the buffer is split. 
+ // This method tries to fill up the buffer as much as possible before + // flushing, whereas `write` prefers not split incoming bufs. + + // Bypass the buffer if the the incoming write is larger than the whole + // buffer. + if buf.len() >= self.capacity() { self.flush_buf()?; + return self.get_mut().write_all(buf); } - // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 - if buf.len() >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write_all(buf); - self.panicked = false; - r - } else { - self.buf.extend_from_slice(buf); - Ok(()) + + // In order to reduce net writes in aggregate, we buffer as much as + // possible, then forward, then buffer the rest + let amt_buffered = self.write_to_buf(buf); + if amt_buffered < buf.len() { + self.flush_buf()?; + // At this point, because we know that buf.len() < self.buf.len(), + // we know that this will succeed in totality + self.buf.extend_from_slice(&buf[amt_buffered..]); } + Ok(()) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - let total_len = bufs.iter().map(|b| b.len()).sum::(); - if self.buf.len() + total_len > self.buf.capacity() { - self.flush_buf()?; - } - // FIXME: Why no len > capacity? Why not buffer len == capacity? 
#72919 - if total_len >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write_vectored(bufs); - self.panicked = false; - r + if self.get_ref().is_write_vectored() { + let total_len: usize = bufs.iter().map(|buf| buf.len()).sum(); + + if total_len + self.buffer().len() > self.capacity() { + self.flush_buf()?; + } + if total_len >= self.buf.capacity() { + self.get_mut().write_vectored(bufs) + } else { + // Correctness note: we've already verified that none of these + // will overflow the buffer, because total_len < capacity + bufs.iter().for_each(|buf| self.buf.extend_from_slice(buf)); + Ok(total_len) + } } else { - bufs.iter().for_each(|b| self.buf.extend_from_slice(b)); - Ok(total_len) + // Because the inner writer doesn't have native vectored write + // support, we should take care of buffering together the individual + // incoming bufs, even if the *total* length is larger than our + // buffer. We only want to skip our buffer if an *individual* write + // exceeds our buffer capacity. + let mut total_buffered = 0; + + for buf in bufs { + if total_buffered == 0 { + if buf.len() + self.buffer().len() > self.capacity() { + // If an individual write would overflow our remaining + // capacity and we haven't buffered anything yet, + // pre-flush before buffering (same as with regular + // write()) + self.flush_buf()?; + } + + if buf.len() > self.capacity() { + // If an individual buffer exceeds our *total* capacity + // and we haven't buffered anything yet, just forward + // it to the underlying device + return self.get_mut().write(buf); + } + } + + // Buffer as much as possible until we reach full capacity + total_buffered += self.write_to_buf(buf); + if self.buffer().len() == self.capacity() { + break; + } + } + + Ok(total_buffered) } } From 3e9d0fbe369090d6a0c615671f95446427d40d2e Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 30 Oct 2020 10:35:02 -0400 Subject: [PATCH 02/15] Make use of vectored writes! 
- BufWriter now makes use of vectored writes when flushing; it attempt to write both buffered data and incoming data in a single operation when it makes sense to do so. - LineWriterShim takes advantage of BufWriter's new "vectored flush" operation in a few places - Fixed a failing test. No new tests yet. --- library/std/src/io/buffered/bufwriter.rs | 230 +++++++++++++----- library/std/src/io/buffered/linewritershim.rs | 47 ++-- library/std/src/io/buffered/tests.rs | 30 ++- library/std/src/lib.rs | 1 + 4 files changed, 233 insertions(+), 75 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 0ca527a31a00a..dfcf5e22bf13e 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -3,6 +3,33 @@ use crate::io::{ self, Error, ErrorKind, IntoInnerError, IoSlice, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE, }; +/// Helper macro for a common write pattern. Write a buffer using the given +/// function call, then use the returned usize to get the unwritten tail of +/// the buffer. +/// +/// Example: +/// +/// ``` +/// // Use a ? for an i/o operation +/// let tail = tail!(self.flush_buf_vectored(buf)?); +/// +/// // omit the ? for an infallible operation +/// let tail = tail!(self.write_to_buffer(buf)); +/// ``` +macro_rules! tail { + ($this:ident $(. $write:ident)+ ($buf:expr)) => {{ + let buf = $buf; + let written = $this $(. $write)+ (buf); + &buf[written..] + }}; + + ($this:ident $(. $write:ident)+ ($buf:expr) ? ) => {{ + let buf = $buf; + let written = $this $(. $write)+ (buf)?; + &buf[written..] + }}; +} + /// Wraps a writer and buffers its output. /// /// It can be excessively inefficient to work directly with something that @@ -72,6 +99,51 @@ pub struct BufWriter { panicked: bool, } +/// Helper struct for BufWriter::flush_buf to ensure the buffer is updated +/// after all the writes are complete. 
It tracks the number of written bytes +/// and drains them all from the front of the buffer when dropped. +struct BufGuard<'a> { + buffer: &'a mut Vec, + written: usize, +} + +impl<'a> BufGuard<'a> { + fn new(buffer: &'a mut Vec) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] + } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + + /// Used in vectored flush mode; reports how many *extra* bytes after + /// `buffer` (ie, new bytes from the caller) were written + fn extra_written(&self) -> Option { + self.written.checked_sub(self.buffer.len()) + } +} + +impl Drop for BufGuard<'_> { + fn drop(&mut self) { + if self.written >= self.buffer.len() { + self.buffer.clear(); + } else if self.written > 0 { + self.buffer.drain(..self.written); + } + } +} + impl BufWriter { /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. @@ -115,45 +187,9 @@ impl BufWriter { /// `write`), any 0-length writes from `inner` must be reported as i/o /// errors from this method. pub(super) fn flush_buf(&mut self) -> io::Result<()> { - /// Helper struct to ensure the buffer is updated after all the writes - /// are complete. It tracks the number of written bytes and drains them - /// all from the front of the buffer when dropped. - struct BufGuard<'a> { - buffer: &'a mut Vec, - written: usize, - } - - impl<'a> BufGuard<'a> { - fn new(buffer: &'a mut Vec) -> Self { - Self { buffer, written: 0 } - } - - /// The unwritten part of the buffer - fn remaining(&self) -> &[u8] { - &self.buffer[self.written..] 
- } - - /// Flag some bytes as removed from the front of the buffer - fn consume(&mut self, amt: usize) { - self.written += amt; - } - - /// true if all of the bytes have been written - fn done(&self) -> bool { - self.written >= self.buffer.len() - } - } - - impl Drop for BufGuard<'_> { - fn drop(&mut self) { - if self.written > 0 { - self.buffer.drain(..self.written); - } - } - } - let mut guard = BufGuard::new(&mut self.buf); let inner = self.inner.as_mut().unwrap(); + while !guard.done() { self.panicked = true; let r = inner.write(guard.remaining()); @@ -174,6 +210,47 @@ impl BufWriter { Ok(()) } + /// Same as flush_buf, but uses vector operations to attempt to *also* + /// flush an incoming buffer. The returned usize is the number of bytes + /// successfully written from the *new* buf. This method will loop until + /// the entire *current* buffer is flushed, even if that means 0 bytes + /// from the new buffer were written. + pub(super) fn flush_buf_vectored(&mut self, buf: &[u8]) -> io::Result { + let inner = self.inner.as_mut().unwrap(); + + if !inner.is_write_vectored() { + self.flush_buf()?; + return Ok(0); + } + + let mut guard = BufGuard::new(&mut self.buf); + + // Continue looping only as long as there is unwritten content in self.buf + loop { + match guard.extra_written() { + None => { + let buffers = [IoSlice::new(guard.remaining()), IoSlice::new(buf)]; + self.panicked = true; + let r = inner.write_vectored(&buffers); + self.panicked = false; + + match r { + Ok(0) => { + return Err(Error::new( + ErrorKind::WriteZero, + "failed to write the buffered data", + )) + } + Ok(n) => guard.consume(n), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Some(extra) => return Ok(extra), + } + } + } + /// Buffer some data without flushing it, regardless of the size of the /// data. Writes as much as possible without exceeding capacity. Returns /// the number of bytes written. 
@@ -291,12 +368,36 @@ impl BufWriter { #[stable(feature = "rust1", since = "1.0.0")] impl Write for BufWriter { fn write(&mut self, buf: &[u8]) -> io::Result { + // Design notes: + // // We assume that callers of `write` prefer to avoid split writes where // possible, so we pre-flush the buffer rather than doing a partial // write to fill it. + // + // During the pre-flush, we attempt a vectored write of both the + // buffered bytes and the new bytes. In the worst case, this will + // be the same as a typical pre-flush, since by default vectored + // writes just do a normal write of the first buffer. if self.buf.len() + buf.len() > self.buf.capacity() { - self.flush_buf()?; + let new_written = self.flush_buf_vectored(buf)?; + if new_written > 0 { + // At this point, we're obligated to return Ok(..) before + // trying any more fallible i/o operations. If the *remaining* + // buf fits in our buffer, buffer it eagerly; if not, buffering + // it would be pessimistic (since it's preferable to let the + // user follow up with another write call during which the + // write can be forwarded directly to inner, skipping the + // buffer). + let tail = &buf[new_written..]; + return if tail.len() < self.buf.capacity() { + self.buf.extend_from_slice(tail); + Ok(buf.len()) + } else { + Ok(new_written) + }; + } } + if buf.len() >= self.buf.capacity() { self.get_mut().write(buf) } else { @@ -311,27 +412,46 @@ impl Write for BufWriter { // This method tries to fill up the buffer as much as possible before // flushing, whereas `write` prefers not split incoming bufs. - // Bypass the buffer if the the incoming write is larger than the whole - // buffer. - if buf.len() >= self.capacity() { - self.flush_buf()?; - return self.get_mut().write_all(buf); - } + let buf = match buf.len() >= self.capacity() { + false => buf, + // Bypass the buffer if the the incoming write is larger than the + // whole buffer. 
Use a vectored write to attempt to write the new + // data and the existing buffer in a single operation + true => match tail!(self.flush_buf_vectored(buf)?) { + // If the vectored write flushed everything at once, we're done! + [] => return Ok(()), + + // If what's left after the vector flush is *still* larger than + // the buffer, bypass the buffer and forward it directly + tail if tail.len() >= self.capacity() => return self.get_mut().write_all(tail), + + // Otherwise, we're going to buffer whatever's left of the user input + tail => tail, + }, + }; // In order to reduce net writes in aggregate, we buffer as much as // possible, then forward, then buffer the rest - let amt_buffered = self.write_to_buf(buf); - if amt_buffered < buf.len() { - self.flush_buf()?; + let buf = tail!(self.write_to_buf(buf)); + + if !buf.is_empty() { + let buf = tail!(self.flush_buf_vectored(buf)?); + // At this point, because we know that buf.len() < self.buf.len(), // we know that this will succeed in totality - self.buf.extend_from_slice(&buf[amt_buffered..]); + self.buf.extend_from_slice(buf); } + Ok(()) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - if self.get_ref().is_write_vectored() { + if let [buf] = bufs { + // If there's exactly 1 incoming buffer, `Self::write` can make + // use of self.inner.write_vectored to attempt to combine flushing + // the existing buffer with writing the new one. 
+ self.write(buf) + } else if self.get_ref().is_write_vectored() { let total_len: usize = bufs.iter().map(|buf| buf.len()).sum(); if total_len + self.buffer().len() > self.capacity() { @@ -383,11 +503,12 @@ impl Write for BufWriter { } fn is_write_vectored(&self) -> bool { - self.get_ref().is_write_vectored() + true } fn flush(&mut self) -> io::Result<()> { - self.flush_buf().and_then(|()| self.get_mut().flush()) + self.flush_buf()?; + self.get_mut().flush() } } @@ -398,7 +519,7 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("BufWriter") - .field("writer", &self.inner.as_ref().unwrap()) + .field("writer", self.get_ref()) .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) .finish() } @@ -418,9 +539,8 @@ impl Seek for BufWriter { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for BufWriter { fn drop(&mut self) { - if self.inner.is_some() && !self.panicked { - // dtors should not panic, so we ignore a failed flush - let _r = self.flush_buf(); + if !self.panicked { + let _ = self.flush_buf(); } } } diff --git a/library/std/src/io/buffered/linewritershim.rs b/library/std/src/io/buffered/linewritershim.rs index a80d08db8692e..f43e6dc596437 100644 --- a/library/std/src/io/buffered/linewritershim.rs +++ b/library/std/src/io/buffered/linewritershim.rs @@ -20,6 +20,13 @@ impl<'a, W: Write> LineWriterShim<'a, W> { Self { buffer } } + /// Get a reference to the inner writer (that is, the writer wrapped by + /// the BufWriter). Be careful with this writer, as writes to it will + /// bypass the buffer. + fn inner_ref(&self) -> &W { + self.buffer.get_ref() + } + /// Get a mutable reference to the inner writer (that is, the writer /// wrapped by the BufWriter). Be careful with this writer, as writes to /// it will bypass the buffer. 
@@ -71,25 +78,29 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { Some(newline_idx) => newline_idx + 1, }; + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx]; + // Flush existing content to prepare for our write. We have to do this // before attempting to write `buf` in order to maintain consistency; // if we add `buf` to the buffer then try to flush it all at once, // we're obligated to return Ok(), which would mean suppressing any // errors that occur during flush. - self.buffer.flush_buf()?; - - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let lines = &buf[..newline_idx]; - - // Write `lines` directly to the inner writer. In keeping with the - // `write` convention, make at most one attempt to add new (unbuffered) - // data. Because this write doesn't touch the BufWriter state directly, - // and the buffer is known to be empty, we don't need to worry about - // self.buffer.panicked here. - let flushed = self.inner_mut().write(lines)?; + // + // We can, however, use a vectored write to attempt to write the lines + // at the same time as the buffer. + let flushed = match self.buffer.flush_buf_vectored(lines)? { + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. + 0 => self.inner_mut().write(lines)?, + flushed => flushed, + }; - // If buffer returns Ok(0), propagate that to the caller without + // If the write returns Ok(0), propagate that to the caller without // doing additional buffering; otherwise we're just guaranteeing // an "ErrorKind::WriteZero" later. 
if flushed == 0 { @@ -159,9 +170,16 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// get the benefits of more granular partial-line handling without losing /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + // TODO: BufWriter recently received some optimized handling of + // vectored writes; update this method to take advantage of those + // updates. In particular, BufWriter::is_write_vectored is always true, + // because BufWriter::write_vectored takes special care to buffer + // together the incoming sub-buffers when !W::is_write_vectored, while + // still using W::write_vectored when it is. + // If there's no specialized behavior for write_vectored, just use // write. This has the benefit of more granular partial-line handling. - if !self.is_write_vectored() { + if !self.inner_ref().is_write_vectored() { return match bufs.iter().find(|buf| !buf.is_empty()) { Some(buf) => self.write(buf), None => Ok(0), @@ -178,7 +196,6 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write let last_newline_buf_idx = match last_newline_buf_idx { - // No newlines; just do a normal buffered write None => { self.flush_if_completed_line()?; return self.buffer.write_vectored(bufs); diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index 66a64f667baa4..85b18019b2dd5 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -14,7 +14,11 @@ pub struct ShortReader { // rustfmt-on-save. 
impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } @@ -243,8 +247,11 @@ fn test_buffered_reader_seek_underflow_discard_buffer_between_seeks() { assert_eq!(reader.buffer().len(), 0); } +/// Basic tests of BufWriter when the wrapped writer supports vectored writes. +/// BufWriter will use vectored writes to attempt to combine buffer flushes +/// with incoming writes. #[test] -fn test_buffered_writer() { +fn test_buffered_writer_inner_vectored() { let inner = Vec::new(); let mut writer = BufWriter::with_capacity(2, inner); @@ -270,8 +277,8 @@ fn test_buffered_writer() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); writer.write(&[6]).unwrap(); - assert_eq!(writer.buffer(), [6]); - assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6]); writer.write(&[7, 8]).unwrap(); assert_eq!(writer.buffer(), []); @@ -451,7 +458,7 @@ fn bench_buffered_writer(b: &mut test::Bencher) { /// A simple `Write` target, designed to be wrapped by `LineWriter` / /// `BufWriter` / etc, that can have its `write` & `flush` behavior /// configured -#[derive(Default, Clone)] +#[derive(Default, Debug, Clone)] struct ProgrammableSink { // Writes append to this slice pub buffer: Vec, @@ -514,6 +521,19 @@ impl Write for ProgrammableSink { } } +/// PartialEq allows for easy comparison of the contents of a ProgrammableSink +impl PartialEq<[u8]> for ProgrammableSink { + fn eq(&self, other: &[u8]) -> bool { + self.buffer == other + } +} + +impl PartialEq<[u8; N]> for ProgrammableSink { + fn eq(&self, other: &[u8; N]) -> bool { + self.buffer == other + } +} + /// Previously the `LineWriter` could successfully write some bytes but /// then fail to report that it has done so. 
Additionally, an erroneous /// flush after a successful write was permanently ignored. diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index 96a7755c68821..fcc0371d4bba6 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -279,6 +279,7 @@ #![feature(maybe_uninit_extra)] #![feature(maybe_uninit_ref)] #![feature(maybe_uninit_slice)] +#![feature(min_const_generics)] #![feature(min_specialization)] #![feature(needs_panic_runtime)] #![feature(negative_impls)] From e961fe1ab25b60b585e070962f1bdd3f8150b53f Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 30 Oct 2020 11:13:02 -0400 Subject: [PATCH 03/15] Edge case optimized in `write_vectored` --- library/std/src/io/buffered/bufwriter.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index dfcf5e22bf13e..20c7f5fe4cb16 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -446,7 +446,7 @@ impl Write for BufWriter { } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - if let [buf] = bufs { + if let Some(buf) = only_one(bufs, |b| !b.is_empty()) { // If there's exactly 1 incoming buffer, `Self::write` can make // use of self.inner.write_vectored to attempt to combine flushing // the existing buffer with writing the new one. @@ -544,3 +544,22 @@ impl Drop for BufWriter { } } } + +/// Similar to iter.find, this method searches an iterator for an item +/// matching a predicate, but returns it only if it is the *only* item +/// matching that predicate. Used to check if there is exactly one non-empty +/// buffer in a list input to write_vectored. 
FIXME: delete this function and replace it with slice::trim if that becomes +/// a thing
// - // During the pre-flush, we attempt a vectored write of both the - // buffered bytes and the new bytes. In the worst case, this will + // During the pre-flush, though, we attempt a vectored write of both + // the buffered bytes and the new bytes. In the worst case, this will // be the same as a typical pre-flush, since by default vectored - // writes just do a normal write of the first buffer. - if self.buf.len() + buf.len() > self.buf.capacity() { - let new_written = self.flush_buf_vectored(buf)?; - if new_written > 0 { - // At this point, we're obligated to return Ok(..) before - // trying any more fallible i/o operations. If the *remaining* - // buf fits in our buffer, buffer it eagerly; if not, buffering - // it would be pessimistic (since it's preferable to let the - // user follow up with another write call during which the - // write can be forwarded directly to inner, skipping the - // buffer). - let tail = &buf[new_written..]; - return if tail.len() < self.buf.capacity() { - self.buf.extend_from_slice(tail); - Ok(buf.len()) - } else { - Ok(new_written) - }; - } - } + // writes just do a normal write of the first buffer. In the best case, + // we were able to do some additional writing during a single syscall. + let tail = match buf.len() > self.available() { + true => tail!(self.flush_buf_vectored(buf)?), + false => buf, + }; - if buf.len() >= self.buf.capacity() { - self.get_mut().write(buf) - } else { - self.buf.extend_from_slice(buf); + // If the incoming buf doesn't fit in our buffer, even after we flushed + // it to make room, we should forward it directly (via inner.write). + // However, if the vectored flush successfully wrote some of `buf`, + // we're now obligated to return Ok(..) before trying any more + // fallible i/o operations. 
+ if tail.len() < self.buf.capacity() { + self.buf.extend_from_slice(tail); Ok(buf.len()) + } else if tail.len() < buf.len() { + Ok(buf.len() - tail.len()) + } else { + self.get_mut().write(buf) } } @@ -412,11 +408,10 @@ impl Write for BufWriter { // This method tries to fill up the buffer as much as possible before // flushing, whereas `write` prefers not split incoming bufs. + // Bypass the buffer if the the incoming write is larger than the + // whole buffer. Use a vectored write to attempt to write the new + // data and the existing buffer in a single operation let buf = match buf.len() >= self.capacity() { - false => buf, - // Bypass the buffer if the the incoming write is larger than the - // whole buffer. Use a vectored write to attempt to write the new - // data and the existing buffer in a single operation true => match tail!(self.flush_buf_vectored(buf)?) { // If the vectored write flushed everything at once, we're done! [] => return Ok(()), @@ -428,17 +423,18 @@ impl Write for BufWriter { // Otherwise, we're going to buffer whatever's left of the user input tail => tail, }, + false => buf, }; // In order to reduce net writes in aggregate, we buffer as much as // possible, then forward, then buffer the rest let buf = tail!(self.write_to_buf(buf)); - if !buf.is_empty() { let buf = tail!(self.flush_buf_vectored(buf)?); // At this point, because we know that buf.len() < self.buf.len(), - // we know that this will succeed in totality + // and that the buffer has been flushed we know that this will + // succeed in totality self.buf.extend_from_slice(buf); } @@ -454,10 +450,10 @@ impl Write for BufWriter { } else if self.get_ref().is_write_vectored() { let total_len: usize = bufs.iter().map(|buf| buf.len()).sum(); - if total_len + self.buffer().len() > self.capacity() { + if total_len > self.available() { self.flush_buf()?; } - if total_len >= self.buf.capacity() { + if total_len >= self.capacity() { self.get_mut().write_vectored(bufs) } else { // Correctness 
note: we've already verified that none of these @@ -475,15 +471,15 @@ impl Write for BufWriter { for buf in bufs { if total_buffered == 0 { - if buf.len() + self.buffer().len() > self.capacity() { + if buf.len() > self.available() { // If an individual write would overflow our remaining // capacity and we haven't buffered anything yet, // pre-flush before buffering (same as with regular - // write()) + // write()). self.flush_buf()?; } - if buf.len() > self.capacity() { + if buf.len() >= self.capacity() { // If an individual buffer exceeds our *total* capacity // and we haven't buffered anything yet, just forward // it to the underlying device From c02228234ae62c289d6266ac77b6ee26765e6a6d Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 30 Oct 2020 11:52:29 -0400 Subject: [PATCH 05/15] Missing import --- library/std/src/io/buffered/bufwriter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 29ae4649aa315..d9a6be7fcb80d 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -2,6 +2,7 @@ use crate::fmt; use crate::io::{ self, Error, ErrorKind, IntoInnerError, IoSlice, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE, }; +use crate::iter::FusedIterator; /// Helper macro for a common write pattern. 
Write a buffer using the given /// function call, then use the returned usize to get the unwritten tail of From 414748dcab051ff864781d9e1e2fbda1e412be5e Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 30 Oct 2020 11:57:27 -0400 Subject: [PATCH 06/15] Elaborated on the exit condition commentary of write_vectored --- library/std/src/io/buffered/bufwriter.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index d9a6be7fcb80d..5436f376b5c7c 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -488,7 +488,10 @@ impl Write for BufWriter { } } - // Buffer as much as possible until we reach full capacity + // Buffer as much as possible until we reach full capacity. + // Once we've buffered at least 1 byte, we're obligated to + // return Ok(..) before attempting any fallible i/o operations, + // so once the buffer is full we immediately return. total_buffered += self.write_to_buf(buf); if self.buffer().len() == self.capacity() { break; From 06d0f1336bb16e544d659f3ed2a750122be52224 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 30 Oct 2020 12:09:33 -0400 Subject: [PATCH 07/15] Bugfixes in cleanup - Fixed bugs in write implementation; decompressed it to make the flow more clear. - Replaced several uses of .capacity() with .available(); in these cases they're identical (because they always occur after a completed flush) but available makes more sense conceptually. 
--- library/std/src/io/buffered/bufwriter.rs | 28 ++++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 5436f376b5c7c..c0b3ba90f16c9 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -383,24 +383,27 @@ impl Write for BufWriter { // be the same as a typical pre-flush, since by default vectored // writes just do a normal write of the first buffer. In the best case, // we were able to do some additional writing during a single syscall. - let tail = match buf.len() > self.available() { - true => tail!(self.flush_buf_vectored(buf)?), - false => buf, + let written = match buf.len() > self.available() { + true => self.flush_buf_vectored(buf)?, + false => 0, }; + let tail = &buf[written..]; // If the incoming buf doesn't fit in our buffer, even after we flushed // it to make room, we should forward it directly (via inner.write). // However, if the vectored flush successfully wrote some of `buf`, // we're now obligated to return Ok(..) before trying any more // fallible i/o operations. - if tail.len() < self.buf.capacity() { + let tail_written = if tail.len() < self.available() { self.buf.extend_from_slice(tail); - Ok(buf.len()) - } else if tail.len() < buf.len() { - Ok(buf.len() - tail.len()) + tail.len() + } else if written > 0 { + 0 } else { - self.get_mut().write(buf) - } + self.get_mut().write(tail)? 
+ }; + + Ok(written + tail_written) } fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { @@ -454,7 +457,8 @@ impl Write for BufWriter { if total_len > self.available() { self.flush_buf()?; } - if total_len >= self.capacity() { + + if total_len >= self.available() { self.get_mut().write_vectored(bufs) } else { // Correctness note: we've already verified that none of these @@ -480,7 +484,7 @@ impl Write for BufWriter { self.flush_buf()?; } - if buf.len() >= self.capacity() { + if buf.len() >= self.available() { // If an individual buffer exceeds our *total* capacity // and we haven't buffered anything yet, just forward // it to the underlying device @@ -493,7 +497,7 @@ impl Write for BufWriter { // return Ok(..) before attempting any fallible i/o operations, // so once the buffer is full we immediately return. total_buffered += self.write_to_buf(buf); - if self.buffer().len() == self.capacity() { + if self.available() == 0 { break; } } From b7ca9f1f55b0fd98d14d733a45303dabbd316764 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 31 Oct 2020 22:03:55 -0400 Subject: [PATCH 08/15] Added tests; fixed bugs --- library/std/src/io/buffered/bufwriter.rs | 20 ++- library/std/src/io/buffered/tests.rs | 197 +++++++++++++++++++++-- 2 files changed, 201 insertions(+), 16 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index c0b3ba90f16c9..256b3cfac5478 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -394,9 +394,8 @@ impl Write for BufWriter { // However, if the vectored flush successfully wrote some of `buf`, // we're now obligated to return Ok(..) before trying any more // fallible i/o operations. 
- let tail_written = if tail.len() < self.available() { - self.buf.extend_from_slice(tail); - tail.len() + let tail_written = if tail.len() < self.capacity() { + self.write_to_buf(tail) } else if written > 0 { 0 } else { @@ -439,7 +438,14 @@ impl Write for BufWriter { // At this point, because we know that buf.len() < self.buf.len(), // and that the buffer has been flushed we know that this will // succeed in totality - self.buf.extend_from_slice(buf); + self.write_to_buf(buf); + } + + // If, at this point, the buffer is full, we may as well eagerly + // attempt to flush, so that the next write will have an empty + // buffer. + if self.available() == 0 { + self.flush_buf()?; } Ok(()) @@ -458,7 +464,7 @@ impl Write for BufWriter { self.flush_buf()?; } - if total_len >= self.available() { + if total_len >= self.capacity() { self.get_mut().write_vectored(bufs) } else { // Correctness note: we've already verified that none of these @@ -544,7 +550,9 @@ impl Seek for BufWriter { impl Drop for BufWriter { fn drop(&mut self) { if !self.panicked { - let _ = self.flush_buf(); + if self.inner.is_some() { + let _ = self.flush_buf(); + } } } } diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index 85b18019b2dd5..f9956efe2e04d 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -293,6 +293,121 @@ fn test_buffered_writer_inner_vectored() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } +#[test] +fn test_buffered_writer_write() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + writer.write(b"ab").unwrap(); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.buffer(), b"ab"); + + // `write` prefers to avoid split writes + writer.write(b"cde").unwrap(); + assert_eq!(writer.get_ref(), b"ab"); + assert_eq!(writer.buffer(), b"cde"); + + let inner = ProgrammableSink::default(); + let mut writer = 
BufWriter::with_capacity(4, inner); + + // `write` skips the buffer if the write is >= capacity + writer.write(b"abcd").unwrap(); + assert_eq!(writer.get_ref(), b"abcd"); + assert_eq!(writer.buffer(), b""); + + writer.write(b"efghi").unwrap(); + assert_eq!(writer.get_ref(), b"abcdefghi"); + assert_eq!(writer.buffer(), b""); + + // `write` fills the buffer + writer.write(b"jk").unwrap(); + writer.write(b"lm").unwrap(); + assert_eq!(writer.get_ref(), b"abcdefghi"); + assert_eq!(writer.buffer(), b"jklm"); +} + +#[test] +fn test_buffered_writer_write_all_unvectored() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // write_all will perform split writes as necessary such that the minimal + // number of writes are performed on inner. Additionally, it will eagerly + // flush if a write_all fully fills the buffer + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b"bc"); + assert_eq!(writer.get_ref(), b"abca"); + + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b""); + assert_eq!(writer.get_ref(), b"abcabcabcabc"); + assert_eq!(writer.get_ref().write_count, 3); +} + +#[test] +fn test_buffered_writer_write_all_vectored() { + let inner = ProgrammableSink { enable_vectored: true, ..ProgrammableSink::default() }; + let mut writer = BufWriter::with_capacity(10, inner); + + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b"abcabcabc"); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.get_ref().write_count, 0); + + // This should be used to fill the buffer, but then the whole thing should + // be sent as a single vectored write + writer.write_all(b"aaaaaa").unwrap(); + + assert_eq!(writer.buffer(), b""); + assert_eq!(writer.get_ref(), b"abcabcabcaaaaaa"); + assert_eq!(writer.get_ref().write_count, 1); +} + +#[test] 
+fn test_buffered_writer_write_vectored() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // A vectored write is buffered, even if the total size is larger than the + // buffer + let input = + [IoSlice::new(b"aa"), IoSlice::new(b"bb"), IoSlice::new(b"cc"), IoSlice::new(b"dd")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 4); + assert_eq!(writer.buffer(), b"aabb"); + assert_eq!(writer.get_ref(), b""); + + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // If the first encountered buffer is large, it is forwarded directly + let input = [IoSlice::new(b""), IoSlice::new(b"abcdefg")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 7); + assert_eq!(writer.buffer(), b""); + assert_eq!(writer.get_ref(), b"abcdefg"); + assert_eq!(writer.get_ref().write_count, 1); + + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // if a subsequent encountered buffer is large, it is buffered (because of + // the infallibility requirement) + let input = [IoSlice::new(b"a"), IoSlice::new(b"bcdefg")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 4); + assert_eq!(writer.buffer(), b"abcd"); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.get_ref().write_count, 0); +} + #[test] fn test_buffered_writer_inner_flushes() { let mut w = BufWriter::with_capacity(3, Vec::new()); @@ -460,29 +575,36 @@ fn bench_buffered_writer(b: &mut test::Bencher) { /// configured #[derive(Default, Debug, Clone)] struct ProgrammableSink { - // Writes append to this slice + /// Writes append to this slice pub buffer: Vec, - // Flush sets this flag + /// Flush sets this flag pub flushed: bool, - // If true, writes will always be an error + /// Each `write` call increments this + pub write_count: usize, + + /// If true, writes will always be an error pub always_write_error: bool, - // If true, flushes will always be an error + 
/// If true, flushes will always be an error pub always_flush_error: bool, - // If set, only up to this number of bytes will be written in a single - // call to `write` + /// If set, only up to this number of bytes will be written in a single + /// call to `write` pub accept_prefix: Option, - // If set, counts down with each write, and writes return an error - // when it hits 0 + /// If set, counts down with each write, and writes return an error + /// when it hits 0 pub max_writes: Option, - // If set, attempting to write when max_writes == Some(0) will be an - // error; otherwise, it will return Ok(0). + /// If set, attempting to write when max_writes == Some(0) will be an + /// error; otherwise, it will return Ok(0). pub error_after_max_writes: bool, + + /// If set, vectored writes are enabled. All of the above configuration + /// will apply to each write_vectored call as though it was a single write. + pub enable_vectored: bool, } impl Write for ProgrammableSink { @@ -507,6 +629,7 @@ impl Write for ProgrammableSink { let data = &data[..len]; self.buffer.extend_from_slice(data); + self.write_count += 1; Ok(len) } @@ -519,6 +642,60 @@ impl Write for ProgrammableSink { Ok(()) } } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + if !self.enable_vectored { + // If we're not vectored, use the default behavior, which is to + // write the first non-empty buf with write. 
+ return match bufs.iter().find(|b| !b.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + }; + } + + if self.always_write_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error")); + } + + match self.max_writes { + Some(0) if self.error_after_max_writes => { + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")); + } + Some(0) => return Ok(0), + Some(ref mut count) => *count -= 1, + None => {} + } + + let total_written = match self.accept_prefix { + None => bufs.iter().fold(0, |len, buf| { + self.buffer.extend_from_slice(buf); + len + buf.len() + }), + Some(mut len) => { + let mut written = 0; + + for buf in bufs { + if len == 0 { + break; + } + + let buf = &buf[..buf.len().min(len)]; + self.buffer.extend_from_slice(buf); + written += buf.len(); + len -= buf.len(); + } + + written + } + }; + + self.write_count += 1; + Ok(total_written) + } + + fn is_write_vectored(&self) -> bool { + self.enable_vectored + } } /// PartialEq allows for easy comparison of the contents of a ProgrammableSink From 0a9721cc193766b21cc3c080a23c4393c327722c Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 1 Nov 2020 00:17:42 -0400 Subject: [PATCH 09/15] revert pointless change to Drop --- library/std/src/io/buffered/bufwriter.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 256b3cfac5478..b31bd9b5a3f9a 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -549,10 +549,9 @@ impl Seek for BufWriter { #[stable(feature = "rust1", since = "1.0.0")] impl Drop for BufWriter { fn drop(&mut self) { - if !self.panicked { - if self.inner.is_some() { - let _ = self.flush_buf(); - } + if self.inner.is_some() && !self.panicked { + // dtors should not panic, so we ignore a failed flush + let _r = self.flush_buf(); } } } From 4ee3bcada560105d36340666276401a5fc25640c Mon Sep 17 
00:00:00 2001 From: Nathan West Date: Sun, 1 Nov 2020 00:19:26 -0400 Subject: [PATCH 10/15] x.py fmt still disagrees with rustfmt from the editor --- library/std/src/io/buffered/bufwriter.rs | 2 +- library/std/src/io/buffered/tests.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index b31bd9b5a3f9a..0dc7b30d68337 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -240,7 +240,7 @@ impl BufWriter { return Err(Error::new( ErrorKind::WriteZero, "failed to write the buffered data", - )) + )); } Ok(n) => guard.consume(n), Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index f9956efe2e04d..19e8a9fe2e11f 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -14,11 +14,7 @@ pub struct ShortReader { // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { - Ok(0) - } else { - Ok(self.lengths.remove(0)) - } + if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } } } From 9d23026ed3a5906f116d829aaf6ce5fc81c4e4be Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 1 Nov 2020 16:57:32 -0500 Subject: [PATCH 11/15] Replace TODO with FIXME, per tidy --- library/std/src/io/buffered/bufwriter.rs | 2 +- library/std/src/io/buffered/linewritershim.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 0dc7b30d68337..0b010c4d7cb71 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -561,7 +561,7 @@ impl Drop for BufWriter { /// matching that predicate.
Used to check if there is exactly one non-empty /// buffer in a list input to write_vectored. /// -/// TODO: delete this function and replace it with slice::trim if that becomes +/// FIXME: delete this function and replace it with slice::trim if that becomes /// a things (https://github.com/rust-lang/rfcs/issues/2547) fn only_one(iter: I, filter: impl FnMut(&T) -> bool) -> Option where diff --git a/library/std/src/io/buffered/linewritershim.rs b/library/std/src/io/buffered/linewritershim.rs index f43e6dc596437..2c7819d1b04ed 100644 --- a/library/std/src/io/buffered/linewritershim.rs +++ b/library/std/src/io/buffered/linewritershim.rs @@ -170,7 +170,7 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// get the benefits of more granular partial-line handling without losing /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - // TODO: BufWriter recently received some optimized handling of + // FIXME: BufWriter recently received some optimized handling of // vectored writes; update this method to take advantage of those // updates. In particular, BufWriter::is_write_vectored is always true, // because BufWriter::write_vectored takes special care to buffer From 89dc1acf67337e939ace0cc973c82df4abc63f91 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 1 Nov 2020 17:40:08 -0500 Subject: [PATCH 12/15] Update comments in tests --- library/std/src/io/buffered/tests.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index 19e8a9fe2e11f..50717dbb136b9 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -9,11 +9,10 @@ pub struct ShortReader { lengths: Vec, } -// FIXME: rustfmt and tidy disagree about the correct formatting of this -// function. This leads to issues for users with editors configured to -// rustfmt-on-save. 
impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { + // Note for developers: if your editor is fighting you about the + // correct rustfmt here, make sure you're using nightly if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } } } From 5ef0d7a719800e0b8eecce189e833f26a2042259 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 28 Nov 2020 19:57:50 -0500 Subject: [PATCH 13/15] Cleanup & Optimize only_one function --- library/std/src/io/buffered/bufwriter.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 0b010c4d7cb71..99309ce394568 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -562,15 +562,15 @@ impl Drop for BufWriter { /// buffer in a list input to write_vectored. /// /// FIXME: delete this function and replace it with slice::trim if that becomes -/// a things (https://github.com/rust-lang/rfcs/issues/2547) -fn only_one(iter: I, filter: impl FnMut(&T) -> bool) -> Option +/// a thing (https://github.com/rust-lang/rfcs/issues/2547) +fn only_one(iter: I, filter: impl Fn(&I::Item) -> bool) -> Option where - I: IntoIterator, + I: IntoIterator, I::IntoIter: FusedIterator, { let mut iter = iter.into_iter().filter(filter); - match (iter.next(), iter.count()) { - (Some(item), 0) => Some(item), + match (iter.next(), iter.next()) { + (Some(item), None) => Some(item), _ => None, } } From 3ef40eade6f83e1831b3115348171401b17835df Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 28 Nov 2020 20:28:36 -0500 Subject: [PATCH 14/15] simplify tail! 
macro --- library/std/src/io/buffered/bufwriter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 99309ce394568..2fb89b3821f02 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -18,15 +18,15 @@ use crate::iter::FusedIterator; /// let tail = tail!(self.write_to_buffer(buf)); /// ``` macro_rules! tail { - ($this:ident $(. $write:ident)+ ($buf:expr)) => {{ + ($($write:ident).+ ($buf:expr)) => {{ let buf = $buf; - let written = $this $(. $write)+ (buf); + let written = $($write).+ (buf); &buf[written..] }}; - ($this:ident $(. $write:ident)+ ($buf:expr) ? ) => {{ + ($($write:ident).+ ($buf:expr) ? ) => {{ let buf = $buf; - let written = $this $(. $write)+ (buf)?; + let written = $($write).+ (buf)?; &buf[written..] }}; } From 6a73f0f4fc43a77d2fbc0ed5c224578a7b4a7811 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sun, 29 Nov 2020 14:02:37 -0500 Subject: [PATCH 15/15] Fix bug in BufWriter::write_vectored - Found and fixed a bug where write_vectored could, in some circumstances, forward a write directly to the inner writer (skipping the buffer) without first flushing the buffer. - Added a regression test for this bug. --- library/std/src/io/buffered/bufwriter.rs | 4 +++- library/std/src/io/buffered/tests.rs | 26 +++++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 2fb89b3821f02..e03d2eb8baf7a 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -399,6 +399,8 @@ impl Write for BufWriter { } else if written > 0 { 0 } else { + // It's guaranteed at this point that the buffer is empty, because + // if wasn't, it would have been flushed earlier in this function. self.get_mut().write(tail)? 
}; @@ -490,7 +492,7 @@ impl Write for BufWriter { self.flush_buf()?; } - if buf.len() >= self.available() { + if buf.len() >= self.capacity() && self.buf.is_empty() { // If an individual buffer exceeds our *total* capacity // and we haven't buffered anything yet, just forward // it to the underlying device diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index 50717dbb136b9..a39e425ace4e2 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -13,7 +13,11 @@ impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { // Note for developers: if your editor is fighting you about the // correct rustfmt here, make sure you're using nightly - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } @@ -403,6 +407,26 @@ fn test_buffered_writer_write_vectored() { assert_eq!(writer.get_ref().write_count, 0); } +// An old implementation of BufWriter had a bug when deciding whether to +// forward a write directly (skipping the buffer) where it would forward even +// if there was buffered data from a previous write. This is a regression test +// for that bug. +#[test] +fn test_buffered_writer_vectored_corner_case() -> io::Result<()> { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(10, inner); + + assert_eq!(writer.write(b"aaaa")?, 4); + + let input = [IoSlice::new(b"bbbbbb"), IoSlice::new(b"cccccc")]; + + writer.write_vectored(&input)?; + writer.flush()?; + assert_eq!(&writer.get_ref().buffer[..10], b"aaaabbbbbb"); + + Ok(()) +} + #[test] fn test_buffered_writer_inner_flushes() { let mut w = BufWriter::with_capacity(3, Vec::new());