Skip to content

Commit a19eb34

Browse files
messenseNemo157paolobarbolini
authored
Update to tokio 1.0, bytes 1.0 (#1076)
Co-authored-by: Wim Looman <[email protected]> Co-authored-by: Paolo Barbolini <[email protected]>
1 parent 5ee4fe5 commit a19eb34

File tree

16 files changed

+173
-219
lines changed

16 files changed

+173
-219
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ jobs:
191191

192192
strategy:
193193
matrix:
194-
rust: [1.39.0]
194+
rust: [1.45.2]
195195

196196
steps:
197197
- name: Checkout

Cargo.toml

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ default = ["default-tls"]
2828

2929
# Note: this doesn't enable the 'native-tls' feature, which adds specific
3030
# functionality for it.
31-
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-tls"]
31+
default-tls = ["hyper-tls", "native-tls-crate", "__tls", "tokio-native-tls"]
3232

3333
# Enables native-tls specific functionality not available by default.
3434
native-tls = ["default-tls"]
@@ -39,13 +39,13 @@ rustls-tls-manual-roots = ["__rustls"]
3939
rustls-tls-webpki-roots = ["webpki-roots", "__rustls"]
4040
rustls-tls-native-roots = ["rustls-native-certs", "__rustls"]
4141

42-
blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"]
42+
blocking = ["futures-util/io", "tokio/rt-multi-thread", "tokio/sync"]
4343

4444
cookies = ["cookie_crate", "cookie_store", "time"]
4545

46-
gzip = ["async-compression", "async-compression/gzip"]
46+
gzip = ["async-compression", "async-compression/gzip", "tokio-util"]
4747

48-
brotli = ["async-compression", "async-compression/brotli"]
48+
brotli = ["async-compression", "async-compression/brotli", "tokio-util"]
4949

5050
json = ["serde_json"]
5151

@@ -71,7 +71,7 @@ __internal_proxy_sys_no_cache = []
7171
[dependencies]
7272
http = "0.2"
7373
url = "2.2"
74-
bytes = "0.5"
74+
bytes = "1.0"
7575
serde = "1.0"
7676
serde_urlencoded = "0.7"
7777
mime_guess = "2.0"
@@ -83,53 +83,53 @@ base64 = "0.13"
8383
encoding_rs = "0.8"
8484
futures-core = { version = "0.3.0", default-features = false }
8585
futures-util = { version = "0.3.0", default-features = false }
86-
http-body = "0.3.0"
87-
hyper = { version = "0.13.4", default-features = false, features = ["tcp"] }
86+
http-body = "0.4.0"
87+
hyper = { version = "0.14", default-features = false, features = ["tcp", "http1", "http2", "client"] }
8888
lazy_static = "1.4"
8989
log = "0.4"
9090
mime = "0.3.7"
9191
percent-encoding = "2.1"
92-
tokio = { version = "0.2.5", default-features = false, features = ["tcp", "time"] }
92+
tokio = { version = "1.0", default-features = false, features = ["net", "time"] }
9393
pin-project-lite = "0.2.0"
9494
ipnet = "2.3"
9595

9696
# Optional deps...
9797

9898
## default-tls
99-
hyper-tls = { version = "0.4", optional = true }
99+
hyper-tls = { version = "0.5", optional = true }
100100
native-tls-crate = { version = "0.2", optional = true, package = "native-tls" }
101-
tokio-tls = { version = "0.3.0", optional = true }
101+
tokio-native-tls = { version = "0.3.0", optional = true }
102102

103103
# rustls-tls
104-
hyper-rustls = { version = "0.21", default-features = false, optional = true }
105-
rustls = { version = "0.18", features = ["dangerous_configuration"], optional = true }
106-
tokio-rustls = { version = "0.14", optional = true }
107-
webpki-roots = { version = "0.20", optional = true }
108-
rustls-native-certs = { version = "0.4", optional = true }
104+
hyper-rustls = { version = "0.22.1", default-features = false, optional = true }
105+
rustls = { version = "0.19", features = ["dangerous_configuration"], optional = true }
106+
tokio-rustls = { version = "0.22", optional = true }
107+
webpki-roots = { version = "0.21", optional = true }
108+
rustls-native-certs = { version = "0.5", optional = true }
109109

110110
## cookies
111111
cookie_crate = { version = "0.14", package = "cookie", optional = true }
112112
cookie_store = { version = "0.12", optional = true }
113113
time = { version = "0.2.11", optional = true }
114114

115115
## compression
116-
async-compression = { version = "0.3.0", default-features = false, features = ["stream"], optional = true }
117-
116+
async-compression = { version = "0.3.7", default-features = false, features = ["tokio"], optional = true }
117+
tokio-util = { version = "0.6.0", default-features = false, features = ["codec", "io"], optional = true }
118118

119119
## socks
120-
tokio-socks = { version = "0.3", optional = true }
120+
tokio-socks = { version = "0.5", optional = true }
121121

122122
## trust-dns
123-
trust-dns-resolver = { version = "0.19", optional = true }
123+
trust-dns-resolver = { version = "0.20", optional = true }
124124

125125
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
126-
env_logger = "0.7"
127-
hyper = { version = "0.13", default-features = false, features = ["tcp", "stream"] }
126+
env_logger = "0.8"
127+
hyper = { version = "0.14", default-features = false, features = ["tcp", "stream", "http1", "http2", "client", "server"] }
128128
serde = { version = "1.0", features = ["derive"] }
129129
libflate = "1.0"
130130
brotli_crate = { package = "brotli", version = "3.3.0" }
131131
doc-comment = "0.3"
132-
tokio = { version = "0.2.0", default-features = false, features = ["macros"] }
132+
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }
133133

