Skip to content

Commit 8feebab

Browse files
authored
io: rewrite slab to support compaction (#2757)
The I/O driver uses a slab to store per-resource state. Doing this provides two benefits. First, allocating state is streamlined. Second, resources may be safely indexed using a `usize` type. The `usize` is used passed to the OS's selector when registering for receiving events. The original slab implementation used a `Vec` backed by `RwLock`. This primarily caused contention when reading state. This implementation also only **grew** the slab capacity but never shrank. In #1625, the slab was rewritten to use a lock-free strategy. The lock contention was removed but this implementation was still grow-only. This change adds the ability to release memory. Similar to the previous implementation, it structures the slab to use a vector of pages. This enables growing the slab without having to move any previous entries. It also adds the ability to release pages. This is done by introducing a lock when allocating/releasing slab entries. This does not impact benchmarks, primarily due to the existing implementation not being "done" and also having a lock around allocating and releasing. A `Slab::compact()` function is added. Pages are iterated. When a page is found with no slots in use, the page is freed. The `compact()` function is called occasionally by the I/O driver. Fixes #2505
1 parent 674985d commit 8feebab

File tree

17 files changed

+928
-1344
lines changed

17 files changed

+928
-1344
lines changed

tokio/src/io/driver/mod.rs

Lines changed: 72 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,30 @@ pub(crate) mod platform;
33
mod scheduled_io;
44
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
55

6-
use crate::loom::sync::atomic::AtomicUsize;
76
use crate::park::{Park, Unpark};
87
use crate::runtime::context;
9-
use crate::util::slab::{Address, Slab};
8+
use crate::util::bit;
9+
use crate::util::slab::{self, Slab};
1010

1111
use mio::event::Evented;
1212
use std::fmt;
1313
use std::io;
14-
use std::sync::atomic::Ordering::SeqCst;
1514
use std::sync::{Arc, Weak};
1615
use std::task::Waker;
1716
use std::time::Duration;
1817

1918
/// I/O driver, backed by Mio
2019
pub(crate) struct Driver {
20+
/// Tracks the number of times `turn` is called. It is safe for this to wrap
21+
/// as it is mostly used to determine when to call `compact()`
22+
tick: u16,
23+
2124
/// Reuse the `mio::Events` value across calls to poll.
22-
events: mio::Events,
25+
events: Option<mio::Events>,
26+
27+
/// Primary slab handle containing the state for each resource registered
28+
/// with this driver.
29+
resources: Slab<ScheduledIo>,
2330

2431
/// State shared between the reactor and the handles.
2532
inner: Arc<Inner>,
@@ -37,11 +44,8 @@ pub(super) struct Inner {
3744
/// The underlying system event queue.
3845
io: mio::Poll,
3946

40-
/// Dispatch slabs for I/O and futures events
41-
pub(super) io_dispatch: Slab<ScheduledIo>,
42-
43-
/// The number of sources in `io_dispatch`.
44-
n_sources: AtomicUsize,
47+
/// Allocates `ScheduledIo` handles when creating new resources.
48+
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
4549

4650
/// Used to wake up the reactor from a call to `turn`
4751
wakeup: mio::SetReadiness,
@@ -53,7 +57,19 @@ pub(super) enum Direction {
5357
Write,
5458
}
5559

56-
const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
60+
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
61+
// token.
62+
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
63+
64+
const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
65+
66+
// Packs the generation value in the `readiness` field.
67+
//
68+
// The generation prevents a race condition where a slab slot is reused for a
69+
// new socket while the I/O driver is about to apply a readiness event. The
70+
// generaton value is checked when setting new readiness. If the generation do
71+
// not match, then the readiness event is discarded.
72+
const GENERATION: bit::Pack = ADDRESS.then(7);
5773

5874
fn _assert_kinds() {
5975
fn _assert<T: Send + Sync>() {}
@@ -69,6 +85,8 @@ impl Driver {
6985
pub(crate) fn new() -> io::Result<Driver> {
7086
let io = mio::Poll::new()?;
7187
let wakeup_pair = mio::Registration::new2();
88+
let slab = Slab::new();
89+
let allocator = slab.allocator();
7290

7391
io.register(
7492
&wakeup_pair.0,
@@ -78,12 +96,13 @@ impl Driver {
7896
)?;
7997

8098
Ok(Driver {
81-
events: mio::Events::with_capacity(1024),
99+
tick: 0,
100+
events: Some(mio::Events::with_capacity(1024)),
101+
resources: slab,
82102
_wakeup_registration: wakeup_pair.0,
83103
inner: Arc::new(Inner {
84104
io,
85-
io_dispatch: Slab::new(),
86-
n_sources: AtomicUsize::new(0),
105+
io_dispatch: allocator,
87106
wakeup: wakeup_pair.1,
88107
}),
89108
})
@@ -102,16 +121,27 @@ impl Driver {
102121
}
103122

104123
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
124+
// How often to call `compact()` on the resource slab
125+
const COMPACT_INTERVAL: u16 = 256;
126+
127+
self.tick = self.tick.wrapping_add(1);
128+
129+
if self.tick % COMPACT_INTERVAL == 0 {
130+
self.resources.compact();
131+
}
132+
133+
let mut events = self.events.take().expect("i/o driver event store missing");
134+
105135
// Block waiting for an event to happen, peeling out how many events
106136
// happened.
107-
match self.inner.io.poll(&mut self.events, max_wait) {
137+
match self.inner.io.poll(&mut events, max_wait) {
108138
Ok(_) => {}
109139
Err(e) => return Err(e),
110140
}
111141

112142
// Process all the events that came in, dispatching appropriately
113143

114-
for event in self.events.iter() {
144+
for event in events.iter() {
115145
let token = event.token();
116146

117147
if token == TOKEN_WAKEUP {
@@ -124,22 +154,24 @@ impl Driver {
124154
}
125155
}
126156

157+
self.events = Some(events);
158+
127159
Ok(())
128160
}
129161

130-
fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
162+
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
131163
let mut rd = None;
132164
let mut wr = None;
133165

134-
let address = Address::from_usize(token.0);
166+
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
135167

136-
let io = match self.inner.io_dispatch.get(address) {
168+
let io = match self.resources.get(addr) {
137169
Some(io) => io,
138170
None => return,
139171
};
140172

141173
if io
142-
.set_readiness(address, |curr| curr | ready.as_usize())
174+
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
143175
.is_err()
144176
{
145177
// token no longer valid!
@@ -164,6 +196,18 @@ impl Driver {
164196
}
165197
}
166198

199+
impl Drop for Driver {
200+
fn drop(&mut self) {
201+
self.resources.for_each(|io| {
202+
// If a task is waiting on the I/O resource, notify it. The task
203+
// will then attempt to use the I/O resource and fail due to the
204+
// driver being shutdown.
205+
io.reader.wake();
206+
io.writer.wake();
207+
})
208+
}
209+
}
210+
167211
impl Park for Driver {
168212
type Unpark = Handle;
169213
type Error = io::Error;
@@ -246,46 +290,32 @@ impl Inner {
246290
&self,
247291
source: &dyn Evented,
248292
ready: mio::Ready,
249-
) -> io::Result<Address> {
250-
let address = self.io_dispatch.alloc().ok_or_else(|| {
293+
) -> io::Result<slab::Ref<ScheduledIo>> {
294+
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
251295
io::Error::new(
252296
io::ErrorKind::Other,
253297
"reactor at max registered I/O resources",
254298
)
255299
})?;
256300

257-
self.n_sources.fetch_add(1, SeqCst);
301+
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
258302

259-
self.io.register(
260-
source,
261-
mio::Token(address.to_usize()),
262-
ready,
263-
mio::PollOpt::edge(),
264-
)?;
303+
self.io
304+
.register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
265305

266-
Ok(address)
306+
Ok(shared)
267307
}
268308

269309
/// Deregisters an I/O resource from the reactor.
270310
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
271311
self.io.deregister(source)
272312
}
273313

274-
pub(super) fn drop_source(&self, address: Address) {
275-
self.io_dispatch.remove(address);
276-
self.n_sources.fetch_sub(1, SeqCst);
277-
}
278-
279314
/// Registers interest in the I/O resource associated with `token`.
280-
pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
281-
let sched = self
282-
.io_dispatch
283-
.get(token)
284-
.unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
285-
315+
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
286316
let waker = match dir {
287-
Direction::Read => &sched.reader,
288-
Direction::Write => &sched.writer,
317+
Direction::Read => &io.reader,
318+
Direction::Write => &io.writer,
289319
};
290320

291321
waker.register(w);
@@ -303,100 +333,3 @@ impl Direction {
303333
}
304334
}
305335
}
306-
307-
#[cfg(all(test, loom))]
308-
mod tests {
309-
use super::*;
310-
use loom::thread;
311-
312-
// No-op `Evented` impl just so we can have something to pass to `add_source`.
313-
struct NotEvented;
314-
315-
impl Evented for NotEvented {
316-
fn register(
317-
&self,
318-
_: &mio::Poll,
319-
_: mio::Token,
320-
_: mio::Ready,
321-
_: mio::PollOpt,
322-
) -> io::Result<()> {
323-
Ok(())
324-
}
325-
326-
fn reregister(
327-
&self,
328-
_: &mio::Poll,
329-
_: mio::Token,
330-
_: mio::Ready,
331-
_: mio::PollOpt,
332-
) -> io::Result<()> {
333-
Ok(())
334-
}
335-
336-
fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
337-
Ok(())
338-
}
339-
}
340-
341-
#[test]
342-
fn tokens_unique_when_dropped() {
343-
loom::model(|| {
344-
let reactor = Driver::new().unwrap();
345-
let inner = reactor.inner;
346-
let inner2 = inner.clone();
347-
348-
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
349-
let thread = thread::spawn(move || {
350-
inner2.drop_source(token_1);
351-
});
352-
353-
let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
354-
thread.join().unwrap();
355-
356-
assert!(token_1 != token_2);
357-
})
358-
}
359-
360-
#[test]
361-
fn tokens_unique_when_dropped_on_full_page() {
362-
loom::model(|| {
363-
let reactor = Driver::new().unwrap();
364-
let inner = reactor.inner;
365-
let inner2 = inner.clone();
366-
// add sources to fill up the first page so that the dropped index
367-
// may be reused.
368-
for _ in 0..31 {
369-
inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
370-
}
371-
372-
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
373-
let thread = thread::spawn(move || {
374-
inner2.drop_source(token_1);
375-
});
376-
377-
let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
378-
thread.join().unwrap();
379-
380-
assert!(token_1 != token_2);
381-
})
382-
}
383-
384-
#[test]
385-
fn tokens_unique_concurrent_add() {
386-
loom::model(|| {
387-
let reactor = Driver::new().unwrap();
388-
let inner = reactor.inner;
389-
let inner2 = inner.clone();
390-
391-
let thread = thread::spawn(move || {
392-
let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
393-
token_2
394-
});
395-
396-
let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
397-
let token_2 = thread.join().unwrap();
398-
399-
assert!(token_1 != token_2);
400-
})
401-
}
402-
}

0 commit comments

Comments
 (0)