diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index 90ba4e4563..8a6e2784b6 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -180,21 +180,21 @@ where buf.has_remaining = state.buf.has_remaining(), body.is_completed = state.is_completed, body.max_bytes_remaining = state.max_bytes, - "Replay::poll_data" + "ReplayBody::poll_data" ); // If we haven't replayed the buffer yet, and its not empty, return the // buffered data first. if this.replay_body { if state.buf.has_remaining() { - tracing::trace!("replaying body"); + tracing::trace!("Replaying body"); // Don't return the buffered data again on the next poll. this.replay_body = false; return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); } if state.is_capped() { - tracing::trace!("cannot replay buffered body, maximum buffer length reached"); + tracing::trace!("Cannot replay buffered body, maximum buffer length reached"); return Poll::Ready(Some(Err(Capped.into()))); } } @@ -209,50 +209,50 @@ where return Poll::Ready(None); } - // If there's more data in the initial body, poll that... - if let Some(rest) = state.rest.as_mut() { + // Poll the inner body for more data. If the body has ended, remember + // that so that future clones will not try polling it again (as + // described above). + let mut data = { + // Get access to the initial body. If we don't have access to the + // inner body, there's no more work to do. + let rest = match state.rest.as_mut() { + Some(rest) => rest, + None => return Poll::Ready(None), + }; + tracing::trace!("Polling initial body"); - let opt = futures::ready!(Pin::new(rest).poll_data(cx)); - - // If the body has ended, remember that so that future clones will - // not try polling it again --- some `Body` types will panic if they - // are polled after returning `None`. - if opt.is_none() { - tracing::trace!("Initial body completed"); - state.is_completed = true; + match futures::ready!(Pin::new(rest).poll_data(cx)) { + Some(Ok(data)) => data, + Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + None => { + tracing::trace!("Initial body completed"); + state.is_completed = true; + return Poll::Ready(None); + } } - return Poll::Ready(opt.map(|ok| { - ok.map(|mut data| { - // If we have buffered the maximum number of bytes, allow - // *this* body to continue, but don't buffer any more. - let length = data.remaining(); - state.max_bytes = state.max_bytes.saturating_sub(length); - if state.is_capped() { - // If there's data in the buffer, discard it now, since - // we won't allow any clones to have a complete body. - if state.buf.has_remaining() { - tracing::debug!( - buf.size = state.buf.remaining(), - "buffered maximum capacity, discarding buffer" - ); - state.buf = Default::default(); - } - return Data::Initial(data.copy_to_bytes(length)); - } - - if state.is_capped() { - return Data::Initial(data.copy_to_bytes(length)); - } + }; - // Buffer and return the bytes - Data::Initial(state.buf.push_chunk(data)) - }) - .map_err(Into::into) - })); - } + // If we have buffered the maximum number of bytes, allow *this* body to + // continue, but don't buffer any more. + let length = data.remaining(); + state.max_bytes = state.max_bytes.saturating_sub(length); + let chunk = if state.is_capped() { + // If there's data in the buffer, discard it now, since we won't + // allow any clones to have a complete body. + if state.buf.has_remaining() { + tracing::debug!( + buf.size = state.buf.remaining(), + "Buffered maximum capacity, discarding buffer" + ); + state.buf = Default::default(); + } + data.copy_to_bytes(length) + } else { + // Buffer and return the bytes. + state.buf.push_chunk(data) + }; - // Otherwise, guess we're done! - Poll::Ready(None) + Poll::Ready(Some(Ok(Data::Initial(chunk)))) } fn poll_trailers( @@ -319,16 +319,17 @@ where None => return self.shared.orig_size_hint.clone(), }; - // Otherwise, if we're holding the state but have dropped the inner body, the entire body is - // buffered so we know the exact size hint. + // Otherwise, if we're holding the state but have dropped the inner + // body, the entire body is buffered so we know the exact size hint. let buffered = state.buf.remaining() as u64; let rest_hint = match state.rest.as_ref() { Some(rest) => rest.size_hint(), None => return SizeHint::with_exact(buffered), }; - // Otherwise, add the inner body's size hint to the amount of buffered data. An upper limit - // is only set if the inner body has an upper limit. + // Otherwise, add the inner body's size hint to the amount of buffered + // data. An upper limit is only set if the inner body has an upper + // limit. let mut hint = SizeHint::default(); hint.set_lower(buffered + rest_hint.lower()); if let Some(rest_upper) = rest_hint.upper() {