134134
[target.'cfg(windows)'.dependencies]
135135
winreg = "0.7"

src/async_impl/body.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use bytes::Bytes;
77
use futures_core::Stream;
88
use http_body::Body as HttpBody;
99
use pin_project_lite::pin_project;
10-
use tokio::time::Delay;
10+
use tokio::time::Sleep;
1111

1212
/// An asynchronous request body.
1313
pub struct Body {
@@ -27,7 +27,7 @@ enum Inner {
2727
+ Sync,
2828
>,
2929
>,
30-
timeout: Option<Delay>,
30+
timeout: Option<Pin<Box<Sleep>>>,
3131
},
3232
}
3333

@@ -103,7 +103,7 @@ impl Body {
103103
}
104104
}
105105

106-
pub(crate) fn response(body: hyper::Body, timeout: Option<Delay>) -> Body {
106+
pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
107107
Body {
108108
inner: Inner::Streaming {
109109
body: Box::pin(WrapHyper(body)),
@@ -217,7 +217,7 @@ impl HttpBody for ImplStream {
217217
ref mut timeout,
218218
} => {
219219
if let Some(ref mut timeout) = timeout {
220-
if let Poll::Ready(()) = Pin::new(timeout).poll(cx) {
220+
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
221221
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
222222
}
223223
}

src/async_impl/client.rs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use rustls::RootCertStore;
2626
use std::future::Future;
2727
use std::pin::Pin;
2828
use std::task::{Context, Poll};
29-
use tokio::time::Delay;
29+
use tokio::time::Sleep;
3030
use pin_project_lite::pin_project;
3131

3232
use log::debug;
@@ -96,7 +96,6 @@ struct Config {
9696
#[cfg(feature = "__tls")]
9797
tls: TlsBackend,
9898
http2_only: bool,
99-
http1_writev: Option<bool>,
10099
http1_title_case_headers: bool,
101100
http2_initial_stream_window_size: Option<u32>,
102101
http2_initial_connection_window_size: Option<u32>,
@@ -151,7 +150,6 @@ impl ClientBuilder {
151150
#[cfg(feature = "__tls")]
152151
tls: TlsBackend::default(),
153152
http2_only: false,
154-
http1_writev: None,
155153
http1_title_case_headers: false,
156154
http2_initial_stream_window_size: None,
157155
http2_initial_connection_window_size: None,
@@ -316,10 +314,6 @@ impl ClientBuilder {
316314
builder.http2_only(true);
317315
}
318316

319-
if let Some(http1_writev) = config.http1_writev {
320-
builder.http1_writev(http1_writev);
321-
}
322-
323317
if let Some(http2_initial_stream_window_size) = config.http2_initial_stream_window_size {
324318
builder.http2_initial_stream_window_size(http2_initial_stream_window_size);
325319
}
@@ -655,14 +649,6 @@ impl ClientBuilder {
655649
self
656650
}
657651

658-
/// Force hyper to use either queued(if true), or flattened(if false) write strategy
659-
/// This may eliminate unnecessary cloning of buffers for some TLS backends
660-
/// By default hyper will try to guess which strategy to use
661-
pub fn http1_writev(mut self, writev: bool) -> ClientBuilder {
662-
self.config.http1_writev = Some(writev);
663-
self
664-
}
665-
666652
/// Only use HTTP/2.
667653
pub fn http2_prior_knowledge(mut self) -> ClientBuilder {
668654
self.config.http2_only = true;
@@ -1103,7 +1089,8 @@ impl Client {
11031089

11041090
let timeout = timeout
11051091
.or(self.inner.request_timeout)
1106-
.map(tokio::time::delay_for);
1092+
.map(tokio::time::sleep)
1093+
.map(Box::pin);
11071094

11081095
*req.headers_mut() = headers.clone();
11091096

@@ -1317,7 +1304,7 @@ pin_project! {
13171304
#[pin]
13181305
in_flight: ResponseFuture,
13191306
#[pin]
1320-
timeout: Option<Delay>,
1307+
timeout: Option<Pin<Box<Sleep>>>,
13211308
}
13221309
}
13231310

@@ -1326,7 +1313,7 @@ impl PendingRequest {
13261313
self.project().in_flight
13271314
}
13281315

1329-
fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Delay>> {
1316+
fn timeout(self: Pin<&mut Self>) -> Pin<&mut Option<Pin<Box<Sleep>>>> {
13301317
self.project().timeout
13311318
}
13321319

src/async_impl/decoder.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@ use std::pin::Pin;
44
use std::task::{Context, Poll};
55

66
#[cfg(feature = "gzip")]
7-
use async_compression::stream::GzipDecoder;
7+
use async_compression::tokio::bufread::GzipDecoder;
88

99
#[cfg(feature = "brotli")]
10-
use async_compression::stream::BrotliDecoder;
10+
use async_compression::tokio::bufread::BrotliDecoder;
1111

1212
use bytes::Bytes;
1313
use futures_core::Stream;
1414
use futures_util::stream::Peekable;
1515
use http::HeaderMap;
1616
use hyper::body::HttpBody;
1717

18+
#[cfg(any(feature = "gzip", feature = "brotli"))]
19+
use tokio_util::io::StreamReader;
20+
#[cfg(any(feature = "gzip", feature = "brotli"))]
21+
use tokio_util::codec::{BytesCodec, FramedRead};
22+
1823
use super::super::Body;
1924
use crate::error;
2025

@@ -39,11 +44,11 @@ enum Inner {
3944

4045
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
4146
#[cfg(feature = "gzip")]
42-
Gzip(GzipDecoder<Peekable<IoStream>>),
47+
Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
4348

4449
/// A `Brotli` decoder will uncompress the brotlied response content before returning it.
4550
#[cfg(feature = "brotli")]
46-
Brotli(BrotliDecoder<Peekable<IoStream>>),
51+
Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
4752

4853
/// A decoder that doesn't have a value yet.
4954
#[cfg(any(feature = "brotli", feature = "gzip"))]
@@ -229,15 +234,15 @@ impl Stream for Decoder {
229234
#[cfg(feature = "gzip")]
230235
Inner::Gzip(ref mut decoder) => {
231236
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
232-
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
237+
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
233238
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
234239
None => Poll::Ready(None),
235240
};
236241
}
237242
#[cfg(feature = "brotli")]
238243
Inner::Brotli(ref mut decoder) => {
239244
return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
240-
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
245+
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
241246
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
242247
None => Poll::Ready(None),
243248
};
@@ -302,9 +307,9 @@ impl Future for Pending {
302307

303308
match self.1 {
304309
#[cfg(feature = "brotli")]
305-
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))),
310+
DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new(BrotliDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
306311
#[cfg(feature = "gzip")]
307-
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))),
312+
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new(GzipDecoder::new(StreamReader::new(_body)), BytesCodec::new())))),
308313
}
309314
}
310315
}

