|
2 | 2 |
|
3 | 3 | use crate::cell::UnsafeCell;
|
4 | 4 | use crate::future::{poll_fn, Future};
|
| 5 | +use crate::mem; |
5 | 6 | use crate::pin::Pin;
|
6 |
| -use crate::task::Poll; |
| 7 | +use crate::task::{Context, Poll}; |
7 | 8 |
|
8 | 9 | /// Polls multiple futures simultaneously, returning a tuple
|
9 | 10 | /// of all results once complete.
|
@@ -70,64 +71,77 @@ pub macro join {
|
70 | 71 | $( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
|
71 | 72 | },
|
72 | 73 | @rest: ()
|
73 |
| - ) => {{ |
| 74 | + ) => { |
74 | 75 | async move {
|
75 |
| - // The futures and whether they have completed |
76 |
| - let mut state = ( $( UnsafeCell::new(($fut, false)), )* ); |
77 |
| - |
78 |
| - // Make sure the futures don't panic |
79 |
| - // if polled after completion, and |
80 |
| - // store their output separately |
81 |
| - let mut futures = ($( |
82 |
| - ({ |
83 |
| - let ( $($pos,)* state, .. ) = &state; |
84 |
| - |
85 |
| - poll_fn(move |cx| { |
86 |
| - // SAFETY: each future borrows a distinct element |
87 |
| - // of the tuple |
88 |
| - let (fut, done) = unsafe { &mut *state.get() }; |
89 |
| - |
90 |
| - if *done { |
91 |
| - return Poll::Ready(None) |
92 |
| - } |
93 |
| - |
94 |
| - // SAFETY: The futures are never moved |
95 |
| - match unsafe { Pin::new_unchecked(fut).poll(cx) } { |
96 |
| - Poll::Ready(val) => { |
97 |
| - *done = true; |
98 |
| - Poll::Ready(Some(val)) |
99 |
| - } |
100 |
| - Poll::Pending => Poll::Pending |
101 |
| - } |
102 |
| - }) |
103 |
| - }, None), |
104 |
| - )*); |
| 76 | + let mut futures = ( $( MaybeDone::Future($fut), )* ); |
105 | 77 |
|
106 | 78 | poll_fn(move |cx| {
|
107 | 79 | let mut done = true;
|
108 | 80 |
|
109 | 81 | $(
|
110 |
| - let ( $($pos,)* (fut, out), .. ) = &mut futures; |
| 82 | + let ( $($pos,)* fut, .. ) = &mut futures; |
111 | 83 |
|
112 | 84 | // SAFETY: The futures are never moved
|
113 |
| - match unsafe { Pin::new_unchecked(fut).poll(cx) } { |
114 |
| - Poll::Ready(Some(val)) => *out = Some(val), |
115 |
| - // the future was already done |
116 |
| - Poll::Ready(None) => {}, |
117 |
| - Poll::Pending => done = false, |
118 |
| - } |
| 85 | + done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() }; |
119 | 86 | )*
|
120 | 87 |
|
121 | 88 | if done {
|
122 | 89 | // Extract all the outputs
|
123 | 90 | Poll::Ready(($({
|
124 |
| - let ( $($pos,)* (_, val), .. ) = &mut futures; |
125 |
| - val.unwrap() |
| 91 | + let ( $($pos,)* fut, .. ) = &mut futures; |
| 92 | + |
| 93 | + fut.take_output().unwrap() |
126 | 94 | }),*))
|
127 | 95 | } else {
|
128 | 96 | Poll::Pending
|
129 | 97 | }
|
130 | 98 | }).await
|
131 | 99 | }
|
132 |
| - }} |
| 100 | + } |
| 101 | +} |
| 102 | + |
| 103 | +/// Future used by `join!` that stores it's output to |
| 104 | +/// be later taken and doesn't panic when polled after ready. |
| 105 | +/// |
| 106 | +/// This type is public in a private module for use by the macro. |
| 107 | +#[allow(missing_debug_implementations)] |
| 108 | +#[unstable(feature = "future_join", issue = "91642")] |
| 109 | +pub enum MaybeDone<F: Future> { |
| 110 | + Future(F), |
| 111 | + Done(F::Output), |
| 112 | + Took, |
| 113 | +} |
| 114 | + |
| 115 | +#[unstable(feature = "future_join", issue = "91642")] |
| 116 | +impl<F: Future> MaybeDone<F> { |
| 117 | + pub fn take_output(&mut self) -> Option<F::Output> { |
| 118 | + match &*self { |
| 119 | + MaybeDone::Done(_) => match mem::replace(self, Self::Took) { |
| 120 | + MaybeDone::Done(val) => Some(val), |
| 121 | + _ => unreachable!(), |
| 122 | + }, |
| 123 | + _ => None, |
| 124 | + } |
| 125 | + } |
| 126 | +} |
| 127 | + |
| 128 | +#[unstable(feature = "future_join", issue = "91642")] |
| 129 | +impl<F: Future> Future for MaybeDone<F> { |
| 130 | + type Output = (); |
| 131 | + |
| 132 | + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 133 | + // SAFETY: pinning in structural for `f` |
| 134 | + unsafe { |
| 135 | + match self.as_mut().get_unchecked_mut() { |
| 136 | + MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) { |
| 137 | + Poll::Ready(val) => self.set(Self::Done(val)), |
| 138 | + Poll::Pending => return Poll::Pending, |
| 139 | + }, |
| 140 | + MaybeDone::Done(_) => {} |
| 141 | + MaybeDone::Took => unreachable!(), |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + Poll::Ready(()) |
| 146 | + } |
133 | 147 | }
|
0 commit comments