-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Add CancellationToken #2263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add CancellationToken #2263
Conversation
|
TODOs before this can be merged:
|
| [dev-dependencies] | ||
| tokio-test = { version = "0.2.0" } | ||
| futures = { version = "0.3.0", features = ["async-await"] } | ||
| futures-test = "0.3.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we pull what you use in here into tokio-test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only imports count_waker, since I didn't want to write it myself and it adds some value in the tests. Could be integrated into tokio too.
I would say for now
I would think that for now keeping them private is fine, if someone really wants the public api's I assume they can use |
hawkw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments on the proposed LinkedList implementation. I haven't reviewed the actual implementation of CancellationToken yet, but I thought it was worth leaving these comments now.
In particular, I had some thoughts about how this implementation compares to the linked list Carl proposed in PR #2210. I think there's a lot of work we could do to enforce more of the list's safety invariants at the API level. Since Tokio is a large project, with a lot of contributors, I think this is important — I'm sure you can remember all of those invariants when working with the linked list implementation, but we should try ensure every contributor can make changes that interact with this code safely.
| /// get removed from the list before it gets moved or dropped. | ||
| /// In addition to this `node` may not be added to another other list before | ||
| /// it is removed from the current one. | ||
| pub unsafe fn add_front(&mut self, node: &mut ListNode<T>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intrusive list @carllerche proposed in #2210 requires nodes to be pinned in order to be inserted into the list:
tokio/tokio/src/util/linked_list.rs
Line 58 in 1890645
| pub(crate) unsafe fn push_front(&mut self, entry: Pin<&mut Entry<T>>) { |
I have a strong preference for this API level, as it helps us enforce one of the two safety invariants documented in the above comment (that the inserted node's address will remain stable and will not drop while it's in the list).
| pub fn peek_first(&self) -> Option<&mut ListNode<T>> { | ||
| // Safety: When the node was inserted it was promised that it is alive | ||
| // until it gets removed from the list. | ||
| // The returned node has a pointer which constrains it to the lifetime | ||
| // of the list. This is ok, since the Node is supposed to outlive | ||
| // its insertion in the list. | ||
| unsafe { | ||
| self.head | ||
| .map(|mut node| &mut *(node.as_mut() as *mut ListNode<T>)) | ||
| } | ||
| } | ||
|
|
||
| /// Returns the last node in the linked list without removing it from the list | ||
| /// The function is only safe as long as valid pointers are stored inside | ||
| /// the linked list. | ||
| /// The returned pointer is only guaranteed to be valid as long as the list | ||
| /// is not mutated | ||
| pub fn peek_last(&self) -> Option<&mut ListNode<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO these methods should probably return a pinned node, if possible. This would prevent callers from using mem::replace or mem::swap to break the safety invariant that the nodes addresses remain stable in safe code.
If these were pinned, then callers could only move them using Pin::get_unchecked_mut, which is unsafe.
| /// The returned pointer is only guaranteed to be valid as long as the list | ||
| /// is not mutated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason these cannot mutably borrow the list to ensure this invariant isn't violated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You also can't directly manipulate the list anyway, because that requires a mutable reference, but the peek borrows the list and prevents getting one.
Maybe the comment is misphrased. However what people can do is insert the pointer into another list, which would blow things up. But that is at least an unsafe operation, in the same fashion as remove.
| } | ||
|
|
||
| /// Adds a node at the front of the linked list. | ||
| /// Safety: This function is only safe as long as `node` is guaranteed to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Admittedly, this is nitpicky, but elsewhere in tokio we tend to mark safety invariants in RustDoc with a heading, like:
///
/// # Safety
///
/// This function is only safe as long as ...
|
It looks like there are basically two causes of CI failures on this branch: Fixing the |
I think they are not on the list but on the Tokio feature flags are too complicated :-) I think the Loom failures might be real races. Nice to see it finds those, even though the error messages are non-existant. |
|
I don't get why the loom test is failing. Even if I simplify things to #[test]
fn drop_in_order() {
loom::model(|| {
let token = CancellationToken::new();
let token1 = token.clone();
let token2 = token.clone();
let child_token = token.child_token();
let child_token2 = token.child_token();
});
}it crashes. But there is zero concurrency in this test. |
|
Reducing the test even further to this yields a stacktrace: #[test]
fn drop_in_order() {
loom::model(|| {
let token = CancellationToken::new();
// let token1 = token.clone();
// let token2 = token.clone();
let _child_token = token.child_token();
// let child_token2 = token.child_token();
});
}which means it defintiely doesn't like dropping the parent reference. If I keep that one there it doesn't crash - at the cost of a leak. |
tokio/Cargo.toml
Outdated
| "rt-core", | ||
| "rt-util", | ||
| "rt-threaded", | ||
| "scope", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, we probably should avoid an additional feature flag. Instead, lets bundle this up with rt-util.
We will want to initially merge this behind an "unstable" cfg flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rt-util or sync? it's in the sync module...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It’s a synchronization primitive, so sync would generally be a good choice. I mainly put it into scope so everything could be behind the same feature flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to ask again what the idea is:
- Put it into module
syncwhich correlates to featuresync - Put it into module
scopeand reuse featuresync - Put it into module
syncand use a newunstablefeature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we want to:
- Put it into module
syncand require thesyncfuture - Also require
cfg(tokio_unstable)(not a feature flag, but aRUSTFLAGScfg), as in PR syscalls: #1845 Introduce unstable Syscall trait. #2306.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really care where it goes at this point. We can bikeshed it later. The key for me is:
- no new feature flags
- All work here is behind a
--cfgflag.
hawkw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, the implementation seems pretty solid to me, although I'm not totally sure about the necessity of custom reference counting. Most of my other comments were either low priority style nits, or related to things we should do to get this ready to merge (using the existing linked list impl, making sure CancellationTokens can be constructed publicly, etc).
Personally, the name CancellationToken also isn't my favourite — "token" kind of implies some kind of numeric or cryptographic identifier to me, and I think we might be able to come up with a more concise name that's equally descriptive. But, I'd be okay with this merging under the current name, so I don't really care about starting a bikeshed over it.
| Ordering::SeqCst, | ||
| Ordering::SeqCst, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little surprised that this needs sequentially consistent ordering for both success and failure; it seems like it should probably be able to be AcqRel on success and Acquire on failure.
The performance impact of SeqCst may not be a huge deal, so we can always look into changing this later; I'm asking more because I'm curious about whether there's a reason we need the global ordering here that I'm overlooking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the atomics here are a "I don't know what is the right thing, so I pick the safe default". If someone knows about a more efficient and safe version, we can change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And my guess was also that the orderings you describe are the better ones
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SeqCst is fine at this point.
| #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] | ||
| pub use rand::thread_rng_n; | ||
|
|
||
| pub(crate) mod intrusive_double_linked_list; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this branch needs to be rebased onto master & updated to use the existing doubly-linked list implementation? Or are there missing features in the current implementation that this branch needs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it probably would need a rebase to do this. I'm not sure whether something is missing. I might actually have some preferrence for cleaning the 2 lists up in a follow-up CR, since I wouldn't like to list stuff again with everything else. Fixed enough bugs in the current CR regarding lists and locking :-)
| impl CancellationToken { | ||
| /// Creates a new CancellationToken in the non-cancelled state. | ||
| #[allow(dead_code)] | ||
| pub(crate) fn new() -> CancellationToken { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing that this is pub(crate) because this code began its life as an internal implementation detail of scope? Before merging we should make sure that it's possible for users to construct a cancellation token. You may have already noticed this, but I thought I'd mention it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question here was whether this will be mainly used as an implementation detail for scope, or being publically available. I think it's also a super useful general primitive, but made it pub(crate) since the primary goal was scope. I'm good with both.
| /// A Future that is resolved once the corresponding [`CancellationToken`] | ||
| /// was cancelled | ||
| #[must_use = "futures do nothing unless polled"] | ||
| pub struct WaitForCancellationFuture<'a> { | ||
| /// The CancellationToken that is associated with this WaitForCancellationFuture | ||
| cancellation_token: Option<&'a CancellationToken>, | ||
| /// Node for waiting at the cancellation_token | ||
| wait_node: ListNode<WaitQueueEntry>, | ||
| /// Whether this future was registered at the token yet as a waiter | ||
| is_registered: bool, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not opposed to exposing the future in the public API, but to start out, we may want to just make this private and change the function that returning it to an async fn that awaits this future? That decreases the size of the public API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as we could still select! on it that should be fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So when trying to move things to pub async fn cancelled(&self) I found out that calling this would return a Future which is not Send, whereas the current WaitForCancellation Future is. Afaik for some reason in scope this was important, but I don't remember anymore.
Another option is obviously to return impl Future<Output=()> + Send, but I'm not sure whether this is preferrable. We can also try starting with async fn and adding the concrete type back if Send is indeed required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm...I think having the actual future type is fine? Elsewhere in Tokio, we've been trying to provide async fn-based APIs where possible, but i think being Send is important, due to spawning. I'd get @carllerche's opinion?
|
I think the FreeBSD CI failures are related to an issue that's now fixed on master, so I think this will need a rebase before CI can run properly. |
|
I would like to get this merged to master sooner than later. For me, the only thing that needs to happen before it gets merged is to hide this behind |
|
I also want to make it clear that we should not focus on performance at this point. We should be focusing on API exploration. I would be fine w/ using a big fat mutex around everything. IMO, we don't even need to worry about using the linked list intrusive stuff either at this point... but |
As a first step towards structured concurrency, this change adds a CancellationToken for graceful cancellation of tasks. The task can be awaited by an arbitrary amount of tasks due to the usage of an intrusive list. The token can be cloned. In addition to this child tokens can be derived. When the parent token gets cancelled, all child tokens will also get cancelled.
Co-Authored-By: Eliza Weisman <[email protected]>
Co-Authored-By: Eliza Weisman <[email protected]>
cb892e8 to
02b3d4d
Compare
carllerche
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once CI passes. We can iterate on master.
As a first step towards structured concurrency, this change adds a
CancellationToken for graceful cancellation of tasks.
The task can be awaited by an arbitrary amount of tasks due to the usage
of an intrusive list.
The token can be cloned. In addition to this child tokens can be derived.
When the parent token gets cancelled, all child tokens will also get
cancelled.
This is the follow up from #2153 with reduced scope.