diff --git a/Cargo.toml b/Cargo.toml index bbe5a00b1..9b64f93d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,6 @@ members = [ "tests/extern_path/my_application", "tests/integration_tests" ] + +[patch.crates-io] +hyper = { git = "https://github.com/hawkw/hyper", branch = "eliza/bump-tokio" } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 6edecbed6..dbdcd268b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -140,11 +140,11 @@ path = "src/hyper_warp_multiplex/server.rs" [dependencies] tonic = { path = "../tonic", features = ["tls"] } -prost = "0.6" -tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } +prost = { git = "https://github.com/danburkert/prost" } +tokio = { version = "0.3", features = ["rt-multi-thread", "time", "stream", "fs", "macros", "net"] } futures = { version = "0.3", default-features = false, features = ["alloc"] } async-stream = "0.2" -tower = "0.3" +tower = { git = "https://github.com/tower-rs/tower", version = "0.4" } # Required for routeguide serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -155,16 +155,16 @@ tracing-subscriber = { version = "0.2", features = ["tracing-log"] } tracing-attributes = "0.1" tracing-futures = "0.2" # Required for wellknown types -prost-types = "0.6" +prost-types = { git = "https://github.com/danburkert/prost" } # Hyper example -hyper = "0.13" +hyper = { git = "https://github.com/hyperium/hyper", branch = "master" } warp = { version = "0.2", default-features = false } http = "0.2" -http-body = "0.3" +http-body = { git = "https://github.com/hyperium/http-body", branch = "master" } pin-project = "0.4.17" # Health example tonic-health = { path = "../tonic-health" } listenfd = "0.3" [build-dependencies] -tonic-build = { path = "../tonic-build", features = ["prost"] } +tonic-build = { path = "../tonic-build", features = ["prost"] } \ No newline at end of file diff --git a/examples/src/autoreload/server.rs b/examples/src/autoreload/server.rs index a4b713a57..b916fa5d3 100644 --- a/examples/src/autoreload/server.rs +++ b/examples/src/autoreload/server.rs @@ -36,9 +36,9 @@ async fn main() -> Result<(), Box> { match listenfd::ListenFd::from_env().take_tcp_listener(0)? { Some(listener) => { - let mut listener = tokio::net::TcpListener::from_std(listener)?; + let listener = tokio::net::TcpListener::from_std(listener)?; - server.serve_with_incoming(listener.incoming()).await?; + server.serve_with_incoming(listener).await?; } None => { server.serve(addr).await?; diff --git a/examples/src/blocking/client.rs b/examples/src/blocking/client.rs index 5326e1925..3602d83d4 100644 --- a/examples/src/blocking/client.rs +++ b/examples/src/blocking/client.rs @@ -24,11 +24,7 @@ impl BlockingClient { D: std::convert::TryInto, D::Error: Into, { - let mut rt = Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_all().build().unwrap(); let client = rt.block_on(GreeterClient::connect(dst))?; Ok(Self { rt, client }) diff --git a/examples/src/dynamic_load_balance/client.rs b/examples/src/dynamic_load_balance/client.rs index 4ffa6a66d..4c8a547e7 100644 --- a/examples/src/dynamic_load_balance/client.rs +++ b/examples/src/dynamic_load_balance/client.rs @@ -18,42 +18,42 @@ async fn main() -> Result<(), Box> { let e1 = Endpoint::from_static("http://[::1]:50051"); let e2 = Endpoint::from_static("http://[::1]:50052"); - let (channel, mut rx) = Channel::balance_channel(10); + let (channel, rx) = Channel::balance_channel(10); let mut client = EchoClient::new(channel); let done = Arc::new(AtomicBool::new(false)); let demo_done = done.clone(); tokio::spawn(async move { - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Added first endpoint"); let change = Change::Insert("1", e1); let res = rx.send(change).await; println!("{:?}", res); - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Added second endpoint"); let change = Change::Insert("2", e2); let res = rx.send(change).await; println!("{:?}", res); - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Removed first endpoint"); let change = Change::Remove("1"); let res = rx.send(change).await; println!("{:?}", res); - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Removed second endpoint"); let change = Change::Remove("2"); let res = rx.send(change).await; println!("{:?}", res); - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Added third endpoint"); let e3 = Endpoint::from_static("http://[::1]:50051"); let change = Change::Insert("3", e3); let res = rx.send(change).await; println!("{:?}", res); - tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Removed third endpoint"); let change = Change::Remove("3"); let res = rx.send(change).await; @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box> { }); while !done.load(SeqCst) { - tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; let request = tonic::Request::new(EchoRequest { message: "hello".into(), }); diff --git a/examples/src/health/server.rs b/examples/src/health/server.rs index 625272dd6..e77858061 100644 --- a/examples/src/health/server.rs +++ b/examples/src/health/server.rs @@ -3,7 +3,7 @@ use tonic::{transport::Server, Request, Response, Status}; use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use std::time::Duration; -use tokio::time::delay_for; +use tokio::time::sleep; use tonic_health::server::HealthReporter; pub mod hello_world { @@ -34,7 +34,7 @@ async fn twiddle_service_status(mut reporter: HealthReporter) { let mut iter = 0u64; loop { iter += 1; - delay_for(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; if iter % 2 == 0 { reporter.set_serving::>().await; diff --git a/examples/src/hyper_warp/server.rs b/examples/src/hyper_warp/server.rs index a49da382f..f43da77e6 100644 --- a/examples/src/hyper_warp/server.rs +++ b/examples/src/hyper_warp/server.rs @@ -3,6 +3,9 @@ //! To hit the warp server you can run this command: //! `curl localhost:50051/hello` +// TODO(eliza): remove when this works again +#![allow(unused_imports, dead_code)] + use futures::future::{self, Either, TryFutureExt}; use http::version::Version; use hyper::{service::make_service_fn, Server}; @@ -42,37 +45,38 @@ impl Greeter for MyGreeter { #[tokio::main] async fn main() -> Result<(), Box> { - let addr = "[::1]:50051".parse().unwrap(); - let greeter = MyGreeter::default(); - - println!("GreeterServer listening on {}", addr); - - let tonic = GreeterServer::new(greeter); - let mut warp = warp::service(warp::path("hello").map(|| "hello, world!")); - - Server::bind(&addr) - .serve(make_service_fn(move |_| { - let mut tonic = tonic.clone(); - future::ok::<_, Infallible>(tower::service_fn( - move |req: hyper::Request| match req.version() { - Version::HTTP_11 | Version::HTTP_10 => Either::Left( - warp.call(req) - .map_ok(|res| res.map(EitherBody::Left)) - .map_err(Error::from), - ), - Version::HTTP_2 => Either::Right( - tonic - .call(req) - .map_ok(|res| res.map(EitherBody::Right)) - .map_err(Error::from), - ), - _ => unimplemented!(), - }, - )) - })) - .await?; - - Ok(()) + todo!("bring back this example when Warp has been updated to use the latest Hyper"); + // let addr = "[::1]:50051".parse().unwrap(); + // let greeter = MyGreeter::default(); + + // println!("GreeterServer listening on {}", addr); + + // let tonic = GreeterServer::new(greeter); + // let mut warp = warp::service(warp::path("hello").map(|| "hello, world!")); + + // Server::bind(&addr) + // .serve(make_service_fn(move |_| { + // let mut tonic = tonic.clone(); + // future::ok::<_, Infallible>(tower::service_fn( + // move |req: hyper::Request| match req.version() { + // Version::HTTP_11 | Version::HTTP_10 => Either::Left( + // warp.call(req) + // .map_ok(|res| res.map(EitherBody::Left)) + // .map_err(Error::from), + // ), + // Version::HTTP_2 => Either::Right( + // tonic + // .call(req) + // .map_ok(|res| res.map(EitherBody::Right)) + // .map_err(Error::from), + // ), + // _ => unimplemented!(), + // }, + // )) + // })) + // .await?; + + // Ok(())` } enum EitherBody { diff --git a/examples/src/hyper_warp_multiplex/server.rs b/examples/src/hyper_warp_multiplex/server.rs index 1e251c4a1..d1d467756 100644 --- a/examples/src/hyper_warp_multiplex/server.rs +++ b/examples/src/hyper_warp_multiplex/server.rs @@ -3,6 +3,9 @@ //! To hit the warp server you can run this command: //! `curl localhost:50051/hello` +// TODO(eliza): remove when this works again +#![allow(unused_imports, dead_code)] + use futures::future::{self, Either, TryFutureExt}; use futures::Stream; use http::version::Version; @@ -94,40 +97,41 @@ impl Echo for MyEcho { #[tokio::main] async fn main() -> Result<(), Box> { - let addr = "[::1]:50051".parse().unwrap(); - - let mut warp = warp::service(warp::path("hello").map(|| "hello, world!")); - - Server::bind(&addr) - .serve(make_service_fn(move |_| { - let greeter = GreeterServer::new(MyGreeter::default()); - let echo = EchoServer::new(MyEcho::default()); - - let mut tonic = TonicServer::builder() - .add_service(greeter) - .add_service(echo) - .into_service(); - - future::ok::<_, Infallible>(tower::service_fn( - move |req: hyper::Request| match req.version() { - Version::HTTP_11 | Version::HTTP_10 => Either::Left( - warp.call(req) - .map_ok(|res| res.map(EitherBody::Left)) - .map_err(Error::from), - ), - Version::HTTP_2 => Either::Right( - tonic - .call(req) - .map_ok(|res| res.map(EitherBody::Right)) - .map_err(Error::from), - ), - _ => unimplemented!(), - }, - )) - })) - .await?; - - Ok(()) + todo!("bring this example back when Warp has been updated to the latest Hyper"); + // let addr = "[::1]:50051".parse().unwrap(); + + // let mut warp = warp::service(warp::path("hello").map(|| "hello, world!")); + + // Server::bind(&addr) + // .serve(make_service_fn(move |_| { + // let greeter = GreeterServer::new(MyGreeter::default()); + // let echo = EchoServer::new(MyEcho::default()); + + // let mut tonic = TonicServer::builder() + // .add_service(greeter) + // .add_service(echo) + // .into_service(); + + // future::ok::<_, Infallible>(tower::service_fn( + // move |req: hyper::Request| match req.version() { + // Version::HTTP_11 | Version::HTTP_10 => Either::Left( + // warp.call(req) + // .map_ok(|res| res.map(EitherBody::Left)) + // .map_err(Error::from), + // ), + // Version::HTTP_2 => Either::Right( + // tonic + // .call(req) + // .map_ok(|res| res.map(EitherBody::Right)) + // .map_err(Error::from), + // ), + // _ => unimplemented!(), + // }, + // )) + // })) + // .await?; + + // Ok(()) } enum EitherBody { diff --git a/examples/src/routeguide/server.rs b/examples/src/routeguide/server.rs index 2f1c50e10..113dce9e9 100644 --- a/examples/src/routeguide/server.rs +++ b/examples/src/routeguide/server.rs @@ -44,7 +44,7 @@ impl RouteGuide for RouteGuideService { ) -> Result, Status> { println!("ListFeatures = {:?}", request); - let (mut tx, rx) = mpsc::channel(4); + let (tx, rx) = mpsc::channel(4); let features = self.features.clone(); tokio::spawn(async move { diff --git a/examples/src/uds/server.rs b/examples/src/uds/server.rs index b744d723f..62ef20327 100644 --- a/examples/src/uds/server.rs +++ b/examples/src/uds/server.rs @@ -40,13 +40,13 @@ async fn main() -> Result<(), Box> { tokio::fs::create_dir_all(Path::new(path).parent().unwrap()).await?; - let mut uds = UnixListener::bind(path)?; + let uds = UnixListener::bind(path)?; let greeter = MyGreeter::default(); Server::builder() .add_service(GreeterServer::new(greeter)) - .serve_with_incoming(uds.incoming().map_ok(unix::UnixStream)) + .serve_with_incoming(uds.map_ok(unix::UnixStream)) .await?; Ok(()) @@ -59,7 +59,7 @@ mod unix { task::{Context, Poll}, }; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::transport::server::Connected; #[derive(Debug)] @@ -71,8 +71,8 @@ mod unix { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } diff --git a/interop/Cargo.toml b/interop/Cargo.toml index 58dc3b641..dc7b2737e 100644 --- a/interop/Cargo.toml +++ b/interop/Cargo.toml @@ -15,18 +15,18 @@ name = "server" path = "src/bin/server.rs" [dependencies] -tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "fs"] } +tokio = { version = "0.3", features = ["rt-multi-thread", "time", "macros", "stream", "fs"] } tonic = { path = "../tonic", features = ["tls"] } -prost = "0.6" -prost-derive = "0.6" -bytes = "0.5" +prost = { git = "https://github.com/danburkert/prost" } +prost-derive = { git = "https://github.com/danburkert/prost" } +bytes = "0.6" http = "0.2" futures-core = "0.3" futures-util = "0.3" async-stream = "0.2" -tower = "0.3" -http-body = "0.3" -hyper = "0.13" +tower = { git = "https://github.com/tower-rs/tower", version = "0.4" } +http-body = { git = "https://github.com/hyperium/http-body", branch = "master" } +hyper = { git = "https://github.com/hyperium/hyper", branch = "master", features = ["stream"] } console = "0.9" structopt = "0.3" tracing = "0.1" diff --git a/interop/src/server.rs b/interop/src/server.rs index 154ed507e..7a0ccf8e5 100644 --- a/interop/src/server.rs +++ b/interop/src/server.rs @@ -2,7 +2,7 @@ use crate::pb::{self, *}; use async_stream::try_stream; use futures_util::{stream, StreamExt, TryStreamExt}; use http::header::{HeaderMap, HeaderName, HeaderValue}; -use http_body::Body; +use hyper::body::HttpBody; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -72,7 +72,7 @@ impl pb::test_service_server::TestService for TestService { let stream = try_stream! { for param in response_parameters { - tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await; + tokio::time::sleep(Duration::from_micros(param.interval_us as u64)).await; let payload = crate::server_payload(param.size as usize); yield StreamingOutputCallResponse { payload: Some(payload) }; @@ -127,7 +127,7 @@ impl pb::test_service_server::TestService for TestService { } for param in msg.response_parameters { - tokio::time::delay_for(Duration::from_micros(param.interval_us as u64)).await; + tokio::time::sleep(Duration::from_micros(param.interval_us as u64)).await; let payload = crate::server_payload(param.size as usize); yield StreamingOutputCallResponse { payload: Some(payload) }; @@ -235,7 +235,7 @@ impl MergeTrailers { } } -impl Body for MergeTrailers { +impl HttpBody for MergeTrailers { type Data = B::Data; type Error = B::Error; diff --git a/tests/ambiguous_methods/Cargo.toml b/tests/ambiguous_methods/Cargo.toml index ecfb3778b..d4de254ec 100644 --- a/tests/ambiguous_methods/Cargo.toml +++ b/tests/ambiguous_methods/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" [dependencies] tonic = { path= "../../tonic" } -prost = "0.6" +prost = { git = "https://github.com/danburkert/prost" } [build-dependencies] tonic-build = { path= "../../tonic-build" } diff --git a/tests/extern_path/my_application/Cargo.toml b/tests/extern_path/my_application/Cargo.toml index 801a82919..06fa09450 100644 --- a/tests/extern_path/my_application/Cargo.toml +++ b/tests/extern_path/my_application/Cargo.toml @@ -10,8 +10,8 @@ license = "MIT" [dependencies] tonic = { path= "../../../tonic" } -prost = "0.6" -prost-types = "0.6" +prost = { git = "https://github.com/danburkert/prost" } +prost-types = { git = "https://github.com/danburkert/prost" } uuid = { package = "uuid1", path= "../uuid" } [build-dependencies] diff --git a/tests/extern_path/uuid/Cargo.toml b/tests/extern_path/uuid/Cargo.toml index 2fdfad3c1..56fd7e199 100644 --- a/tests/extern_path/uuid/Cargo.toml +++ b/tests/extern_path/uuid/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -prost = "0.6" -bytes = "0.5" +prost = { git = "https://github.com/danburkert/prost" } +bytes = "0.6" [build-dependencies] -prost-build = "0.6" +prost-build = { git = "https://github.com/danburkert/prost" } diff --git a/tests/included_service/Cargo.toml b/tests/included_service/Cargo.toml index f51839de3..afe11a160 100644 --- a/tests/included_service/Cargo.toml +++ b/tests/included_service/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" [dependencies] tonic = { path = "../../tonic" } -prost = "0.6" +prost = { git = "https://github.com/danburkert/prost" } [build-dependencies] tonic-build = { path = "../../tonic-build" } diff --git a/tests/integration_tests/Cargo.toml b/tests/integration_tests/Cargo.toml index fbca00664..ac1c02db1 100644 --- a/tests/integration_tests/Cargo.toml +++ b/tests/integration_tests/Cargo.toml @@ -10,12 +10,12 @@ license = "MIT" [dependencies] tonic = { path = "../../tonic" } -prost = "0.6" +prost = { git = "https://github.com/danburkert/prost" } futures-util = "0.3" -bytes = "0.5" +bytes = "0.6" [dev-dependencies] -tokio = { version = "0.2", features = ["macros", "rt-core", "tcp"] } +tokio = { version = "0.3", features = ["macros", "rt", "net"] } [build-dependencies] tonic-build = { path = "../../tonic-build" } diff --git a/tests/integration_tests/tests/connection.rs b/tests/integration_tests/tests/connection.rs index 4ffc2c3b2..c345a35d9 100644 --- a/tests/integration_tests/tests/connection.rs +++ b/tests/integration_tests/tests/connection.rs @@ -41,14 +41,14 @@ async fn connect_returns_err_via_call_after_connected() { .unwrap(); }); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let mut client = TestClient::connect("http://127.0.0.1:1338").await.unwrap(); // First call should pass, then shutdown the server client.unary_call(Request::new(Input {})).await.unwrap(); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let res = client.unary_call(Request::new(Input {})).await; @@ -81,11 +81,11 @@ async fn connect_lazy_reconnects_after_first_failure() { .unwrap(); }); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; client.unary_call(Request::new(Input {})).await.unwrap(); // The server shut down, third call should fail - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; client.unary_call(Request::new(Input {})).await.unwrap_err(); jh.await.unwrap(); diff --git a/tests/integration_tests/tests/status.rs b/tests/integration_tests/tests/status.rs index d3e69f13e..6725784d1 100644 --- a/tests/integration_tests/tests/status.rs +++ b/tests/integration_tests/tests/status.rs @@ -33,7 +33,7 @@ async fn status_with_details() { .unwrap(); }); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let mut channel = test_client::TestClient::connect("http://127.0.0.1:1337") .await @@ -87,7 +87,7 @@ async fn status_with_metadata() { .unwrap(); }); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let mut channel = test_client::TestClient::connect("http://127.0.0.1:1338") .await diff --git a/tests/integration_tests/tests/user_agent.rs b/tests/integration_tests/tests/user_agent.rs index 12b7b3f66..2d6406df8 100644 --- a/tests/integration_tests/tests/user_agent.rs +++ b/tests/integration_tests/tests/user_agent.rs @@ -33,7 +33,7 @@ async fn writes_user_agent_header() { .unwrap(); }); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let channel = Endpoint::from_static("http://127.0.0.1:1322") .user_agent("my-client") diff --git a/tests/same_name/Cargo.toml b/tests/same_name/Cargo.toml index d166ebc61..41910d3f3 100644 --- a/tests/same_name/Cargo.toml +++ b/tests/same_name/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" [dependencies] tonic = { path = "../../tonic" } -prost = "0.6" +prost = { git = "https://github.com/danburkert/prost" } [build-dependencies] tonic-build = { path = "../../tonic-build" } diff --git a/tests/wellknown/Cargo.toml b/tests/wellknown/Cargo.toml index aab66b99d..3c5ecc20c 100644 --- a/tests/wellknown/Cargo.toml +++ b/tests/wellknown/Cargo.toml @@ -10,8 +10,8 @@ license = "MIT" [dependencies] tonic = { path = "../../tonic" } -prost = "0.6" -prost-types = "0.6" +prost = { git = "https://github.com/danburkert/prost" } +prost-types = { git = "https://github.com/danburkert/prost" } [build-dependencies] tonic-build = { path = "../../tonic-build" } diff --git a/tonic-build/Cargo.toml b/tonic-build/Cargo.toml index 8babc0ee1..3acdc5096 100644 --- a/tonic-build/Cargo.toml +++ b/tonic-build/Cargo.toml @@ -16,7 +16,7 @@ keywords = ["rpc", "grpc", "async", "codegen", "protobuf"] [dependencies] -prost-build = { version = "0.6", optional = true } +prost-build = { git = "https://github.com/danburkert/prost", optional = true } syn = "1.0" quote = "1.0" proc-macro2 = "1.0" diff --git a/tonic-health/Cargo.toml b/tonic-health/Cargo.toml index a27306c78..0625c1c72 100644 --- a/tonic-health/Cargo.toml +++ b/tonic-health/Cargo.toml @@ -19,13 +19,13 @@ transport = ["tonic/transport"] [dependencies] async-stream = "0.2" -tokio = { version = "0.2", features = ["sync", "stream"] } +tokio = { version = "0.3", features = ["sync", "stream"] } tonic = { version = "0.3", path = "../tonic", features = ["codegen", "prost"] } -bytes = "0.5" -prost = "0.6" +bytes = "0.6" +prost = { git = "https://github.com/danburkert/prost" } [dev-dependencies] -tokio = { version = "0.2", features = ["rt-core", "macros"]} +tokio = { version = "0.3", features = ["rt", "macros"]} [build-dependencies] tonic-build = { version = "0.3", path = "../tonic-build" } \ No newline at end of file diff --git a/tonic-health/src/server.rs b/tonic-health/src/server.rs index c2837f704..7752d94e3 100644 --- a/tonic-health/src/server.rs +++ b/tonic-health/src/server.rs @@ -84,8 +84,7 @@ impl HealthReporter { let _ = writer.insert(service_name.to_string(), watch::channel(status)); } Some((tx, rx)) => { - let mut rx = rx.clone(); - if rx.recv().await == Some(status) { + if *rx.borrow() == status { return; } @@ -93,7 +92,7 @@ impl HealthReporter { // receiver should always be present, only being dropped when clearing the // service status. Consequently, `tx.broadcast` should not fail, making use // of `expect` here safe. - tx.broadcast(status).expect("channel should not be closed"); + tx.send(status).expect("channel should not be closed"); } }; } @@ -118,7 +117,7 @@ impl HealthService { let reader = self.statuses.read().await; match reader.get(service_name).map(|p| p.1.clone()) { None => None, - Some(mut receiver) => receiver.recv().await, + Some(receiver) => Some(receiver.borrow().clone()), } } } @@ -154,10 +153,14 @@ impl Health for HealthService { }; let output = async_stream::try_stream! { - while let Some(status) = status_rx.recv().await { - yield HealthCheckResponse{ + loop { + let status = *status_rx.borrow(); + yield HealthCheckResponse { status: crate::proto::health_check_response::ServingStatus::from(status) as i32, }; + if status_rx.changed().await.is_err() { + break; + } } }; diff --git a/tonic-types/Cargo.toml b/tonic-types/Cargo.toml index 7bbf1e4ee..f7f0fef38 100644 --- a/tonic-types/Cargo.toml +++ b/tonic-types/Cargo.toml @@ -15,8 +15,8 @@ categories = ["web-programming", "network-programming", "asynchronous"] keywords = ["rpc", "grpc", "protobuf"] [dependencies] -prost = "0.6" -prost-types = "0.6" +prost = { git = "https://github.com/danburkert/prost" } +prost-types = { git = "https://github.com/danburkert/prost" } [build-dependencies] -prost-build = "0.6" +prost-build = { git = "https://github.com/danburkert/prost" } diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 39b0276cc..330249da0 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -29,8 +29,6 @@ transport = [ "hyper", "tokio", "tower", - "tower-balance", - "tower-load", "tracing-futures", ] tls = ["transport", "tokio-rustls"] @@ -42,7 +40,7 @@ prost = ["prost1", "prost-derive"] # harness = false [dependencies] -bytes = "0.5" +bytes = "0.6" futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } tracing = "0.1" @@ -51,33 +49,35 @@ base64 = "0.12" percent-encoding = "2.0" tower-service = "0.3" -tokio-util = { version = "0.3", features = ["codec"] } +tokio-util = { version = "0.5", features = ["codec"] } async-stream = "0.2" -http-body = "0.3" +http-body = { git = "https://github.com/hyperium/http-body", branch = "master" } pin-project = "0.4.17" # prost -prost1 = { package = "prost", version = "0.6", optional = true } -prost-derive = { version = "0.6", optional = true } +prost1 = { package = "prost", git = "https://github.com/danburkert/prost", optional = true } +prost-derive = { git = "https://github.com/danburkert/prost", optional = true } # codegen async-trait = { version = "0.1.13", optional = true } # transport -hyper = { version = "0.13.4", features = ["stream"], optional = true } -tokio = { version = "0.2.13", features = ["tcp"], optional = true } -tower = { version = "0.3", optional = true} -tower-make = { version = "0.3", features = ["connect"] } -tower-balance = { version = "0.3", optional = true } -tower-load = { version = "0.3", optional = true } +hyper = { git = "https://github.com/hyperium/hyper", branch = "master", features = ["client", "http2", "server", "stream", "runtime"], optional = true } +tokio = { version = "0.3.2", features = ["net", "sync"], optional = true } tracing-futures = { version = "0.2", optional = true } # rustls -tokio-rustls = { version = "0.14", optional = true } +tokio-rustls = { version = "0.20", optional = true } rustls-native-certs = { version = "0.4", optional = true } +[dependencies.tower] +git = "https://github.com/tower-rs/tower" +version = "0.4" +features = ["balance", "load", "make", "buffer", "limit", "util", "timeout"] +optional = true + [dev-dependencies] -tokio = { version = "0.2", features = ["rt-core", "macros"] } +tokio = { version = "0.3", features = ["rt", "macros"] } static_assertions = "1.0" rand = "0.7" bencher = "0.1.5" diff --git a/tonic/benches/decode.rs b/tonic/benches/decode.rs index 529e69377..8bb3ce9fd 100644 --- a/tonic/benches/decode.rs +++ b/tonic/benches/decode.rs @@ -11,8 +11,7 @@ use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming}; macro_rules! bench { ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => { fn $name(b: &mut Bencher) { - let mut rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .expect("runtime"); diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 1714cfbf8..633df216c 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -160,7 +160,7 @@ where Pin::new_unchecked(&mut me.0).poll_data(cx) }; match futures_util::ready!(v) { - Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.to_bytes()))), + Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.copy_to_bytes(i.bytes().len())))), Some(Err(e)) => { let err = Status::map_error(e.into()); Poll::Ready(Some(Err(err))) diff --git a/tonic/src/codec/buffer.rs b/tonic/src/codec/buffer.rs index 808348c9c..cd7ceda74 100644 --- a/tonic/src/codec/buffer.rs +++ b/tonic/src/codec/buffer.rs @@ -1,5 +1,4 @@ -use bytes::{Buf, BufMut, BytesMut}; -use std::mem::MaybeUninit; +use bytes::{buf::UninitSlice, Buf, BufMut, BytesMut}; /// A specialized buffer to decode gRPC messages from. #[derive(Debug)] @@ -63,7 +62,10 @@ impl EncodeBuf<'_> { } } -impl BufMut for EncodeBuf<'_> { +// Safety: this impl simply forwards to the inner `BytesMut`'s BufMut` impl; if +// the impl of `BufMut` for `BytesMut` is safe, then this implementation is as +// well. +unsafe impl BufMut for EncodeBuf<'_> { #[inline] fn remaining_mut(&self) -> usize { self.buf.remaining_mut() @@ -75,7 +77,7 @@ impl BufMut for EncodeBuf<'_> { } #[inline] - fn bytes_mut(&mut self) -> &mut [MaybeUninit] { + fn bytes_mut(&mut self) -> &mut UninitSlice { self.buf.bytes_mut() } } @@ -102,7 +104,7 @@ mod tests { assert_eq!(buf.remaining(), 5); assert_eq!(buf.bytes().len(), 5); - assert_eq!(buf.to_bytes().len(), 5); + assert_eq!(buf.copy_to_bytes(5).len(), 5); assert!(!buf.has_remaining()); } diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 788fa2e18..13eb4b986 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -15,7 +15,7 @@ use std::{ fmt, time::Duration, }; -use tower_make::MakeConnection; +use tower::make::MakeConnection; /// Channel builder. /// diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index be2c122dd..a0d38d06a 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -30,12 +30,12 @@ use tokio::{ }; use tower::{ + balance::p2c::Balance, buffer::{self, Buffer}, discover::{Change, Discover}, util::{BoxService, Either}, Service, }; -use tower_balance::p2c::Balance; type Svc = Either, Response, crate::Error>>; @@ -109,7 +109,7 @@ impl Channel { /// This creates a [`Channel`] that will load balance accross all the /// provided endpoints. pub fn balance_list(list: impl Iterator) -> Self { - let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); + let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); list.for_each(|endpoint| { tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) .unwrap(); @@ -166,9 +166,9 @@ impl Channel { where D: Discover + Unpin + Send + 'static, D::Error: Into, - D::Key: Send + Clone, + D::Key: Hash + Send + Clone, { - let svc = Balance::from_entropy(discover); + let svc = Balance::new(discover); let svc = BoxService::new(svc); let svc = Buffer::new(Either::B(svc), buffer_size); diff --git a/tonic/src/transport/server/incoming.rs b/tonic/src/transport/server/incoming.rs index e33c77eee..a1df5a942 100644 --- a/tonic/src/transport/server/incoming.rs +++ b/tonic/src/transport/server/incoming.rs @@ -12,7 +12,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[cfg_attr(not(feature = "tls"), allow(unused_variables))] pub(crate) fn tcp_incoming( @@ -105,8 +105,8 @@ where fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { use std::future::Future; let pin = self.get_mut(); diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index 52e82ab80..66d5f50c2 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -13,11 +13,11 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tower::{ layer::Layer, limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer}, + load::Load, timeout::TimeoutLayer, util::BoxService, ServiceBuilder, ServiceExt, }; -use tower_load::Load; use tower_service::Service; pub(crate) type Request = http::Request; diff --git a/tonic/src/transport/service/connector.rs b/tonic/src/transport/service/connector.rs index 25d6a19fb..8bd82dea1 100644 --- a/tonic/src/transport/service/connector.rs +++ b/tonic/src/transport/service/connector.rs @@ -4,7 +4,7 @@ use super::io::BoxedIo; use super::tls::TlsConnector; use http::Uri; use std::task::{Context, Poll}; -use tower_make::MakeConnection; +use tower::make::MakeConnection; use tower_service::Service; #[cfg(not(feature = "tls"))] diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index 925525c30..91d9bd6e1 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -10,7 +10,7 @@ use std::{ }; use tokio::{stream::Stream, sync::mpsc::Receiver}; -use tower::discover::{Change, Discover}; +use tower::discover::Change; type DiscoverResult = Result, E>; @@ -28,22 +28,17 @@ impl DynamicServiceStream { } } -impl Discover for DynamicServiceStream { - type Key = K; - type Service = Connection; - type Error = crate::Error; +impl Stream for DynamicServiceStream { + type Item = DiscoverResult; - fn poll_discover( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let Some((key, connecting)) = &mut self.connecting { let svc = futures_core::ready!(Pin::new(connecting).poll(cx))?; let key = key.to_owned(); self.connecting = None; let change = Ok(Change::Insert(key, svc)); - return Poll::Ready(change); + return Poll::Ready(Some(change)); }; let c = &mut self.changes; @@ -67,7 +62,7 @@ impl Discover for DynamicServiceStream { self.connecting = Some((k, Box::pin(fut))); continue; } - Change::Remove(k) => return Poll::Ready(Ok(Change::Remove(k))), + Change::Remove(k) => return Poll::Ready(Some(Ok(Change::Remove(k)))), }, } } diff --git a/tonic/src/transport/service/io.rs b/tonic/src/transport/service/io.rs index 98961507d..761c8ece9 100644 --- a/tonic/src/transport/service/io.rs +++ b/tonic/src/transport/service/io.rs @@ -4,7 +4,7 @@ use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub(in crate::transport) trait Io: AsyncRead + AsyncWrite + Send + 'static @@ -33,8 +33,8 @@ impl AsyncRead for BoxedIo { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } @@ -83,8 +83,8 @@ impl AsyncRead for ServerIo { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } diff --git a/tonic/src/transport/service/layer.rs b/tonic/src/transport/service/layer.rs index 765f3b917..b33d00588 100644 --- a/tonic/src/transport/service/layer.rs +++ b/tonic/src/transport/service/layer.rs @@ -1,5 +1,5 @@ use tower::{ - layer::{Layer, Stack}, + layer::{util::Stack, Layer}, util::Either, ServiceBuilder, }; diff --git a/tonic/src/transport/service/reconnect.rs b/tonic/src/transport/service/reconnect.rs index c123d967e..a3d3b34c7 100644 --- a/tonic/src/transport/service/reconnect.rs +++ b/tonic/src/transport/service/reconnect.rs @@ -6,7 +6,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tower_make::MakeService; +use tower::make::MakeService; use tower_service::Service; use tracing::trace; @@ -203,7 +203,7 @@ where match me.inner.project() { InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into), InnerProj::Error(e) => { - let e = e.take().expect("Polled after ready.").into(); + let e = Option::take(e).expect("Polled after ready.").into(); Poll::Ready(Err(e)) } }