src/async_impl/multipart.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -521,11 +521,7 @@ mod tests {
521521
fn form_empty() {
522522
let form = Form::new();
523523

524-
let mut rt = runtime::Builder::new()
525-
.basic_scheduler()
526-
.enable_all()
527-
.build()
528-
.expect("new rt");
524+
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
529525
let body = form.stream().into_stream();
530526
let s = body.map_ok(|try_c| try_c.to_vec()).try_concat();
531527

@@ -572,11 +568,7 @@ mod tests {
572568
--boundary\r\n\
573569
Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\
574570
value3\r\n--boundary--\r\n";
575-
let mut rt = runtime::Builder::new()
576-
.basic_scheduler()
577-
.enable_all()
578-
.build()
579-
.expect("new rt");
571+
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
580572
let body = form.stream().into_stream();
581573
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();
582574

@@ -603,11 +595,7 @@ mod tests {
603595
\r\n\
604596
value2\r\n\
605597
--boundary--\r\n";
606-
let mut rt = runtime::Builder::new()
607-
.basic_scheduler()
608-
.enable_all()
609-
.build()
610-
.expect("new rt");
598+
let rt = runtime::Builder::new_current_thread().enable_all().build().expect("new rt");
611599
let body = form.stream().into_stream();
612600
let s = body.map(|try_c| try_c.map(|r| r.to_vec())).try_concat();
613601

src/async_impl/response.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::borrow::Cow;
22
use std::fmt;
33
use std::net::SocketAddr;
4+
use std::pin::Pin;
45

56
use bytes::Bytes;
67
use encoding_rs::{Encoding, UTF_8};
@@ -12,7 +13,7 @@ use mime::Mime;
1213
use serde::de::DeserializeOwned;
1314
#[cfg(feature = "json")]
1415
use serde_json;
15-
use tokio::time::Delay;
16+
use tokio::time::Sleep;
1617
use url::Url;
1718

1819
use super::body::Body;
@@ -37,7 +38,7 @@ impl Response {
3738
res: hyper::Response<hyper::Body>,
3839
url: Url,
3940
accepts: Accepts,
40-
timeout: Option<Delay>,
41+
timeout: Option<Pin<Box<Sleep>>>,
4142
) -> Response {
4243
let (parts, body) = res.into_parts();
4344
let status = parts.status;

src/blocking/body.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ use std::fmt;
22
use std::fs::File;
33
use std::future::Future;
44
use std::io::{self, Cursor, Read};
5-
use std::mem::{self, MaybeUninit};
5+
use std::mem;
66
use std::ptr;
77

88
use bytes::Bytes;
9+
use bytes::buf::UninitSlice;
910

1011
use crate::async_impl;
1112

@@ -289,14 +290,14 @@ async fn send_future(sender: Sender) -> Result<(), crate::Error> {
289290
if buf.remaining_mut() == 0 {
290291
buf.reserve(8192);
291292
// zero out the reserved memory
293+
let uninit = buf.chunk_mut();
292294
unsafe {
293-
let uninit = mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut());
294295
ptr::write_bytes(uninit.as_mut_ptr(), 0, uninit.len());
295296
}
296297
}
297298

298299
let bytes = unsafe {
299-
mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.bytes_mut())
300+
mem::transmute::<&mut UninitSlice, &mut [u8]>(buf.chunk_mut())
300301
};
301302
match body.read(bytes) {
302303
Ok(0) => {

0 commit comments

Comments
 (0)