Skip to content

Commit 4873176

Browse files
committed
Backport to 0.1: Shared must relinquish control to the executor if repolled
This backports #2136 to Futures 0.1. There isn't much to add on top of what's mentioned in #2136: the same bug exists in Futures 0.1, and it'll fail in the same way when polled in recent versions of Tokio (#2418). Backporting to 0.1 allows codebases that still have some Futures 0.1 code around to still use newer versions of Tokio.
1 parent 56f8eb9 commit 4873176

File tree

2 files changed

+63
-47
lines changed

2 files changed

+63
-47
lines changed

src/future/shared.rs

Lines changed: 33 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,8 @@ struct Notifier {
5959

6060
const IDLE: usize = 0;
6161
const POLLING: usize = 1;
62-
const REPOLL: usize = 2;
63-
const COMPLETE: usize = 3;
64-
const POISONED: usize = 4;
62+
const COMPLETE: usize = 2;
63+
const POISONED: usize = 3;
6564

6665
pub fn new<F: Future>(future: F) -> Shared<F> {
6766
Shared {
@@ -133,7 +132,7 @@ impl<F> Future for Shared<F>
133132
IDLE => {
134133
// Lock acquired, fall through
135134
}
136-
POLLING | REPOLL => {
135+
POLLING => {
137136
// Another task is currently polling, at this point we just want
138137
// to ensure that our task handle is currently registered
139138

@@ -146,56 +145,45 @@ impl<F> Future for Shared<F>
146145
_ => unreachable!(),
147146
}
148147

149-
loop {
150-
struct Reset<'a>(&'a AtomicUsize);
148+
struct Reset<'a>(&'a AtomicUsize);
151149

152-
impl<'a> Drop for Reset<'a> {
153-
fn drop(&mut self) {
154-
use std::thread;
150+
impl<'a> Drop for Reset<'a> {
151+
fn drop(&mut self) {
152+
use std::thread;
155153

156-
if thread::panicking() {
157-
self.0.store(POISONED, SeqCst);
158-
}
154+
if thread::panicking() {
155+
self.0.store(POISONED, SeqCst);
159156
}
160157
}
158+
}
161159

162-
let _reset = Reset(&self.inner.notifier.state);
163-
164-
// Poll the future
165-
let res = unsafe {
166-
(*self.inner.future.get()).as_mut().unwrap()
167-
.poll_future_notify(&self.inner.notifier, 0)
168-
};
169-
match res {
170-
Ok(Async::NotReady) => {
171-
// Not ready, try to release the handle
172-
match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
173-
POLLING => {
174-
// Success
175-
return Ok(Async::NotReady);
176-
}
177-
REPOLL => {
178-
// Gotta poll again!
179-
let prev = self.inner.notifier.state.swap(POLLING, SeqCst);
180-
assert_eq!(prev, REPOLL);
181-
}
182-
_ => unreachable!(),
160+
let _reset = Reset(&self.inner.notifier.state);
161+
162+
// Poll the future
163+
let res = unsafe {
164+
(*self.inner.future.get()).as_mut().unwrap()
165+
.poll_future_notify(&self.inner.notifier, 0)
166+
};
167+
match res {
168+
Ok(Async::NotReady) => {
169+
// Not ready, try to release the handle
170+
match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
171+
POLLING => {
172+
// Success
173+
return Ok(Async::NotReady);
183174
}
184-
175+
_ => unreachable!(),
185176
}
186-
Ok(Async::Ready(i)) => {
187-
unsafe {
188-
(*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) }));
189-
}
190177

191-
break;
178+
}
179+
Ok(Async::Ready(i)) => {
180+
unsafe {
181+
(*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) }));
192182
}
193-
Err(e) => {
194-
unsafe {
195-
(*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) }));
196-
}
197-
198-
break;
183+
}
184+
Err(e) => {
185+
unsafe {
186+
(*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) }));
199187
}
200188
}
201189
}
@@ -225,8 +213,6 @@ impl<F> Drop for Shared<F> where F: Future {
225213

226214
impl Notify for Notifier {
227215
fn notify(&self, _id: usize) {
228-
self.state.compare_and_swap(POLLING, REPOLL, SeqCst);
229-
230216
let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new());
231217

232218
for (_, waiter) in waiters {

tests/shared.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,33 @@ fn recursive_poll_with_unpark() {
202202
drop(tx0);
203203
core.run(f3).unwrap();
204204
}
205+
206+
#[test]
207+
fn shared_future_that_wakes_itself_until_pending_is_returned() {
208+
use futures::Async;
209+
use std::cell::Cell;
210+
211+
let core = ::support::local_executor::Core::new();
212+
213+
let proceed = Cell::new(false);
214+
let fut = futures::future::poll_fn(|| {
215+
Ok::<_, ()>(if proceed.get() {
216+
Async::Ready(())
217+
} else {
218+
futures::task::current().notify();
219+
Async::NotReady
220+
})
221+
})
222+
.shared()
223+
.map(|_| ())
224+
.map_err(|_| ());
225+
226+
// The join future can only complete if the second future gets a chance to run after the first
227+
// has returned pending
228+
let second = futures::future::lazy(|| {
229+
proceed.set(true);
230+
Ok::<_, ()>(())
231+
});
232+
233+
core.run(fut.join(second)).unwrap();
234+
}

0 commit comments

Comments
 (0)