Skip to content

Conversation

@Matthias247
Copy link
Contributor

As a first step towards structured concurrency, this change adds a
CancellationToken for graceful cancellation of tasks.

The task can be awaited by an arbitrary amount of tasks due to the usage
of an intrusive list.

The token can be cloned. In addition to this child tokens can be derived.
When the parent token gets cancelled, all child tokens will also get
cancelled.

This is the follow up from #2153 with reduced scope.

@Matthias247
Copy link
Contributor Author

TODOs before this can be merged:

  • Loom tests are crashing. Not sure what I did wrong
  • Should CancellationToken be part of tokio::sync or any other module? It would need to be available within the same tokio feature config as scope.
  • Should all APIs of CancellationToken should be public? The main questions are around cancel() and child_token() - which for the scope() use are only used by the scope itself. Application code would not need to use them. The benefit of not making them public is that we might have a chance to embed the CancellationToken actually inside the task allocation, but that mostly works well if there is a 1:1 relation between tasks and tokens. Still not sure however if that would work out. And making the whole thing publically available is also nice.
  • Should there by a separate type for cancelling the token and waiting for it? .NET has CancellationTokenSource and CancellationToken
  • I actually wanted CancellationToken::current() using task-local storage. However I couldn't use the macro from within tokio yet, due to a rustc error message I don't understand.

[dev-dependencies]
tokio-test = { version = "0.2.0" }
futures = { version = "0.3.0", features = ["async-await"] }
futures-test = "0.3.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we pull what you use in here into tokio-test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only imports count_waker, since I didn't want to write it myself and it adds some value in the tests. Could be integrated into tokio too.

@LucioFranco
Copy link
Member

Should CancellationToken be part of tokio::sync or any other module? It would need to be available within the same tokio feature config as scope.

I would say for now scope is fine since the main user of this would be via the scope module.

Should all APIs of CancellationToken should be public? The main questions are around cancel() and child_token() - which for the scope() use are only used by the scope itself. Application code would not need to use them. The benefit of not making them public is that we might have a chance to embed the CancellationToken actually inside the task allocation, but that mostly works well if there is a 1:1 relation between tasks and tokens. Still not sure however if that would work out. And making the whole thing publically available is also nice.

I would think that for now keeping them private is fine, if someone really wants the public api's I assume they can use futures-intrusive instead? So lets keep public api small, I think that works best.

hawkw
hawkw previously requested changes Feb 21, 2020
Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments on the proposed LinkedList implementation. I haven't reviewed the actual implementation of CancellationToken yet, but I thought it was worth leaving these comments now.

In particular, I had some thoughts about how this implementation compares to the linked list Carl proposed in PR #2210. I think there's a lot of work we could do to enforce more of the list's safety invariants at the API level. Since Tokio is a large project, with a lot of contributors, I think this is important — I'm sure you can remember all of those invariants when working with the linked list implementation, but we should try ensure every contributor can make changes that interact with this code safely.

