Skip to content

Commit 606206e

Browse files
committed
Merge branch 'tokio-1.6.x' into merge-1.6.x
2 parents 2c24a02 + dfe4013 commit 606206e

File tree

5 files changed

+190
-32
lines changed

5 files changed

+190
-32
lines changed

tokio/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
# 1.6.2 (June 14, 2021)
2+
3+
### Fixes
4+
5+
- test: sub-ms `time:advance` regression introduced in 1.6 ([#3852])
6+
7+
[#3852]: https://github.com/tokio-rs/tokio/pull/3852
8+
19
# 1.6.1 (May 28, 2021)
210

311
This release reverts [#3518] because it doesn't work on some kernels due to

tokio/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ name = "tokio"
77
# - README.md
88
# - Update CHANGELOG.md.
99
# - Create "v1.0.x" git tag.
10-
version = "1.6.1"
10+
version = "1.6.2"
1111
edition = "2018"
1212
authors = ["Tokio Contributors <[email protected]>"]
1313
license = "MIT"
1414
readme = "README.md"
15-
documentation = "https://docs.rs/tokio/1.6.0/tokio/"
15+
documentation = "https://docs.rs/tokio/1.6.2/tokio/"
1616
repository = "https://github.com/tokio-rs/tokio"
1717
homepage = "https://tokio.rs"
1818
description = """

tokio/src/time/clock.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! configurable.
88
99
cfg_not_test_util! {
10-
use crate::time::{Duration, Instant};
10+
use crate::time::{Instant};
1111

1212
#[derive(Debug, Clone)]
1313
pub(crate) struct Clock {}
@@ -24,14 +24,6 @@ cfg_not_test_util! {
2424
pub(crate) fn now(&self) -> Instant {
2525
now()
2626
}
27-
28-
pub(crate) fn is_paused(&self) -> bool {
29-
false
30-
}
31-
32-
pub(crate) fn advance(&self, _dur: Duration) {
33-
unreachable!();
34-
}
3527
}
3628
}
3729

@@ -121,10 +113,9 @@ cfg_test_util! {
121113
/// runtime.
122114
pub async fn advance(duration: Duration) {
123115
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
124-
let until = clock.now() + duration;
125116
clock.advance(duration);
126117

127-
crate::time::sleep_until(until).await;
118+
crate::task::yield_now().await;
128119
}
129120

130121
/// Return the current instant, factoring in frozen time.

tokio/src/time/driver/mod.rs

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> {
9191

9292
/// Parker to delegate to
9393
park: P,
94+
95+
// When `true`, a call to `park_timeout` should immediately return and time
96+
// should not advance. One reason for this to be `true` is if the task
97+
// passed to `Runtime::block_on` called `task::yield_now()`.
98+
//
99+
// While it may look racy, it only has any effect when the clock is paused
100+
// and pausing the clock is restricted to a single-threaded runtime.
101+
#[cfg(feature = "test-util")]
102+
did_wake: Arc<AtomicBool>,
94103
}
95104

96105
/// A structure which handles conversion from Instants to u64 timestamps.
@@ -178,6 +187,8 @@ where
178187
time_source,
179188
handle: Handle::new(Arc::new(inner)),
180189
park,
190+
#[cfg(feature = "test-util")]
191+
did_wake: Arc::new(AtomicBool::new(false)),
181192
}
182193
}
183194

@@ -192,8 +203,6 @@ where
192203
}
193204

194205
fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
195-
let clock = &self.time_source.clock;
196-
197206
let mut lock = self.handle.get().state.lock();
198207

199208
assert!(!self.handle.is_shutdown());
@@ -217,26 +226,14 @@ where
217226
duration = std::cmp::min(limit, duration);
218227
}
219228

220-
if clock.is_paused() {
221-
self.park.park_timeout(Duration::from_secs(0))?;
222-
223-
// Simulate advancing time
224-
clock.advance(duration);
225-
} else {
226-
self.park.park_timeout(duration)?;
227-
}
229+
self.park_timeout(duration)?;
228230
} else {
229231
self.park.park_timeout(Duration::from_secs(0))?;
230232
}
231233
}
232234
None => {
233235
if let Some(duration) = limit {
234-
if clock.is_paused() {
235-
self.park.park_timeout(Duration::from_secs(0))?;
236-
clock.advance(duration);
237-
} else {
238-
self.park.park_timeout(duration)?;
239-
}
236+
self.park_timeout(duration)?;
240237
} else {
241238
self.park.park()?;
242239
}
@@ -248,6 +245,39 @@ where
248245

249246
Ok(())
250247
}
248+
249+
cfg_test_util! {
250+
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
251+
let clock = &self.time_source.clock;
252+
253+
if clock.is_paused() {
254+
self.park.park_timeout(Duration::from_secs(0))?;
255+
256+
// If the time driver was woken, then the park completed
257+
// before the "duration" elapsed (usually caused by a
258+
// yield in `Runtime::block_on`). In this case, we don't
259+
// advance the clock.
260+
if !self.did_wake() {
261+
// Simulate advancing time
262+
clock.advance(duration);
263+
}
264+
} else {
265+
self.park.park_timeout(duration)?;
266+
}
267+
268+
Ok(())
269+
}
270+
271+
fn did_wake(&self) -> bool {
272+
self.did_wake.swap(false, Ordering::SeqCst)
273+
}
274+
}
275+
276+
cfg_not_test_util! {
277+
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
278+
self.park.park_timeout(duration)
279+
}
280+
}
251281
}
252282

253283
impl Handle {
@@ -387,11 +417,11 @@ impl<P> Park for Driver<P>
387417
where
388418
P: Park + 'static,
389419
{
390-
type Unpark = P::Unpark;
420+
type Unpark = TimerUnpark<P>;
391421
type Error = P::Error;
392422

393423
fn unpark(&self) -> Self::Unpark {
394-
self.park.unpark()
424+
TimerUnpark::new(self)
395425
}
396426

397427
fn park(&mut self) -> Result<(), Self::Error> {
@@ -426,6 +456,33 @@ where
426456
}
427457
}
428458

459+
pub(crate) struct TimerUnpark<P: Park + 'static> {
460+
inner: P::Unpark,
461+
462+
#[cfg(feature = "test-util")]
463+
did_wake: Arc<AtomicBool>,
464+
}
465+
466+
impl<P: Park + 'static> TimerUnpark<P> {
467+
fn new(driver: &Driver<P>) -> TimerUnpark<P> {
468+
TimerUnpark {
469+
inner: driver.park.unpark(),
470+
471+
#[cfg(feature = "test-util")]
472+
did_wake: driver.did_wake.clone(),
473+
}
474+
}
475+
}
476+
477+
impl<P: Park + 'static> Unpark for TimerUnpark<P> {
478+
fn unpark(&self) {
479+
#[cfg(feature = "test-util")]
480+
self.did_wake.store(true, Ordering::SeqCst);
481+
482+
self.inner.unpark();
483+
}
484+
}
485+
429486
// ===== impl Inner =====
430487

431488
impl Inner {

tokio/tests/time_pause.rs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use rand::SeedableRng;
55
use rand::{rngs::StdRng, Rng};
66
use tokio::time::{self, Duration, Instant, Sleep};
7-
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task};
7+
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task};
88

99
use std::{
1010
future::Future,
@@ -215,6 +215,108 @@ async fn interval() {
215215
assert_pending!(poll_next(&mut i));
216216
}
217217

218+
#[tokio::test(start_paused = true)]
219+
async fn test_time_advance_sub_ms() {
220+
let now = Instant::now();
221+
222+
let dur = Duration::from_micros(51_592);
223+
time::advance(dur).await;
224+
225+
assert_eq!(now.elapsed(), dur);
226+
227+
let now = Instant::now();
228+
let dur = Duration::from_micros(1);
229+
time::advance(dur).await;
230+
231+
assert_eq!(now.elapsed(), dur);
232+
}
233+
234+
#[tokio::test(start_paused = true)]
235+
async fn test_time_advance_3ms_and_change() {
236+
let now = Instant::now();
237+
238+
let dur = Duration::from_micros(3_141_592);
239+
time::advance(dur).await;
240+
241+
assert_eq!(now.elapsed(), dur);
242+
243+
let now = Instant::now();
244+
let dur = Duration::from_micros(3_123_456);
245+
time::advance(dur).await;
246+
247+
assert_eq!(now.elapsed(), dur);
248+
}
249+
250+
#[tokio::test(start_paused = true)]
251+
async fn regression_3710_with_submillis_advance() {
252+
let start = Instant::now();
253+
254+
time::advance(Duration::from_millis(1)).await;
255+
256+
let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60)));
257+
258+
assert_pending!(sleep.poll());
259+
260+
let before = Instant::now();
261+
let dur = Duration::from_micros(51_592);
262+
time::advance(dur).await;
263+
assert_eq!(before.elapsed(), dur);
264+
265+
assert_pending!(sleep.poll());
266+
}
267+
268+
#[tokio::test(start_paused = true)]
269+
async fn exact_1ms_advance() {
270+
let now = Instant::now();
271+
272+
let dur = Duration::from_millis(1);
273+
time::advance(dur).await;
274+
275+
assert_eq!(now.elapsed(), dur);
276+
277+
let now = Instant::now();
278+
let dur = Duration::from_millis(1);
279+
time::advance(dur).await;
280+
281+
assert_eq!(now.elapsed(), dur);
282+
}
283+
284+
#[tokio::test(start_paused = true)]
285+
async fn advance_once_with_timer() {
286+
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
287+
assert_pending!(sleep.poll());
288+
289+
time::advance(Duration::from_micros(250)).await;
290+
assert_pending!(sleep.poll());
291+
292+
time::advance(Duration::from_micros(1500)).await;
293+
294+
assert!(sleep.is_woken());
295+
assert_ready!(sleep.poll());
296+
}
297+
298+
#[tokio::test(start_paused = true)]
299+
async fn advance_multi_with_timer() {
300+
// Round to the nearest ms
301+
// time::sleep(Duration::from_millis(1)).await;
302+
303+
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
304+
assert_pending!(sleep.poll());
305+
306+
time::advance(Duration::from_micros(250)).await;
307+
assert_pending!(sleep.poll());
308+
309+
time::advance(Duration::from_micros(250)).await;
310+
assert_pending!(sleep.poll());
311+
312+
time::advance(Duration::from_micros(250)).await;
313+
assert_pending!(sleep.poll());
314+
315+
time::advance(Duration::from_micros(250)).await;
316+
assert!(sleep.is_woken());
317+
assert_ready!(sleep.poll());
318+
}
319+
218320
fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
219321
interval.enter(|cx, mut interval| interval.poll_tick(cx))
220322
}

0 commit comments

Comments
 (0)