Skip to content

Commit 2ad3474

Browse files
authored
rt: minor time driver refactors (#4989)
This patch makes some minor refactors. It renames `ClockTime` to `TimeSource` since that is how all variables refer to it. It also moves the type into a new file. Finally, it moves the `unpark` handle out of the mutex as it does not need to be there. Note, the call to `unpark` is still called while the mutex is held, so there is no functional change. Moving it out of the mutex is in preparation for moving the unpark handle completely out of the time driver.
1 parent 291fce8 commit 2ad3474

5 files changed

Lines changed: 67 additions & 62 deletions

File tree

tokio/src/runtime/time/handle.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use crate::loom::sync::Arc;
2-
use crate::runtime::time::ClockTime;
2+
use crate::runtime::time::TimeSource;
33
use std::fmt;
44

55
/// Handle to time driver instance.
66
#[derive(Clone)]
77
pub(crate) struct Handle {
8-
time_source: ClockTime,
9-
inner: Arc<super::Inner>,
8+
time_source: TimeSource,
9+
pub(super) inner: Arc<super::Inner>,
1010
}
1111

1212
impl Handle {
@@ -17,7 +17,7 @@ impl Handle {
1717
}
1818

1919
/// Returns the time source associated with this handle.
20-
pub(crate) fn time_source(&self) -> &ClockTime {
20+
pub(crate) fn time_source(&self) -> &TimeSource {
2121
&self.time_source
2222
}
2323

tokio/src/runtime/time/mod.rs

Lines changed: 13 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@ use entry::{EntryList, TimerHandle, TimerShared};
1313
mod handle;
1414
pub(crate) use self::handle::Handle;
1515

16+
mod source;
17+
pub(crate) use source::TimeSource;
18+
1619
mod wheel;
1720

1821
use crate::loom::sync::atomic::{AtomicBool, Ordering};
1922
use crate::loom::sync::{Arc, Mutex};
2023
use crate::park::{Park, Unpark};
2124
use crate::time::error::Error;
22-
use crate::time::{Clock, Duration, Instant};
25+
use crate::time::{Clock, Duration};
2326

24-
use std::convert::TryInto;
2527
use std::fmt;
2628
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
2729

@@ -83,7 +85,7 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
8385
#[derive(Debug)]
8486
pub(crate) struct Driver<P: Park + 'static> {
8587
/// Timing backend in use.
86-
time_source: ClockTime,
88+
time_source: TimeSource,
8789

8890
/// Shared state.
8991
handle: Handle,
@@ -101,58 +103,22 @@ pub(crate) struct Driver<P: Park + 'static> {
101103
did_wake: Arc<AtomicBool>,
102104
}
103105

104-
/// A structure which handles conversion from Instants to u64 timestamps.
105-
#[derive(Debug, Clone)]
106-
pub(crate) struct ClockTime {
107-
clock: crate::time::Clock,
108-
start_time: Instant,
109-
}
110-
111-
impl ClockTime {
112-
pub(self) fn new(clock: Clock) -> Self {
113-
Self {
114-
start_time: clock.now(),
115-
clock,
116-
}
117-
}
118-
119-
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
120-
// Round up to the end of a ms
121-
self.instant_to_tick(t + Duration::from_nanos(999_999))
122-
}
123-
124-
pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
125-
// round up
126-
let dur: Duration = t
127-
.checked_duration_since(self.start_time)
128-
.unwrap_or_else(|| Duration::from_secs(0));
129-
let ms = dur.as_millis();
130-
131-
ms.try_into().unwrap_or(u64::MAX)
132-
}
133-
134-
pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
135-
Duration::from_millis(t)
136-
}
137-
138-
pub(crate) fn now(&self) -> u64 {
139-
self.instant_to_tick(self.clock.now())
140-
}
141-
}
142-
143106
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
144107
struct Inner {
145108
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
146109
pub(super) state: Mutex<InnerState>,
147110

148111
/// True if the driver is being shutdown.
149112
pub(super) is_shutdown: AtomicBool,
113+
114+
/// Unparker that can be used to wake the time driver.
115+
unpark: Box<dyn Unpark>,
150116
}
151117