/// get removed from the list before it gets moved or dropped.
/// In addition to this `node` may not be added to another other list before
/// it is removed from the current one.
pub unsafe fn add_front(&mut self, node: &mut ListNode<T>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intrusive list @carllerche proposed in #2210 requires nodes to be pinned in order to be inserted into the list:

pub(crate) unsafe fn push_front(&mut self, entry: Pin<&mut Entry<T>>) {

I have a strong preference for this API level, as it helps us enforce one of the two safety invariants documented in the above comment (that the inserted node's address will remain stable and will not drop while it's in the list).

Comment on lines +141 to +157
pub fn peek_first(&self) -> Option<&mut ListNode<T>> {
// Safety: When the node was inserted it was promised that it is alive
// until it gets removed from the list.
// The returned node has a pointer which constrains it to the lifetime
// of the list. This is ok, since the Node is supposed to outlive
// its insertion in the list.
unsafe {
self.head
.map(|mut node| &mut *(node.as_mut() as *mut ListNode<T>))
}
}

/// Returns the last node in the linked list without removing it from the list
/// The function is only safe as long as valid pointers are stored inside
/// the linked list.
/// The returned pointer is only guaranteed to be valid as long as the list
/// is not mutated
pub fn peek_last(&self) -> Option<&mut ListNode<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these methods should probably return a pinned node, if possible. This would prevent callers from using mem::replace or mem::swap to break the safety invariant that the nodes addresses remain stable in safe code.

If these were pinned, then callers could only move them using Pin::get_unchecked_mut, which is unsafe.

/// The returned pointer is only guaranteed to be valid as long as the list
/// is not mutated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason these cannot mutably borrow the list to ensure this invariant isn't violated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also can't directly manipulate the list anyway, because that requires a mutable reference, but the peek borrows the list and prevents getting one.

Maybe the comment is misphrased. However what people can do is insert the pointer into another list, which would blow things up. But that is at least an unsafe operation, in the same fashion as remove.

}

/// Adds a node at the front of the linked list.
/// Safety: This function is only safe as long as `node` is guaranteed to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admittedly, this is nitpicky, but elsewhere in tokio we tend to mark safety invariants in RustDoc with a heading, like:

/// 
/// # Safety
///
/// This function is only safe as long as ...

@hawkw
Copy link
Member

hawkw commented Feb 21, 2020

It looks like there are basically two causes of CI failures on this branch: cargo check failing due to unused pub declarations in the linked_list module (we have deny(unreachable_pub) turned on), and the loom test failing due to a bus error.

Fixing the cargo check failure should be pretty easy, just change the pubs to pub(crate). I'll take a look at the loom test when I get a chance, I've gotten pretty good at debugging loom tests...

@Matthias247
Copy link
Contributor Author

Matthias247 commented Feb 22, 2020

It looks like there are basically two causes of CI failures on this branch: cargo check failing due to unused pub declarations in the linked_list module

I think they are not on the list but on the CancellationToken. I haven't fully understood yet why it happens. I guess it has something to do with feature flags, but I thought it's included in every build so far.

Tokio feature flags are too complicated :-)

I think the Loom failures might be real races. Nice to see it finds those, even though the error messages are non-existant.

@Matthias247
Copy link
Contributor Author

Matthias247 commented Feb 23, 2020

I don't get why the loom test is failing. Even if I simplify things to

#[test]
fn drop_in_order() {
    loom::model(|| {
        let token = CancellationToken::new();
        let token1 = token.clone();
        let token2 = token.clone();
        let child_token = token.child_token();
        let child_token2 = token.child_token();
    });
}

it crashes. But there is zero concurrency in this test.

@Matthias247
Copy link
Contributor Author

Reducing the test even further to this yields a stacktrace:

