Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ac3e01b
REMOVE ME updates peak_wema test to pass
Michael-J-Ward Jul 23, 2021
3654d78
adds pin_project_lite dependency
Michael-J-Ward Jul 23, 2021
eb01136
uses pin_project_lite for load::Constant
Michael-J-Ward Jul 23, 2021
2d23256
uses pin_project_lite for load::PencingRequestsDiscover
Michael-J-Ward Jul 23, 2021
5517790
uses pin_project_lite for load::PeakEwma
Michael-J-Ward Jul 23, 2021
c83394f
uses pin_project_lite for load::Completion
Michael-J-Ward Jul 23, 2021
d3e066b
uses pin_project_lite for tests::support::IntoStream
Michael-J-Ward Jul 23, 2021
724fefe
refactors opaque_future into a regular struct
Michael-J-Ward Jul 23, 2021
623e71c
migrates opaque_future to use pin_project_lite
Michael-J-Ward Jul 23, 2021
50bb0da
removes tuple variant from load_shed::ResponseState enum
Michael-J-Ward Jul 23, 2021
17e1645
migrates load_shed::future to pin_project_lite
Michael-J-Ward Jul 23, 2021
8c3463d
removes tuple variant from filter::future::State
Michael-J-Ward Jul 23, 2021
a1e4383
migrates filter::future to pin_project_lite
Michael-J-Ward Jul 23, 2021
333c5d5
migrates retry::Retry to pin_project_lite
Michael-J-Ward Jul 23, 2021
dc42c25
refactors retry::future::State to enable pin_project_lite
Michael-J-Ward Jul 23, 2021
c01eeed
migrates retry::future to pin_project_lite
Michael-J-Ward Jul 23, 2021
61d931a
migrates spawn_ready::make to pin_project_lite
Michael-J-Ward Jul 24, 2021
7ba2add
refactors buffer::future::ResponseState to allow pin_project_lite
Michael-J-Ward Jul 24, 2021
2b77259
migrates buffer::future to pin_project_lite
Michael-J-Ward Jul 24, 2021
e4e9728
refactors util::AndThenFuture to allow pin_project_lite
Michael-J-Ward Jul 24, 2021
f3b7d67
migrates util::AndThenFuture to pin_project_lite
Michael-J-Ward Jul 24, 2021
1739fe6
migrates hedge::Future to pin_project_lite
Michael-J-Ward Jul 24, 2021
858a905
migrates hedge::select::ResponseFuture to pin_project_lite
Michael-J-Ward Jul 24, 2021
5df4ad7
refactors hedge::delay enum for pin_project_lite
Michael-J-Ward Jul 24, 2021
f051a49
refactors reconnect::future enum for pin_project_lite
Michael-J-Ward Jul 24, 2021
c7e5bad
refactors oneshot::State enum for pin_project_lite
Michael-J-Ward Jul 24, 2021
53c26a7
migrates util::oneshot to pin_project_lite
Michael-J-Ward Jul 24, 2021
24b9c27
migrates reconnect::future to pin_project_lite
Michael-J-Ward Jul 24, 2021
1ae3c32
migrates hedge::delay to pin_project_lite
Michael-J-Ward Jul 24, 2021
2b18b88
migrates hedge::latency to pin_project_lite
Michael-J-Ward Jul 24, 2021
1b83543
migrates discover::list to pin_project_lite
Michael-J-Ward Jul 24, 2021
d5f04b6
migrates timeout::future to pin_project_lite
Michael-J-Ward Jul 24, 2021
c0e10cd
migrates balance::pool to pin_project_lite
Michael-J-Ward Jul 24, 2021
fc98305
migrates balance::p2c::make to pin_project_lite
Michael-J-Ward Jul 24, 2021
18cef69
migrates balance::p2c::service to pin_project_lite
Michael-J-Ward Jul 24, 2021
94afdc6
migrates call_all::ordered to pin_project_lite
Michael-J-Ward Jul 24, 2021
8bead65
migrates call_all::common to pin_project_lite
Michael-J-Ward Jul 24, 2021
07665fb
migrates call_all::unordered to pin_project_lite
Michael-J-Ward Jul 24, 2021
5bfc2fe
migrates util::optional::future to pin_project_lite
Michael-J-Ward Jul 24, 2021
c81cc9c
migrates limit::concurrency::future to pin_project_lite
Michael-J-Ward Jul 24, 2021
82b8ed7
migrates tower-balance example to pin_project_lite
Michael-J-Ward Jul 24, 2021
41f6ebf
applies cargo fmt
Michael-J-Ward Jul 24, 2021
889e99b
migrates tower-test to pin_project_lite
Michael-J-Ward Jul 24, 2021
628049d
fixes cargo hack check
Michael-J-Ward Jul 27, 2021
c0303e1
fixes lint rename warning on nightly
Michael-J-Ward Jul 27, 2021
ade48a2
migrates buffer::Worker to pin_project_lite
Michael-J-Ward Jul 27, 2021
9ce2aa0
fixes abort_on_drop test
Michael-J-Ward Jul 27, 2021
5c1c39f
applies cargo fmt
Michael-J-Ward Jul 27, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tower-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Layer traits and extensions.
//!
Expand Down
2 changes: 1 addition & 1 deletion tower-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
rust_2018_idioms,
unreachable_pub
)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Definition of the core `Service` trait to Tower
//!
Expand Down
2 changes: 1 addition & 1 deletion tower-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tokio = { version = "1.0", features = ["sync"] }
tokio-test = "0.4"
tower-layer = { version = "0.3", path = "../tower-layer" }
tower-service = { version = "0.3" }
pin-project = "1"
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1.0", features = ["macros"] }
2 changes: 1 addition & 1 deletion tower-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
#![deny(broken_intra_doc_links)]
#![deny(rustdoc::broken_intra_doc_links)]

