Skip to content

[WIP] update to Tokio 0.3, latest Hyper #488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ members = [
"tests/extern_path/my_application",
"tests/integration_tests"
]

[patch.crates-io]
hyper = { git = "https://github.com/hawkw/hyper", branch = "eliza/bump-tokio" }
14 changes: 7 additions & 7 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ path = "src/hyper_warp_multiplex/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["tls"] }
prost = "0.6"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
prost = { git = "https://github.com/danburkert/prost" }
tokio = { version = "0.3", features = ["rt-multi-thread", "time", "stream", "fs", "macros", "net"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
async-stream = "0.2"
tower = "0.3"
tower = { git = "https://github.com/tower-rs/tower", version = "0.4" }
# Required for routeguide
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand All @@ -155,16 +155,16 @@ tracing-subscriber = { version = "0.2", features = ["tracing-log"] }
tracing-attributes = "0.1"
tracing-futures = "0.2"
# Required for wellknown types
prost-types = "0.6"
prost-types = { git = "https://github.com/danburkert/prost" }
# Hyper example
hyper = "0.13"
hyper = { git = "https://github.com/hyperium/hyper", branch = "master" }
warp = { version = "0.2", default-features = false }
http = "0.2"
http-body = "0.3"
http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
pin-project = "0.4.17"
# Health example
tonic-health = { path = "../tonic-health" }
listenfd = "0.3"

[build-dependencies]
tonic-build = { path = "../tonic-build", features = ["prost"] }
tonic-build = { path = "../tonic-build", features = ["prost"] }
4 changes: 2 additions & 2 deletions examples/src/autoreload/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

match listenfd::ListenFd::from_env().take_tcp_listener(0)? {
Some(listener) => {
let mut listener = tokio::net::TcpListener::from_std(listener)?;
let listener = tokio::net::TcpListener::from_std(listener)?;

server.serve_with_incoming(listener.incoming()).await?;
server.serve_with_incoming(listener).await?;
}
None => {
server.serve(addr).await?;
Expand Down
6 changes: 1 addition & 5 deletions examples/src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ impl BlockingClient {
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let rt = Builder::new_current_thread().enable_all().build().unwrap();
let client = rt.block_on(GreeterClient::connect(dst))?;

Ok(Self { rt, client })
Expand Down
16 changes: 8 additions & 8 deletions examples/src/dynamic_load_balance/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let e1 = Endpoint::from_static("http://[::1]:50051");
let e2 = Endpoint::from_static("http://[::1]:50052");

let (channel, mut rx) = Channel::balance_channel(10);
let (channel, rx) = Channel::balance_channel(10);
let mut client = EchoClient::new(channel);

let done = Arc::new(AtomicBool::new(false));
let demo_done = done.clone();
tokio::spawn(async move {
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added first endpoint");
let change = Change::Insert("1", e1);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added second endpoint");
let change = Change::Insert("2", e2);
let res = rx.send(change).await;
println!("{:?}", res);
tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed first endpoint");
let change = Change::Remove("1");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed second endpoint");
let change = Change::Remove("2");
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Added third endpoint");
let e3 = Endpoint::from_static("http://[::1]:50051");
let change = Change::Insert("3", e3);
let res = rx.send(change).await;
println!("{:?}", res);

tokio::time::delay_for(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("Removed third endpoint");
let change = Change::Remove("3");
let res = rx.send(change).await;
Expand All @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

while !done.load(SeqCst) {
tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let request = tonic::Request::new(EchoRequest {
message: "hello".into(),
});
Expand Down
4 changes: 2 additions & 2 deletions examples/src/health/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tonic::{transport::Server, Request, Response, Status};
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use std::time::Duration;
use tokio::time::delay_for;
use tokio::time::sleep;
use tonic_health::server::HealthReporter;

pub mod hello_world {
Expand Down Expand Up @@ -34,7 +34,7 @@ async fn twiddle_service_status(mut reporter: HealthReporter) {
let mut iter = 0u64;
loop {
iter += 1;
delay_for(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

if iter % 2 == 0 {
reporter.set_serving::<GreeterServer<MyGreeter>>().await;
Expand Down
66 changes: 35 additions & 31 deletions examples/src/hyper_warp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! To hit the warp server you can run this command:
//! `curl localhost:50051/hello`

// TODO(eliza): remove when this works again
#![allow(unused_imports, dead_code)]

use futures::future::{self, Either, TryFutureExt};
use http::version::Version;
use hyper::{service::make_service_fn, Server};
Expand Down Expand Up @@ -42,37 +45,38 @@ impl Greeter for MyGreeter {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();

println!("GreeterServer listening on {}", addr);

let tonic = GreeterServer::new(greeter);
let mut warp = warp::service(warp::path("hello").map(|| "hello, world!"));

Server::bind(&addr)
.serve(make_service_fn(move |_| {
let mut tonic = tonic.clone();
future::ok::<_, Infallible>(tower::service_fn(
move |req: hyper::Request<hyper::Body>| match req.version() {
Version::HTTP_11 | Version::HTTP_10 => Either::Left(
warp.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
),
Version::HTTP_2 => Either::Right(
tonic
.call(req)
.map_ok(|res| res.map(EitherBody::Right))
.map_err(Error::from),
),
_ => unimplemented!(),
},
))
}))
.await?;

Ok(())
todo!("bring back this example when Warp has been updated to use the latest Hyper");
// let addr = "[::1]:50051".parse().unwrap();
// let greeter = MyGreeter::default();

// println!("GreeterServer listening on {}", addr);

// let tonic = GreeterServer::new(greeter);
// let mut warp = warp::service(warp::path("hello").map(|| "hello, world!"));

// Server::bind(&addr)
// .serve(make_service_fn(move |_| {
// let mut tonic = tonic.clone();
// future::ok::<_, Infallible>(tower::service_fn(
// move |req: hyper::Request<hyper::Body>| match req.version() {
// Version::HTTP_11 | Version::HTTP_10 => Either::Left(
// warp.call(req)
// .map_ok(|res| res.map(EitherBody::Left))
// .map_err(Error::from),
// ),
// Version::HTTP_2 => Either::Right(
// tonic
// .call(req)
// .map_ok(|res| res.map(EitherBody::Right))
// .map_err(Error::from),
// ),
// _ => unimplemented!(),
// },
// ))
// }))
// .await?;

// Ok(())`
}

enum EitherBody<A, B> {
Expand Down
72 changes: 38 additions & 34 deletions examples/src/hyper_warp_multiplex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! To hit the warp server you can run this command:
//! `curl localhost:50051/hello`

// TODO(eliza): remove when this works again
#![allow(unused_imports, dead_code)]

use futures::future::{self, Either, TryFutureExt};
use futures::Stream;
use http::version::Version;
Expand Down Expand Up @@ -94,40 +97,41 @@ impl Echo for MyEcho {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();

let mut warp = warp::service(warp::path("hello").map(|| "hello, world!"));

Server::bind(&addr)
.serve(make_service_fn(move |_| {
let greeter = GreeterServer::new(MyGreeter::default());
let echo = EchoServer::new(MyEcho::default());

let mut tonic = TonicServer::builder()
.add_service(greeter)
.add_service(echo)
.into_service();

future::ok::<_, Infallible>(tower::service_fn(
move |req: hyper::Request<hyper::Body>| match req.version() {
Version::HTTP_11 | Version::HTTP_10 => Either::Left(
warp.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
),
Version::HTTP_2 => Either::Right(
tonic
.call(req)
.map_ok(|res| res.map(EitherBody::Right))
.map_err(Error::from),
),
_ => unimplemented!(),
},
))
}))
.await?;

Ok(())
todo!("bring this example back when Warp has been updated to the latest Hyper");
// let addr = "[::1]:50051".parse().unwrap();

// let mut warp = warp::service(warp::path("hello").map(|| "hello, world!"));

// Server::bind(&addr)
// .serve(make_service_fn(move |_| {
// let greeter = GreeterServer::new(MyGreeter::default());
// let echo = EchoServer::new(MyEcho::default());

// let mut tonic = TonicServer::builder()
// .add_service(greeter)
// .add_service(echo)
// .into_service();

// future::ok::<_, Infallible>(tower::service_fn(
// move |req: hyper::Request<hyper::Body>| match req.version() {
// Version::HTTP_11 | Version::HTTP_10 => Either::Left(
// warp.call(req)
// .map_ok(|res| res.map(EitherBody::Left))
// .map_err(Error::from),
// ),
// Version::HTTP_2 => Either::Right(
// tonic
// .call(req)
// .map_ok(|res| res.map(EitherBody::Right))
// .map_err(Error::from),
// ),
// _ => unimplemented!(),
// },
// ))
// }))
// .await?;

// Ok(())
}

enum EitherBody<A, B> {
Expand Down
2 changes: 1 addition & 1 deletion examples/src/routeguide/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl RouteGuide for RouteGuideService {
) -> Result<Response<Self::ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);

let (mut tx, rx) = mpsc::channel(4);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();

tokio::spawn(async move {
Expand Down
10 changes: 5 additions & 5 deletions examples/src/uds/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

tokio::fs::create_dir_all(Path::new(path).parent().unwrap()).await?;

let mut uds = UnixListener::bind(path)?;
let uds = UnixListener::bind(path)?;

let greeter = MyGreeter::default();

Server::builder()
.add_service(GreeterServer::new(greeter))
.serve_with_incoming(uds.incoming().map_ok(unix::UnixStream))
.serve_with_incoming(uds.map_ok(unix::UnixStream))
.await?;

Ok(())
Expand All @@ -59,7 +59,7 @@ mod unix {
task::{Context, Poll},
};

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::Connected;

#[derive(Debug)]
Expand All @@ -71,8 +71,8 @@ mod unix {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
Expand Down
14 changes: 7 additions & 7 deletions interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ name = "server"
path = "src/bin/server.rs"

[dependencies]
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "stream", "fs"] }
tokio = { version = "0.3", features = ["rt-multi-thread", "time", "macros", "stream", "fs"] }
tonic = { path = "../tonic", features = ["tls"] }
prost = "0.6"
prost-derive = "0.6"
bytes = "0.5"
prost = { git = "https://github.com/danburkert/prost" }
prost-derive = { git = "https://github.com/danburkert/prost" }
bytes = "0.6"
http = "0.2"
futures-core = "0.3"
futures-util = "0.3"
async-stream = "0.2"
tower = "0.3"
http-body = "0.3"
hyper = "0.13"
tower = { git = "https://github.com/tower-rs/tower", version = "0.4" }
http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
hyper = { git = "https://github.com/hyperium/hyper", branch = "master", features = ["stream"] }
console = "0.9"
structopt = "0.3"
tracing = "0.1"
Expand Down
Loading