#[test]
fn drop_in_order() {
    loom::model(|| {
        let token = CancellationToken::new();
        // let token1 = token.clone();
        // let token2 = token.clone();
        let _child_token = token.child_token();
        // let child_token2 = token.child_token();
    });
}
---- sync::tests::loom_cancellation_token::drop_in_order stdout ----
thread 'sync::tests::loom_cancellation_token::drop_in_order' panicked at 'index out of bounds: the len is 4 but the index is 139995801197056', /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/libcore/slice/mod.rs:2806:10
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
   1: backtrace::backtrace::trace_unsynchronized
             at /cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:84
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:61
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1025
   5: std::io::Write::write_fmt
             at /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/libstd/io/mod.rs:1426
   6: std::io::impls::<impl std::io::Write for alloc::boxed::Box<W>>::write_fmt
             at src/libstd/io/impls.rs:156
   7: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:65
   8: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:50
   9: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:193
  10: std::panicking::default_hook
             at src/libstd/panicking.rs:207
  11: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:471
  12: rust_begin_unwind
             at src/libstd/panicking.rs:375
  13: core::panicking::panic_fmt
             at src/libcore/panicking.rs:84
  14: core::panicking::panic_bounds_check
             at src/libcore/panicking.rs:62
  15: <usize as core::slice::SliceIndex<[T]>>::index
             at /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/libcore/slice/mod.rs:2806
  16: core::slice::<impl core::ops::index::Index<I> for [T]>::index
             at /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/libcore/slice/mod.rs:2657
  17: <alloc::vec::Vec<T> as core::ops::index::Index<I>>::index
             at /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/liballoc/vec.rs:1871
  18: loom::rt::object::Store::last_dependent_access
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/object.rs:169
  19: loom::rt::execution::Execution::schedule
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/execution.rs:124        20: loom::rt::branch::{{closure}}
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/mod.rs:73
  21: loom::rt::scheduler::Scheduler::with_execution::{{closure}}
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/scheduler.rs:48         22: scoped_tls::ScopedKey<T>::with
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/scoped-tls-0.1.2/src/lib.rs:189
  23: loom::rt::scheduler::Scheduler::with_execution
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/scheduler.rs:48         24: loom::rt::execution
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/mod.rs:138
  25: loom::rt::branch
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/mod.rs:71
  26: loom::rt::object::Object::branch
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/object.rs:225
  27: loom::rt::atomic::Atomic::rmw
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/rt/atomic.rs:96
  28: loom::sync::atomic::atomic::Atomic<T>::try_rmw
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/sync/atomic/atomic.rs:68   29: loom::sync::atomic::atomic::Atomic<T>::compare_exchange
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/sync/atomic/atomic.rs:114  30: loom::sync::atomic::int::AtomicUsize::compare_exchange
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/sync/atomic/int.rs:65      31: tokio::sync::cancellation_token::CancellationTokenState::atomic_update_state
             at tokio/src/sync/cancellation_token.rs:167
  32: tokio::sync::cancellation_token::CancellationTokenState::remove_parent_ref
             at tokio/src/sync/cancellation_token.rs:211
  33: tokio::sync::cancellation_token::CancellationTokenState::unregister_from_parent
             at tokio/src/sync/cancellation_token.rs:277
  34: <tokio::sync::cancellation_token::CancellationToken as core::ops::drop::Drop>::drop
             at tokio/src/sync/cancellation_token.rs:596
  35: core::ptr::real_drop_in_place
             at /rustc/5e1a799842ba6ed4a57e91f7ab9435947482f7d8/src/libcore/ptr/mod.rs:182
  36: tokio::sync::tests::loom_cancellation_token::drop_in_order::{{closure}}
             at tokio/src/sync/tests/loom_cancellation_token.rs:88
  37: loom::model::Builder::check::{{closure}}
             at /home/matthias/.cargo/registry/src/github.1485827954.workers.dev-1ecc6299db9ec823/loom-0.2.14/src/model.rs:170
  38: core::ops::function::FnOnce::call_once{{vtable.shim}}

which means it defintiely doesn't like dropping the parent reference. If I keep that one there it doesn't crash - at the cost of a leak.

@jonhoo jonhoo added A-tokio Area: The main tokio crate C-proposal Category: a proposal and request for comments S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. labels Apr 17, 2020
tokio/Cargo.toml Outdated
"rt-core",
"rt-util",
"rt-threaded",
"scope",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we probably should avoid an additional feature flag. Instead, lets bundle this up with rt-util.

We will want to initially merge this behind an "unstable" cfg flag.

Copy link
Member

@hawkw hawkw Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rt-util or sync? it's in the sync module...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s a synchronization primitive, so sync would generally be a good choice. I mainly put it into scope so everything could be behind the same feature flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to ask again what the idea is:

  • Put it into module sync which correlates to feature sync
  • Put it into module scope and reuse feature sync
  • Put it into module sync and use a new unstable feature

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we want to:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really care where it goes at this point. We can bikeshed it later. The key for me is:

  • no new feature flags
  • All work here is behind a --cfg flag.

Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the implementation seems pretty solid to me, although I'm not totally sure about the necessity of custom reference counting. Most of my other comments were either low priority style nits, or related to things we should do to get this ready to merge (using the existing linked list impl, making sure CancellationTokens can be constructed publicly, etc).

Personally, the name CancellationToken also isn't my favourite — "token" kind of implies some kind of numeric or cryptographic identifier to me, and I think we might be able to come up with a more concise name that's equally descriptive. But, I'd be okay with this merging under the current name, so I don't really care about starting a bikeshed over it.

