Skip to content

Commit 7c12a2c

Browse files
committed
fix(client): ensure idle connection is pooled before response body finishes
1 parent 06405c2 commit 7c12a2c

File tree

4 files changed

+104
-17
lines changed

4 files changed

+104
-17
lines changed

src/client/conn.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ impl<B> SendRequest<B>
128128
self.dispatch.poll_ready(cx)
129129
}
130130

131+
pub(super) fn is_ready(&self) -> bool {
132+
self.dispatch.is_ready()
133+
}
134+
131135
pub(super) fn is_closed(&self) -> bool {
132136
self.dispatch.is_closed()
133137
}

src/client/dispatch.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ impl<T, U> Sender<T, U> {
4545
}
4646
}
4747

48+
pub fn is_ready(&self) -> bool {
49+
self.giver.is_wanting()
50+
}
51+
4852
pub fn is_closed(&self) -> bool {
4953
self.giver.is_canceled()
5054
}

src/client/mod.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use std::time::Duration;
88

99
use futures::{Async, Future, FutureExt, Never, Poll};
10+
use futures::channel::oneshot;
1011
use futures::future;
1112
use futures::task;
1213
use http::{Method, Request, Response, Uri, Version};
@@ -235,7 +236,7 @@ where C: Connect<Error=io::Error> + Sync + 'static,
235236
ClientError::Normal(err)
236237
}
237238
})
238-
.and_then(move |res| {
239+
.and_then(move |mut res| {
239240
future::lazy(move |cx| {
240241
// when pooled is dropped, it will try to insert back into the
241242
// pool. To delay that, spawn a future that completes once the
@@ -245,14 +246,24 @@ where C: Connect<Error=io::Error> + Sync + 'static,
245246
// for a new request to start.
246247
//
247248
// It won't be ready if there is a body to stream.
248-
if let Ok(Async::Pending) = pooled.tx.poll_ready(cx) {
249+
if pooled.tx.is_ready() {
250+
drop(pooled);
251+
} else if !res.body().is_empty() {
252+
let (delayed_tx, delayed_rx) = oneshot::channel();
253+
res.body_mut().delayed_eof(delayed_rx);
249254
// If the executor doesn't have room, oh well. Things will likely
250255
// be blowing up soon, but this specific task isn't required.
251-
execute(future::poll_fn(move |cx| {
252-
pooled.tx.poll_ready(cx).or(Ok(Async::Ready(())))
253-
}), cx).ok();
256+
let fut = future::poll_fn(move |cx| {
257+
pooled.tx.poll_ready(cx)
258+
})
259+
.then(move |_| {
260+
// At this point, `pooled` is dropped, and had a chance
261+
// to insert into the pool (if conn was idle)
262+
drop(delayed_tx);
263+
Ok(())
264+
});
265+
execute(fut, cx).ok();
254266
}
255-
256267
Ok(res)
257268
})
258269
});

src/proto/body.rs

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::borrow::Cow;
33
use std::fmt;
44

55
use bytes::Bytes;
6-
use futures::{Async, Future, Poll, Stream, StreamExt};
6+
use futures::{Async, Future, Never, Poll, Stream, StreamExt};
77
use futures::task;
88
use futures::channel::{mpsc, oneshot};
99
use http::HeaderMap;
@@ -125,6 +125,15 @@ impl<E: Entity> Stream for EntityStream<E> {
125125
#[must_use = "streams do nothing unless polled"]
126126
pub struct Body {
127127
kind: Kind,
128+
/// Allow the client to pass a future to delay the `Body` from returning
129+
/// EOF. This allows the `Client` to try to put the idle connection
130+
/// back into the pool before the body is "finished".
131+
///
132+
/// The reason for this is so that creating a new request after finishing
133+
/// streaming the body of a response could sometimes result in creating
134+
/// a brand new connection, since the pool didn't know about the idle
135+
/// connection yet.
136+
delayed_eof: Option<DelayEof>,
128137
}
129138

130139
enum Kind {
@@ -137,6 +146,17 @@ enum Kind {
137146
Empty,
138147
}
139148

149+
type DelayEofUntil = oneshot::Receiver<Never>;
150+
151+
enum DelayEof {
152+
/// Initial state, stream hasn't seen EOF yet.
153+
NotEof(DelayEofUntil),
154+
/// Transitions to this state once we've seen `poll` try to
155+
/// return EOF (`None`). This future is then polled, and
156+
/// when it completes, the Body finally returns EOF (`None`).
157+
Eof(DelayEofUntil),
158+
}
159+
140160
/// A sender half used with `Body::channel()`.
141161
#[derive(Debug)]
142162
pub struct Sender {
@@ -253,22 +273,53 @@ impl Body {
253273
fn new(kind: Kind) -> Body {
254274
Body {
255275
kind: kind,
276+
delayed_eof: None,
256277
}
257278
}
258-
}
259279

260-
impl Default for Body {
261-
#[inline]
262-
fn default() -> Body {
263-
Body::empty()
280+
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
281+
self.delayed_eof = Some(DelayEof::NotEof(fut));
264282
}
265-
}
266283

267-
impl Entity for Body {
268-
type Data = Chunk;
269-
type Error = ::Error;
284+
fn poll_eof(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> {
285+
match self.delayed_eof.take() {
286+
Some(DelayEof::NotEof(mut delay)) => {
287+
match self.poll_inner(cx) {
288+
ok @ Ok(Async::Ready(Some(..))) |
289+
ok @ Ok(Async::Pending) => {
290+
self.delayed_eof = Some(DelayEof::NotEof(delay));
291+
ok
292+
},
293+
Ok(Async::Ready(None)) => match delay.poll(cx) {
294+
Ok(Async::Ready(never)) => match never {},
295+
Ok(Async::Pending) => {
296+
self.delayed_eof = Some(DelayEof::Eof(delay));
297+
Ok(Async::Pending)
298+
},
299+
Err(_done) => {
300+
Ok(Async::Ready(None))
301+
},
302+
},
303+
Err(e) => Err(e),
304+
}
305+
},
306+
Some(DelayEof::Eof(mut delay)) => {
307+
match delay.poll(cx) {
308+
Ok(Async::Ready(never)) => match never {},
309+
Ok(Async::Pending) => {
310+
self.delayed_eof = Some(DelayEof::Eof(delay));
311+
Ok(Async::Pending)
312+
},
313+
Err(_done) => {
314+
Ok(Async::Ready(None))
315+
},
316+
}
317+
},
318+
None => self.poll_inner(cx),
319+
}
320+
}
270321

271-
fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> {
322+
fn poll_inner(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> {
272323
match self.kind {
273324
Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") {
274325
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
@@ -281,6 +332,22 @@ impl Entity for Body {
281332
Kind::Empty => Ok(Async::Ready(None)),
282333
}
283334
}
335+
}
336+
337+
impl Default for Body {
338+
#[inline]
339+
fn default() -> Body {
340+
Body::empty()
341+
}
342+
}
343+
344+
impl Entity for Body {
345+
type Data = Chunk;
346+
type Error = ::Error;
347+
348+
fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> {
349+
self.poll_eof(cx)
350+
}
284351

285352
fn is_end_stream(&self) -> bool {
286353
match self.kind {
@@ -300,6 +367,7 @@ impl Entity for Body {
300367
Kind::Empty => Some(0)
301368
}
302369
}
370+
303371
}
304372

305373
impl fmt::Debug for Body {

0 commit comments

Comments
 (0)