Skip to content

Commit a22ab85

Browse files
committed
Remove will_wake calls
We used `will_wake` in correctness checks to make sure we don't hang the future/stream forever if a wrong waker is used, but the contract for this function is best-effort and it started generating more and more false positives. The recent one, found by @aleiserson, is inside rust-lang (rust-lang/rust#119863), so it is very hard to get around. We don't have a good replacement for this check unless we implement our own waker, but it is probably too much for now
1 parent 20c89ed commit a22ab85

File tree

3 files changed

+24
-33
lines changed

3 files changed

+24
-33
lines changed

ipa-core/src/helpers/buffers/ordering_sender.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ impl WaitingShard {
182182
match self.wakers[j].i.cmp(&i) {
183183
Ordering::Greater => (),
184184
Ordering::Equal => {
185-
assert!(item.w.will_wake(&self.wakers[j].w));
186185
self.wakers[j] = item;
187186
return Ok(());
188187
}

ipa-core/src/helpers/buffers/unordered_receiver.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ where
5959
if recv.is_next(this.i) {
6060
recv.poll_next(cx)
6161
} else {
62-
recv.add_waker(this.i, cx.waker().clone());
62+
recv.add_waker(this.i, cx.waker());
6363
Poll::Pending
6464
}
6565
}
@@ -190,28 +190,25 @@ where
190190
///
191191
/// [`recv`]: UnorderedReceiver::recv
192192
/// [`poll`]: Future::poll
193-
fn add_waker(&mut self, i: usize, waker: Waker) {
193+
fn add_waker(&mut self, i: usize, waker: &Waker) {
194194
assert!(
195195
i > self.next,
196196
"Awaiting a read (record = {i}) that has already been fulfilled. Read cursor is currently at {}", self.next
197197
);
198198
// We don't save a waker at `self.next`, so `>` and not `>=`.
199199
if i > self.next + self.wakers.len() {
200200
#[cfg(feature = "stall-detection")]
201-
let overflow = (waker, i);
201+
let overflow = (waker.clone(), i);
202202
#[cfg(not(feature = "stall-detection"))]
203-
let overflow = waker;
203+
let overflow = waker.clone();
204204
self.overflow_wakers.push(overflow);
205205
} else {
206206
let index = i % self.wakers.len();
207-
if let Some(old) = self.wakers[index].as_ref() {
208-
// We are OK with having multiple polls of the same `Receiver`
209-
// (or two `Receiver`s for the same item being polled).
210-
// However, as we are only tracking one waker, they both need
211-
// to be woken when we invoke the waker we get.
212-
assert!(waker.will_wake(old));
207+
if let Some(old) = self.wakers[index].as_mut() {
208+
old.clone_from(waker);
209+
} else {
210+
self.wakers[index] = Some(waker.clone());
213211
}
214-
self.wakers[index] = Some(waker);
215212
}
216213
}
217214

ipa-core/src/helpers/transport/stream/collection.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,29 +81,24 @@ impl<I: TransportIdentity, S: Stream> StreamCollection<I, S> {
8181
let mut streams = self.inner.lock().unwrap();
8282

8383
match streams.entry(key.clone()) {
84-
Entry::Occupied(mut entry) => {
85-
match entry.get_mut() {
86-
StreamState::Waiting(old_waker) => {
87-
let will_wake = old_waker.will_wake(waker);
88-
drop(streams); // avoid mutex poisoning
89-
assert!(will_wake);
90-
None
91-
}
92-
rs @ StreamState::Ready(_) => {
93-
let StreamState::Ready(stream) =
94-
std::mem::replace(rs, StreamState::Completed)
95-
else {
96-
unreachable!();
97-
};
84+
Entry::Occupied(mut entry) => match entry.get_mut() {
85+
StreamState::Waiting(old_waker) => {
86+
old_waker.clone_from(waker);
87+
None
88+
}
89+
rs @ StreamState::Ready(_) => {
90+
let StreamState::Ready(stream) = std::mem::replace(rs, StreamState::Completed)
91+
else {
92+
unreachable!();
93+
};
9894

99-
Some(stream)
100-
}
101-
StreamState::Completed => {
102-
drop(streams);
103-
panic!("{key:?} stream has been consumed already")
104-
}
95+
Some(stream)
10596
}
106-
}
97+
StreamState::Completed => {
98+
drop(streams);
99+
panic!("{key:?} stream has been consumed already")
100+
}
101+
},
107102
Entry::Vacant(entry) => {
108103
entry.insert(StreamState::Waiting(waker.clone()));
109104
None

0 commit comments

Comments
 (0)