Comment on lines 175 to 176
Ordering::SeqCst,
Ordering::SeqCst,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little surprised that this needs sequentially consistent ordering for both success and failure; it seems like it should probably be able to be AcqRel on success and Acquire on failure.

The performance impact of SeqCst may not be a huge deal, so we can always look into changing this later; I'm asking more because I'm curious about whether there's a reason we need the global ordering here that I'm overlooking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the atomics here are a "I don't know what is the right thing, so I pick the safe default". If someone knows about a more efficient and safe version, we can change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And my guess was also that the orderings you describe are the better ones

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SeqCst is fine at this point.

#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use rand::thread_rng_n;

pub(crate) mod intrusive_double_linked_list;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this branch needs to be rebased onto master & updated to use the existing doubly-linked list implementation? Or are there missing features in the current implementation that this branch needs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it probably would need a rebase to do this. I'm not sure whether something is missing. I might actually have some preferrence for cleaning the 2 lists up in a follow-up CR, since I wouldn't like to list stuff again with everything else. Fixed enough bugs in the current CR regarding lists and locking :-)

impl CancellationToken {
/// Creates a new CancellationToken in the non-cancelled state.
#[allow(dead_code)]
pub(crate) fn new() -> CancellationToken {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing that this is pub(crate) because this code began its life as an internal implementation detail of scope? Before merging we should make sure that it's possible for users to construct a cancellation token. You may have already noticed this, but I thought I'd mention it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question here was whether this will be mainly used as an implementation detail for scope, or being publically available. I think it's also a super useful general primitive, but made it pub(crate) since the primary goal was scope. I'm good with both.

Comment on lines 811 to 821
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// was cancelled
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFuture<'a> {
/// The CancellationToken that is associated with this WaitForCancellationFuture
cancellation_token: Option<&'a CancellationToken>,
/// Node for waiting at the cancellation_token
wait_node: ListNode<WaitQueueEntry>,
/// Whether this future was registered at the token yet as a waiter
is_registered: bool,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not opposed to exposing the future in the public API, but to start out, we may want to just make this private and change the function that returning it to an async fn that awaits this future? That decreases the size of the public API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we could still select! on it that should be fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when trying to move things to pub async fn cancelled(&self) I found out that calling this would return a Future which is not Send, whereas the current WaitForCancellation Future is. Afaik for some reason in scope this was important, but I don't remember anymore.

Another option is obviously to return impl Future<Output=()> + Send, but I'm not sure whether this is preferrable. We can also try starting with async fn and adding the concrete type back if Send is indeed required.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...I think having the actual future type is fine? Elsewhere in Tokio, we've been trying to provide async fn-based APIs where possible, but i think being Send is important, due to spawning. I'd get @carllerche's opinion?

@hawkw
Copy link
Member

hawkw commented Apr 29, 2020

I think the FreeBSD CI failures are related to an issue that's now fixed on master, so I think this will need a rebase before CI can run properly.

@carllerche
Copy link
Member

I would like to get this merged to master sooner than later.

For me, the only thing that needs to happen before it gets merged is to hide this behind --cfg tokio_unstable. We can then iterate on the work in smaller PRs and experiment w/ the API w/ examples.

@carllerche
Copy link
Member

I also want to make it clear that we should not focus on performance at this point. We should be focusing on API exploration. I would be fine w/ using a big fat mutex around everything.

IMO, we don't even need to worry about using the linked list intrusive stuff either at this point... but

Matthias247 and others added 10 commits April 29, 2020 20:29
As a first step towards structured concurrency, this change adds a
CancellationToken for graceful cancellation of tasks.

The task can be awaited by an arbitrary amount of tasks due to the usage
of an intrusive list.

The token can be cloned. In addition to this child tokens can be derived.
When the parent token gets cancelled, all child tokens will also get
cancelled.
Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM once CI passes. We can iterate on master.

@carllerche carllerche dismissed hawkw’s stale review May 2, 2020 21:18

will be iterated on in future prs.

@carllerche carllerche merged commit 187af2e into tokio-rs:master May 2, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio Area: The main tokio crate C-proposal Category: a proposal and request for comments S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants