Skip to content

Commit 97ae022

Browse files
authored
retry: Simplify ReplayBody::poll_data for readability (#1346)
`ReplayBody::poll_data includes some deeply nested logic that can be avoided with early returns. This restructures `ReplayBody::poll_data` so that the control flow is easier to follow.
1 parent bfc1e2c commit 97ae022

File tree

1 file changed

+48
-47
lines changed

1 file changed

+48
-47
lines changed

linkerd/http-retry/src/lib.rs

Lines changed: 48 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,21 @@ where
180180
buf.has_remaining = state.buf.has_remaining(),
181181
body.is_completed = state.is_completed,
182182
body.max_bytes_remaining = state.max_bytes,
183-
"Replay::poll_data"
183+
"ReplayBody::poll_data"
184184
);
185185

186186
// If we haven't replayed the buffer yet, and its not empty, return the
187187
// buffered data first.
188188
if this.replay_body {
189189
if state.buf.has_remaining() {
190-
tracing::trace!("replaying body");
190+
tracing::trace!("Replaying body");
191191
// Don't return the buffered data again on the next poll.
192192
this.replay_body = false;
193193
return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone()))));
194194
}
195195

196196
if state.is_capped() {
197-
tracing::trace!("cannot replay buffered body, maximum buffer length reached");
197+
tracing::trace!("Cannot replay buffered body, maximum buffer length reached");
198198
return Poll::Ready(Some(Err(Capped.into())));
199199
}
200200
}
@@ -209,50 +209,50 @@ where
209209
return Poll::Ready(None);
210210
}
211211

212-
// If there's more data in the initial body, poll that...
213-
if let Some(rest) = state.rest.as_mut() {
212+
// Poll the inner body for more data. If the body has ended, remember
213+
// that so that future clones will not try polling it again (as
214+
// described above).
215+
let mut data = {
216+
// Get access to the initial body. If we don't have access to the
217+
// inner body, there's no more work to do.
218+
let rest = match state.rest.as_mut() {
219+
Some(rest) => rest,
220+
None => return Poll::Ready(None),
221+
};
222+
214223
tracing::trace!("Polling initial body");
215-
let opt = futures::ready!(Pin::new(rest).poll_data(cx));
216-
217-
// If the body has ended, remember that so that future clones will
218-
// not try polling it again --- some `Body` types will panic if they
219-
// are polled after returning `None`.
220-
if opt.is_none() {
221-
tracing::trace!("Initial body completed");
222-
state.is_completed = true;
224+
match futures::ready!(Pin::new(rest).poll_data(cx)) {
225+
Some(Ok(data)) => data,
226+
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
227+
None => {
228+
tracing::trace!("Initial body completed");
229+
state.is_completed = true;
230+
return Poll::Ready(None);
231+
}
223232
}
224-
return Poll::Ready(opt.map(|ok| {
225-
ok.map(|mut data| {
226-
// If we have buffered the maximum number of bytes, allow
227-
// *this* body to continue, but don't buffer any more.
228-
let length = data.remaining();
229-
state.max_bytes = state.max_bytes.saturating_sub(length);
230-
if state.is_capped() {
231-
// If there's data in the buffer, discard it now, since
232-
// we won't allow any clones to have a complete body.
233-
if state.buf.has_remaining() {
234-
tracing::debug!(
235-
buf.size = state.buf.remaining(),
236-
"buffered maximum capacity, discarding buffer"
237-
);
238-
state.buf = Default::default();
239-
}
240-
return Data::Initial(data.copy_to_bytes(length));
241-
}
242-
243-
if state.is_capped() {
244-
return Data::Initial(data.copy_to_bytes(length));
245-
}
233+
};
246234

247-
// Buffer and return the bytes
248-
Data::Initial(state.buf.push_chunk(data))
249-
})
250-
.map_err(Into::into)
251-
}));
252-
}
235+
// If we have buffered the maximum number of bytes, allow *this* body to
236+
// continue, but don't buffer any more.
237+
let length = data.remaining();
238+
state.max_bytes = state.max_bytes.saturating_sub(length);
239+
let chunk = if state.is_capped() {
240+
// If there's data in the buffer, discard it now, since we won't
241+
// allow any clones to have a complete body.
242+
if state.buf.has_remaining() {
243+
tracing::debug!(
244+
buf.size = state.buf.remaining(),
245+
"Buffered maximum capacity, discarding buffer"
246+
);
247+
state.buf = Default::default();
248+
}
249+
data.copy_to_bytes(length)
250+
} else {
251+
// Buffer and return the bytes.
252+
state.buf.push_chunk(data)
253+
};
253254

254-
// Otherwise, guess we're done!
255-
Poll::Ready(None)
255+
Poll::Ready(Some(Ok(Data::Initial(chunk))))
256256
}
257257

258258
fn poll_trailers(
@@ -319,16 +319,17 @@ where
319319
None => return self.shared.orig_size_hint.clone(),
320320
};
321321

322-
// Otherwise, if we're holding the state but have dropped the inner body, the entire body is
323-
// buffered so we know the exact size hint.
322+
// Otherwise, if we're holding the state but have dropped the inner
323+
// body, the entire body is buffered so we know the exact size hint.
324324
let buffered = state.buf.remaining() as u64;
325325
let rest_hint = match state.rest.as_ref() {
326326
Some(rest) => rest.size_hint(),
327327
None => return SizeHint::with_exact(buffered),
328328
};
329329

330-
// Otherwise, add the inner body's size hint to the amount of buffered data. An upper limit
331-
// is only set if the inner body has an upper limit.
330+
// Otherwise, add the inner body's size hint to the amount of buffered
331+
// data. An upper limit is only set if the inner body has an upper
332+
// limit.
332333
let mut hint = SizeHint::default();
333334
hint.set_lower(buffered + rest_hint.lower());
334335
if let Some(rest_upper) = rest_hint.upper() {

0 commit comments

Comments
 (0)