152118
/// Time state shared which must be protected by a `Mutex`
153119
struct InnerState {
154120
/// Timing backend in use.
155-
time_source: ClockTime,
121+
time_source: TimeSource,
156122

157123
/// The last published timer `elapsed` value.
158124
elapsed: u64,
@@ -162,9 +128,6 @@ struct InnerState {
162128

163129
/// Timer wheel.
164130
wheel: wheel::Wheel,
165-
166-
/// Unparker that can be used to wake the time driver.
167-
unpark: Box<dyn Unpark>,
168131
}
169132

170133
// ===== impl Driver =====
@@ -178,7 +141,7 @@ where
178141
///
179142
/// Specifying the source of time is useful when testing.
180143
pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
181-
let time_source = ClockTime::new(clock);
144+
let time_source = TimeSource::new(clock);
182145

183146
let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
184147

@@ -397,7 +360,7 @@ impl Handle {
397360
.map(|next_wake| when < next_wake.get())
398361
.unwrap_or(true)
399362
{
400-
lock.unpark.unpark();
363+
self.inner.unpark.unpark();
401364
}
402365

403366
None
@@ -493,15 +456,15 @@ impl<P: Park + 'static> Unpark for TimerUnpark<P> {
493456
// ===== impl Inner =====
494457

495458
impl Inner {
496-
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
459+
pub(self) fn new(time_source: TimeSource, unpark: Box<dyn Unpark>) -> Self {
497460
Inner {
498461
state: Mutex::new(InnerState {
499462
time_source,
500463
elapsed: 0,
501464
next_wake: None,
502-
unpark,
503465
wheel: wheel::Wheel::new(),
504466
}),
467+
unpark,
505468
is_shutdown: AtomicBool::new(false),
506469
}
507470
}

tokio/src/runtime/time/source.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use crate::time::{Clock, Duration, Instant};
2+
3+
use std::convert::TryInto;
4+
5+
/// A structure which handles conversion from Instants to u64 timestamps.
6+
#[derive(Debug, Clone)]
7+
pub(crate) struct TimeSource {
8+
pub(crate) clock: Clock,
9+
start_time: Instant,
10+
}
11+
12+
impl TimeSource {
13+
pub(crate) fn new(clock: Clock) -> Self {
14+
Self {
15+
start_time: clock.now(),
16+
clock,
17+
}
18+
}
19+
20+
pub(crate) fn deadline_to_tick(&self, t: Instant) -> u64 {
21+
// Round up to the end of a ms
22+
self.instant_to_tick(t + Duration::from_nanos(999_999))
23+
}
24+
25+
pub(crate) fn instant_to_tick(&self, t: Instant) -> u64 {
26+
// round up
27+
let dur: Duration = t
28+
.checked_duration_since(self.start_time)
29+
.unwrap_or_else(|| Duration::from_secs(0));
30+
let ms = dur.as_millis();
31+
32+
ms.try_into().unwrap_or(u64::MAX)
33+
}
34+
35+
pub(crate) fn tick_to_duration(&self, t: u64) -> Duration {
36+
Duration::from_millis(t)
37+
}
38+
39+
pub(crate) fn now(&self) -> u64 {
40+
self.instant_to_tick(self.clock.now())
41+
}
42+
}

tokio/src/runtime/time/tests/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
4949
fn single_timer() {
5050
model(|| {
5151
let clock = crate::time::Clock::new(true, false);
52-
let time_source = super::ClockTime::new(clock.clone());
52+
let time_source = super::TimeSource::new(clock.clone());
5353

5454
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
5555
let handle = Handle::new(Arc::new(inner));
@@ -80,7 +80,7 @@ fn single_timer() {
8080
fn drop_timer() {
8181
model(|| {
8282
let clock = crate::time::Clock::new(true, false);
83-
let time_source = super::ClockTime::new(clock.clone());
83+
let time_source = super::TimeSource::new(clock.clone());
8484

8585
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
8686
let handle = Handle::new(Arc::new(inner));
@@ -111,7 +111,7 @@ fn drop_timer() {
111111
fn change_waker() {
112112
model(|| {
113113
let clock = crate::time::Clock::new(true, false);
114-
let time_source = super::ClockTime::new(clock.clone());
114+
let time_source = super::TimeSource::new(clock.clone());
115115

116116
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
117117
let handle = Handle::new(Arc::new(inner));
@@ -146,7 +146,7 @@ fn reset_future() {
146146
let finished_early = Arc::new(AtomicBool::new(false));
147147

148148
let clock = crate::time::Clock::new(true, false);
149-
let time_source = super::ClockTime::new(clock.clone());
149+
let time_source = super::TimeSource::new(clock.clone());
150150

151151
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
152152
let handle = Handle::new(Arc::new(inner));
@@ -204,7 +204,7 @@ fn poll_process_levels() {
204204
let clock = crate::time::Clock::new(true, false);
205205
clock.pause();
206206

207-
let time_source = super::ClockTime::new(clock.clone());
207+
let time_source = super::TimeSource::new(clock.clone());
208208

209209
let inner = super::Inner::new(time_source, MockUnpark::mock());
210210
let handle = Handle::new(Arc::new(inner));
@@ -245,7 +245,7 @@ fn poll_process_levels_targeted() {
245245
let clock = crate::time::Clock::new(true, false);
246246
clock.pause();
247247

248-
let time_source = super::ClockTime::new(clock.clone());
248+
let time_source = super::TimeSource::new(clock.clone());
249249

250250
let inner = super::Inner::new(time_source, MockUnpark::mock());
251251
let handle = Handle::new(Arc::new(inner));

tokio/src/time/sleep.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#[cfg(all(tokio_unstable, feature = "tracing"))]
2-
use crate::runtime::time::ClockTime;
2+
use crate::runtime::time::TimeSource;
33
use crate::runtime::time::{Handle, TimerEntry};
44
use crate::time::{error::Error, Duration, Instant};
55
use crate::util::trace;
@@ -239,7 +239,7 @@ cfg_trace! {
239239
struct Inner {
240240
deadline: Instant,
241241
ctx: trace::AsyncOpTracingCtx,
242-
time_source: ClockTime,
242+
time_source: TimeSource,
243243
}
244244
}
245245

0 commit comments

Comments
 (0)