//! Mock `Service` that can be used in tests.

Expand Down
15 changes: 8 additions & 7 deletions tower-test/src/mock/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::mock::error::{self, Error};
use futures_util::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;

use std::{
Expand All @@ -11,12 +11,13 @@ use std::{
task::{Context, Poll},
};

/// Future of the `Mock` response.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
pin_project! {
/// Future of the `Mock` response.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
rx: Option<Rx<T>>,
}
}

type Rx<T> = oneshot::Receiver<Result<T, Error>>;
Expand Down
1 change: 1 addition & 0 deletions tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ tokio = { version = "1", optional = true, features = ["sync"] }
tokio-stream = { version = "0.1.0", optional = true }
tokio-util = { version = "0.6.3", default-features = false, optional = true }
tracing = { version = "0.1.2", optional = true }
pin-project-lite = "0.2.7"

[dev-dependencies]
futures = "0.3"
Expand Down
19 changes: 14 additions & 5 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
Expand Down Expand Up @@ -78,8 +78,17 @@ type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;

#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
pin_project! {
struct Disco<S> {
services: Vec<(Key, S)>
}
}

impl<S> Disco<S> {
fn new(services: Vec<(Key, S)>) -> Self {
Self { services }
}
}

impl<S> Stream for Disco<S>
where
Expand All @@ -88,7 +97,7 @@ where
type Item = Result<Change<Key, S>, Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.pop() {
match self.project().services.pop() {
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
Expand All @@ -105,7 +114,7 @@ fn gen_disco() -> impl Discover<
impl Service<Req, Response = Rsp, Error = Error, Future = impl Send> + Send,
>,
> + Send {
Disco(
Disco::new(
MAX_ENDPOINT_LATENCIES
.iter()
.enumerate()
Expand Down
21 changes: 11 additions & 10 deletions tower/src/balance/p2c/make.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
Expand Down Expand Up @@ -29,15 +29,16 @@ pub struct MakeBalance<S, Req> {
_marker: PhantomData<fn(Req)>,
}

/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
pin_project! {
/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
}
}

impl<S, Req> MakeBalance<S, Req> {
Expand Down
27 changes: 14 additions & 13 deletions tower/src/balance/p2c/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::load::Load;
use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
Expand Down Expand Up @@ -59,18 +59,19 @@ where
}
}

/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,

_req: PhantomData<Req>,
pin_project! {
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,

_req: PhantomData<Req>,
}
}

enum Error<E> {
Expand Down
37 changes: 19 additions & 18 deletions tower/src/balance/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use slab::Slab;
use std::{
fmt,
Expand All @@ -42,23 +42,24 @@ enum Level {
High,
}

/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
#[pin_project]
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
pin_project! {
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
maker: MS,
#[pin]
making: Option<MS::Future>,
target: Target,
load: Level,
services: Slab<()>,
died_tx: tokio::sync::mpsc::UnboundedSender<usize>,
#[pin]
died_rx: tokio::sync::mpsc::UnboundedReceiver<usize>,
limit: Option<usize>,
}
}

impl<MS, Target, Request> fmt::Debug for PoolDiscoverer<MS, Target, Request>
Expand Down
51 changes: 31 additions & 20 deletions tower/src/buffer/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,50 @@

use super::{error::Closed, message};
use futures_core::ready;
use pin_project::pin_project;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// Future that completes when the buffered service eventually services the submitted request.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
pin_project! {
/// Future that completes when the buffered service eventually services the submitted request.
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
state: ResponseState<T>,
}
}

#[pin_project(project = ResponseStateProj)]
#[derive(Debug)]
enum ResponseState<T> {
Failed(Option<crate::BoxError>),
Rx(#[pin] message::Rx<T>),
Poll(#[pin] T),
pin_project! {
#[project = ResponseStateProj]
#[derive(Debug)]
enum ResponseState<T> {
Failed {
error: Option<crate::BoxError>,
},
Rx {
#[pin]
rx: message::Rx<T>,
},
Poll {
#[pin]
fut: T,
},
}
}

impl<T> ResponseFuture<T> {
pub(crate) fn new(rx: message::Rx<T>) -> Self {
ResponseFuture {
state: ResponseState::Rx(rx),
state: ResponseState::Rx { rx },
}
}

pub(crate) fn failed(err: crate::BoxError) -> Self {
ResponseFuture {
state: ResponseState::Failed(Some(err)),
state: ResponseState::Failed { error: Some(err) },
}
}
}
Expand All @@ -53,15 +64,15 @@ where

loop {
match this.state.as_mut().project() {
ResponseStateProj::Failed(e) => {
return Poll::Ready(Err(e.take().expect("polled after error")));
ResponseStateProj::Failed { error } => {
return Poll::Ready(Err(error.take().expect("polled after error")));
}
ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) {
Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
Ok(Err(e)) => return Poll::Ready(Err(e.into())),
Err(_) => return Poll::Ready(Err(Closed::new().into())),
},
ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into),
ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
}
}
}
Expand Down
Loading