Skip to content

Commit 41850c8

Browse files
alessandroddjc
authored andcommitted
StreamState: allow reusing Recv instances
Recv is used to receive data during the lifetime of a stream. When a stream is closed, instead of throwing away the corresponding Recv, put it in a free list so they can be reused. This reduces allocations by avoiding repeated allocations of BinaryHeaps within Assemblers, and allocations of Box(es) for the actual Recvs. For servers that serve millions of streams, this drastically reduces allocations, effectively allocating a max_(uni|bi)_streams window worth of Recvs and then reusing them instead of allocating for each new stream.
1 parent 459322b commit 41850c8

File tree

4 files changed

+124
-24
lines changed

4 files changed

+124
-24
lines changed

quinn-proto/src/connection/assembler.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ impl Assembler {
2929
Self::default()
3030
}
3131

32+
/// Reset to the initial state
33+
pub(super) fn reinit(&mut self) {
34+
let old_data = mem::take(&mut self.data);
35+
*self = Self::default();
36+
self.data = old_data;
37+
self.data.clear();
38+
}
39+
3240
pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
3341
if ordered && !self.state.is_ordered() {
3442
return Err(IllegalOrderedRead);

quinn-proto/src/connection/streams/mod.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ use bytes::Bytes;
77
use thiserror::Error;
88
use tracing::trace;
99

10-
use self::state::get_or_insert_recv;
11-
1210
use super::spaces::{Retransmits, ThinRetransmits};
13-
use crate::{connection::streams::state::get_or_insert_send, frame, Dir, StreamId, VarInt};
11+
use crate::{
12+
connection::streams::state::{get_or_insert_recv, get_or_insert_send},
13+
frame, Dir, StreamId, VarInt,
14+
};
1415

1516
mod recv;
1617
use recv::Recv;
@@ -149,8 +150,8 @@ impl<'a> RecvStream<'a> {
149150
// connection-level flow control to account for discarded data. Otherwise, we can discard
150151
// state immediately.
151152
if !stream.final_offset_unknown() {
152-
entry.remove();
153-
self.state.stream_freed(self.id, StreamHalf::Recv);
153+
let recv = entry.remove().expect("must have recv when stopping");
154+
self.state.stream_recv_freed(self.id, recv);
154155
}
155156

156157
if self.state.add_read_credits(read_credits).should_transmit() {
@@ -168,7 +169,7 @@ impl<'a> RecvStream<'a> {
168169
let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
169170
return Err(ClosedStream { _private: () });
170171
};
171-
let Some(s) = entry.get().as_ref() else {
172+
let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
172173
return Ok(None);
173174
};
174175
if s.stopped {
@@ -180,8 +181,9 @@ impl<'a> RecvStream<'a> {
180181

181182
// Clean up state after application observes the reset, since there's no reason for the
182183
// application to attempt to read or stop the stream once it knows it's reset
183-
entry.remove_entry();
184-
self.state.stream_freed(self.id, StreamHalf::Recv);
184+
let (_, recv) = entry.remove_entry();
185+
self.state
186+
.stream_recv_freed(self.id, recv.expect("must have recv on reset"));
185187
self.state.queue_max_stream_id(self.pending);
186188

187189
Ok(Some(code))

quinn-proto/src/connection/streams/recv.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ use thiserror::Error;
55
use tracing::debug;
66

77
use super::state::get_or_insert_recv;
8-
use super::{ClosedStream, Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState};
8+
use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
99
use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
10+
use crate::connection::streams::state::StreamRecv;
1011
use crate::{frame, TransportError, VarInt};
1112

1213
#[derive(Debug, Default)]
1314
pub(super) struct Recv {
15+
// NB: when adding or removing fields, remember to update `reinit`.
1416
state: RecvState,
1517
pub(super) assembler: Assembler,
1618
sent_max_stream_data: u64,
@@ -29,6 +31,15 @@ impl Recv {
2931
})
3032
}
3133

34+
/// Reset to the initial state
35+
pub(super) fn reinit(&mut self, initial_max_data: u64) {
36+
self.state = RecvState::default();
37+
self.assembler.reinit();
38+
self.sent_max_stream_data = initial_max_data;
39+
self.end = 0;
40+
self.stopped = false;
41+
}
42+
3243
/// Process a STREAM frame
3344
///
3445
/// Return value is `(number_of_new_bytes_ingested, stream_is_closed)`
@@ -255,7 +266,7 @@ impl<'a> Chunks<'a> {
255266
let mut recv =
256267
match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
257268
true => return Err(ReadableError::ClosedStream),
258-
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
269+
false => entry.remove().unwrap().into_inner(), // this can't fail due to the previous get_or_insert_with
259270
};
260271

261272
recv.assembler.ensure_ordering(ordered)?;
@@ -292,14 +303,24 @@ impl<'a> Chunks<'a> {
292303
match rs.state {
293304
RecvState::ResetRecvd { error_code, .. } => {
294305
debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
295-
self.streams.stream_freed(self.id, StreamHalf::Recv);
296-
self.state = ChunksState::Reset(error_code);
306+
let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
307+
// At this point if we have `rs` self.state must be `ChunksState::Readable`
308+
let recv = match state {
309+
ChunksState::Readable(recv) => StreamRecv::Open(recv),
310+
_ => unreachable!("state must be ChunkState::Readable"),
311+
};
312+
self.streams.stream_recv_freed(self.id, recv);
297313
Err(ReadError::Reset(error_code))
298314
}
299315
RecvState::Recv { size } => {
300316
if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
301-
self.streams.stream_freed(self.id, StreamHalf::Recv);
302-
self.state = ChunksState::Finished;
317+
let state = mem::replace(&mut self.state, ChunksState::Finished);
318+
// At this point if we have `rs` self.state must be `ChunksState::Readable`
319+
let recv = match state {
320+
ChunksState::Readable(recv) => StreamRecv::Open(recv),
321+
_ => unreachable!("state must be ChunkState::Readable"),
322+
};
323+
self.streams.stream_recv_freed(self.id, recv);
303324
Ok(None)
304325
} else {
305326
// We don't need a distinct `ChunksState` variant for a blocked stream because
@@ -345,7 +366,9 @@ impl<'a> Chunks<'a> {
345366
self.pending.max_stream_data.insert(self.id);
346367
}
347368
// Return the stream to storage for future use
348-
self.streams.recv.insert(self.id, Some(rs));
369+
self.streams
370+
.recv
371+
.insert(self.id, Some(StreamRecv::Open(rs)));
349372
}
350373

351374
// Issue connection-level flow control credit for any data we read regardless of state

quinn-proto/src/connection/streams/state.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,58 @@ use crate::{
2020
Dir, Side, StreamId, TransportError, VarInt, MAX_STREAM_COUNT,
2121
};
2222

23+
/// Wrapper around `Recv` that facilitates reusing `Recv` instances
24+
#[derive(Debug)]
25+
pub(super) enum StreamRecv {
26+
/// A `Recv` that is ready to be opened
27+
Free(Box<Recv>),
28+
/// A `Recv` that has been opened
29+
Open(Box<Recv>),
30+
}
31+
32+
impl StreamRecv {
33+
/// Returns a reference to the inner `Recv` if the stream is open
34+
pub(super) fn as_open_recv(&self) -> Option<&Recv> {
35+
match self {
36+
Self::Open(r) => Some(r),
37+
_ => None,
38+
}
39+
}
40+
41+
// Returns a mutable reference to the inner `Recv` if the stream is open
42+
pub(super) fn as_open_recv_mut(&mut self) -> Option<&mut Recv> {
43+
match self {
44+
Self::Open(r) => Some(r),
45+
_ => None,
46+
}
47+
}
48+
49+
// Returns the inner `Recv`
50+
pub(super) fn into_inner(self) -> Box<Recv> {
51+
match self {
52+
Self::Free(r) | Self::Open(r) => r,
53+
}
54+
}
55+
56+
// Reinitialize the stream so the inner `Recv` can be reused
57+
pub(super) fn free(self, initial_max_data: u64) -> Self {
58+
match self {
59+
Self::Free(_) => unreachable!("Self::Free on reinit()"),
60+
Self::Open(mut recv) => {
61+
recv.reinit(initial_max_data);
62+
Self::Free(recv)
63+
}
64+
}
65+
}
66+
}
67+
2368
#[allow(unreachable_pub)] // fuzzing only
2469
pub struct StreamsState {
2570
pub(super) side: Side,
2671
// Set of streams that are currently open, or could be immediately opened by the peer
2772
pub(super) send: FxHashMap<StreamId, Option<Box<Send>>>,
28-
pub(super) recv: FxHashMap<StreamId, Option<Box<Recv>>>,
73+
pub(super) recv: FxHashMap<StreamId, Option<StreamRecv>>,
74+
pub(super) free_recv: Vec<StreamRecv>,
2975
pub(super) next: [u64; 2],
3076
/// Maximum number of locally-initiated streams that may be opened over the lifetime of the
3177
/// connection so far, per direction
@@ -105,6 +151,7 @@ impl StreamsState {
105151
side,
106152
send: FxHashMap::default(),
107153
recv: FxHashMap::default(),
154+
free_recv: Vec::new(),
108155
next: [0, 0],
109156
max: [0, 0],
110157
max_remote: [max_remote_bi.into(), max_remote_uni.into()],
@@ -240,8 +287,8 @@ impl StreamsState {
240287

241288
// Stopped streams become closed instantly on FIN, so check whether we need to clean up
242289
if closed {
243-
self.recv.remove(&id);
244-
self.stream_freed(id, StreamHalf::Recv);
290+
let rs = self.recv.remove(&id).flatten().unwrap();
291+
self.stream_recv_freed(id, rs);
245292
}
246293

247294
// We don't buffer data on stopped streams, so issue flow control credit immediately
@@ -293,8 +340,8 @@ impl StreamsState {
293340
let end = rs.end;
294341
if stopped {
295342
// Stopped streams should be disposed immediately on reset
296-
self.recv.remove(&id);
297-
self.stream_freed(id, StreamHalf::Recv);
343+
let rs = self.recv.remove(&id).flatten().unwrap();
344+
self.stream_recv_freed(id, rs);
298345
}
299346
self.on_stream_frame(!stopped, id);
300347

@@ -358,6 +405,7 @@ impl StreamsState {
358405
self.recv
359406
.get(&id)
360407
.and_then(|s| s.as_ref())
408+
.and_then(|s| s.as_open_recv())
361409
.map_or(false, |s| s.can_send_flow_control())
362410
}
363411

@@ -442,7 +490,12 @@ impl StreamsState {
442490
None => break,
443491
};
444492
pending.max_stream_data.remove(&id);
445-
let rs = match self.recv.get_mut(&id).and_then(|s| s.as_mut()) {
493+
let rs = match self
494+
.recv
495+
.get_mut(&id)
496+
.and_then(|s| s.as_mut())
497+
.and_then(|s| s.as_open_recv_mut())
498+
{
446499
Some(x) => x,
447500
None => continue,
448501
};
@@ -831,7 +884,8 @@ impl StreamsState {
831884
}
832885
// bidirectional OR (unidirectional AND remote)
833886
if bi || remote {
834-
assert!(self.recv.insert(id, None).is_none());
887+
let recv = self.free_recv.pop();
888+
assert!(self.recv.insert(id, recv).is_none());
835889
}
836890
}
837891

@@ -883,6 +937,11 @@ impl StreamsState {
883937
}
884938
}
885939

940+
pub(super) fn stream_recv_freed(&mut self, id: StreamId, recv: StreamRecv) {
941+
self.free_recv.push(recv.free(self.stream_receive_window));
942+
self.stream_freed(id, StreamHalf::Recv);
943+
}
944+
886945
pub(super) fn max_send_data(&self, id: StreamId) -> VarInt {
887946
let remote = self.side != id.initiator();
888947
match id.dir() {
@@ -905,8 +964,16 @@ pub(super) fn get_or_insert_send(
905964
#[inline]
906965
pub(super) fn get_or_insert_recv(
907966
initial_max_data: u64,
908-
) -> impl Fn(&mut Option<Box<Recv>>) -> &mut Box<Recv> {
909-
move |opt| opt.get_or_insert_with(|| Recv::new(initial_max_data))
967+
) -> impl FnMut(&mut Option<StreamRecv>) -> &mut Recv {
968+
move |opt| {
969+
*opt = opt.take().map(|s| match s {
970+
StreamRecv::Free(recv) => StreamRecv::Open(recv),
971+
s => s,
972+
});
973+
opt.get_or_insert_with(|| StreamRecv::Open(Recv::new(initial_max_data)))
974+
.as_open_recv_mut()
975+
.unwrap()
976+
}
910977
}
911978

912979
#[cfg(test)]

0 commit comments

Comments
 (0)