Skip to content

Commit b7a3df5

Browse files
opt(s2n-quic-dc): skip epoll registration in happy path
Currently, a dcQUIC stream over TCP is accepted, registered with epoll, and read from (a read that usually fails because no data has arrived yet). In <1ms the first data packet arrives and the read succeeds; we then deregister the socket and hand off the stream to the application for further reading. We'd like to avoid the epoll registration as it uses extra CPU (even if the latency impact is minimal), so this patch uses the Linux-only TCP_DEFER_ACCEPT socket option to only accept sockets with data already available. That's combined with lazy registration of sockets with Tokio's epoll: a socket is only registered if we get WouldBlock after attempting a read or write. The net effect is an 8.8% (relative) drop in overall CPU usage in one of our internal benchmarks, which exercises short streams over loopback, bringing CPU usage in the acceptor from 23% of the workload to 18%.
1 parent 4d3d94f commit b7a3df5

File tree

8 files changed

+210
-26
lines changed

8 files changed

+210
-26
lines changed

dc/s2n-quic-dc/src/stream/environment/tokio/tcp.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crate::stream::server::tokio::tcp::LazyBoundStream;
45
use crate::{
56
event,
67
stream::{
@@ -51,7 +52,7 @@ where
5152

5253
/// A socket that should be reregistered with the application runtime
5354
pub struct Reregistered {
54-
pub socket: TcpStream,
55+
pub socket: LazyBoundStream,
5556
pub peer_addr: SocketAddress,
5657
pub local_port: u16,
5758
pub recv_buffer: RecvBuffer,

dc/s2n-quic-dc/src/stream/server/tokio.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
468468
self.server.local_addr = socket.local_addr()?;
469469
}
470470

471-
let socket = tokio::net::TcpListener::from_std(socket)?;
471+
let socket = tokio::io::unix::AsyncFd::new(socket)?;
472472
let id = self.id();
473473

474474
let acceptor = tcp::Acceptor::new(
@@ -480,7 +480,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
480480
self.backlog,
481481
self.accept_flavor,
482482
self.linger,
483-
)
483+
)?
484484
.run();
485485

486486
if self.span.is_disabled() {

dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,24 @@ use crate::{
1111
};
1212
use core::{future::poll_fn, task::Poll};
1313
use s2n_quic_core::{inet::SocketAddress, time::Clock};
14-
use std::time::Duration;
15-
use tokio::net::TcpListener;
14+
use std::net::TcpListener;
15+
use std::{os::fd::AsRawFd, time::Duration};
16+
use tokio::io::unix::AsyncFd;
1617
use tracing::debug;
1718

1819
mod fresh;
20+
mod lazy;
1921
mod manager;
2022
mod worker;
2123

24+
pub(crate) use lazy::LazyBoundStream;
25+
2226
pub struct Acceptor<Sub>
2327
where
2428
Sub: Subscriber + Clone,
2529
{
2630
sender: accept::Sender<Sub>,
27-
socket: TcpListener,
31+
socket: AsyncFd<TcpListener>,
2832
env: Environment<Sub>,
2933
secrets: secret::Map,
3034
backlog: usize,
@@ -39,14 +43,14 @@ where
3943
#[inline]
4044
pub fn new(
4145
id: usize,
42-
socket: TcpListener,
46+
socket: AsyncFd<TcpListener>,
4347
sender: &accept::Sender<Sub>,
4448
env: &Environment<Sub>,
4549
secrets: &secret::Map,
4650
backlog: usize,
4751
accept_flavor: accept::Flavor,
4852
linger: Option<Duration>,
49-
) -> Self {
53+
) -> std::io::Result<Self> {
5054
let acceptor = Self {
5155
sender: sender.clone(),
5256
socket,
@@ -57,7 +61,27 @@ where
5761
linger,
5862
};
5963

60-
if let Ok(addr) = acceptor.socket.local_addr() {
64+
#[cfg(target_os = "linux")]
65+
{
66+
let res = unsafe {
67+
libc::setsockopt(
68+
acceptor.socket.get_ref().as_raw_fd(),
69+
libc::SOL_TCP,
70+
libc::TCP_DEFER_ACCEPT,
71+
// This is how many seconds elapse before the kernel will accept a stream
72+
// without any data and return it to userspace. Any number of seconds is
73+
// arguably too many in our domain (we'd expect data in milliseconds) but in
74+
// practice this value shouldn't matter much.
75+
&1u32 as *const _ as *const _,
76+
std::mem::size_of::<u32>() as libc::socklen_t,
77+
)
78+
};
79+
if res != 0 {
80+
return Err(std::io::Error::last_os_error());
81+
}
82+
}
83+
84+
if let Ok(addr) = acceptor.socket.get_ref().local_addr() {
6185
let local_address: SocketAddress = addr.into();
6286
acceptor.env.endpoint_publisher().on_acceptor_tcp_started(
6387
event::builder::AcceptorTcpStarted {
@@ -68,7 +92,7 @@ where
6892
);
6993
}
7094

71-
acceptor
95+
Ok(acceptor)
7296
}
7397

7498
pub async fn run(mut self) {
@@ -103,7 +127,7 @@ where
103127

104128
workers.insert(
105129
remote_address,
106-
socket,
130+
LazyBoundStream::Std(socket),
107131
self.linger,
108132
&mut context,
109133
subscriber_ctx,

dc/s2n-quic-dc/src/stream/server/tokio/tcp/fresh.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use crate::event::{self, EndpointPublisher};
55
use core::task::{Context, Poll};
66
use s2n_quic_core::inet::SocketAddress;
7+
use std::task::ready;
78
use std::{collections::VecDeque, io};
89

910
/// Converts the kernel's TCP FIFO accept queue to LIFO
@@ -107,13 +108,20 @@ pub trait Listener {
107108
fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(Self::Stream, SocketAddress)>>;
108109
}
109110

110-
impl Listener for tokio::net::TcpListener {
111-
type Stream = tokio::net::TcpStream;
111+
impl Listener for tokio::io::unix::AsyncFd<std::net::TcpListener> {
112+
type Stream = std::net::TcpStream;
112113

113114
#[inline]
114115
fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(Self::Stream, SocketAddress)>> {
115-
(*self)
116-
.poll_accept(cx)
117-
.map_ok(|(socket, remote_address)| (socket, remote_address.into()))
116+
loop {
117+
let mut guard = ready!(self.poll_read_ready(cx))?;
118+
let (socket, remote_addr) = match guard.try_io(|listener| listener.get_ref().accept()) {
119+
Ok(v) => v?,
120+
// arm the waker via poll_read_ready if WouldBlock returned.
121+
Err(_) => continue,
122+
};
123+
socket.set_nonblocking(true)?;
124+
return Poll::Ready(Ok((socket, remote_addr.into())));
125+
}
118126
}
119127
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::msg;
5+
use crate::stream::socket::{fd::tcp, Flags, Socket};
6+
use s2n_quic_core::inet::ExplicitCongestionNotification;
7+
use std::io::{self, Write};
8+
use std::os::fd::AsRawFd;
9+
use std::pin::Pin;
10+
use std::task::Poll;
11+
use std::time::Duration;
12+
use std::{io::ErrorKind, net::TcpStream as StdTcpStream};
13+
use tokio::{io::AsyncWrite as _, net::TcpStream as TokioTcpStream};
14+
15+
pub enum LazyBoundStream {
16+
Tokio(TokioTcpStream),
17+
Std(StdTcpStream),
18+
// needed for moving between the previous two while only having &mut access.
19+
TempEmpty,
20+
}
21+
22+
impl LazyBoundStream {
23+
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
24+
match self {
25+
LazyBoundStream::Tokio(s) => s.set_nodelay(nodelay),
26+
LazyBoundStream::Std(s) => s.set_nodelay(nodelay),
27+
LazyBoundStream::TempEmpty => unreachable!(),
28+
}
29+
}
30+
31+
pub fn set_linger(&self, linger: Option<Duration>) -> io::Result<()> {
32+
match self {
33+
LazyBoundStream::Tokio(s) => s.set_linger(linger),
34+
LazyBoundStream::Std(s) => {
35+
// Once it stabilizes we can switch to the std function
36+
// https://github.com/rust-lang/rust/issues/88494
37+
let res = unsafe {
38+
libc::setsockopt(
39+
s.as_raw_fd(),
40+
libc::SOL_SOCKET,
41+
libc::SO_LINGER,
42+
&libc::linger {
43+
l_onoff: linger.is_some() as libc::c_int,
44+
l_linger: linger.unwrap_or_default().as_secs() as libc::c_int,
45+
} as *const _ as *const _,
46+
std::mem::size_of::<libc::linger>() as libc::socklen_t,
47+
)
48+
};
49+
if res != 0 {
50+
return Err(std::io::Error::last_os_error());
51+
}
52+
53+
Ok(())
54+
}
55+
LazyBoundStream::TempEmpty => unreachable!(),
56+
}
57+
}
58+
59+
pub fn into_std(self) -> io::Result<StdTcpStream> {
60+
match self {
61+
LazyBoundStream::Tokio(s) => s.into_std(),
62+
LazyBoundStream::Std(s) => Ok(s),
63+
LazyBoundStream::TempEmpty => unreachable!(),
64+
}
65+
}
66+
67+
pub fn poll_write(
68+
&mut self,
69+
cx: &mut std::task::Context,
70+
buffer: &[u8],
71+
) -> std::task::Poll<io::Result<usize>> {
72+
loop {
73+
match self {
74+
LazyBoundStream::Tokio(stream) => return Pin::new(stream).poll_write(cx, buffer),
75+
LazyBoundStream::Std(stream) => match stream.write(buffer) {
76+
Ok(v) => return Poll::Ready(Ok(v)),
77+
Err(e) => {
78+
if e.kind() == ErrorKind::WouldBlock {
79+
let LazyBoundStream::Std(stream) =
80+
std::mem::replace(self, LazyBoundStream::TempEmpty)
81+
else {
82+
unreachable!();
83+
};
84+
*self = LazyBoundStream::Tokio(TokioTcpStream::from_std(stream)?);
85+
} else {
86+
return Poll::Ready(Err(e));
87+
}
88+
}
89+
},
90+
LazyBoundStream::TempEmpty => unreachable!(),
91+
}
92+
}
93+
}
94+
95+
pub fn poll_recv_buffer(
96+
&mut self,
97+
cx: &mut std::task::Context,
98+
buffer: &mut msg::recv::Message,
99+
) -> std::task::Poll<io::Result<usize>> {
100+
loop {
101+
match self {
102+
LazyBoundStream::Tokio(stream) => {
103+
return Pin::new(stream).poll_recv_buffer(cx, buffer)
104+
}
105+
LazyBoundStream::Std(stream) => {
106+
let res = buffer.recv_with(|_addr, cmsg, buffer| {
107+
loop {
108+
let flags = Flags::default();
109+
let res = tcp::recv(&*stream, buffer, flags);
110+
111+
match res {
112+
Ok(len) => {
113+
// we don't need ECN markings from TCP since it handles that logic for us
114+
cmsg.set_ecn(ExplicitCongestionNotification::NotEct);
115+
116+
// TCP doesn't have segments so just set it to 0 (which will indicate a single
117+
// stream of bytes)
118+
cmsg.set_segment_len(0);
119+
120+
return Ok(len);
121+
}
122+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
123+
// try the operation again if we were interrupted
124+
continue;
125+
}
126+
Err(err) => return Err(err),
127+
}
128+
}
129+
});
130+
match res {
131+
Ok(v) => return Poll::Ready(Ok(v)),
132+
Err(e) => {
133+
if e.kind() == ErrorKind::WouldBlock {
134+
let LazyBoundStream::Std(stream) =
135+
std::mem::replace(self, LazyBoundStream::TempEmpty)
136+
else {
137+
unreachable!();
138+
};
139+
*self = LazyBoundStream::Tokio(TokioTcpStream::from_std(stream)?);
140+
} else {
141+
return Poll::Ready(Err(e));
142+
}
143+
}
144+
}
145+
}
146+
LazyBoundStream::TempEmpty => unreachable!(),
147+
}
148+
}
149+
}
150+
}

dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ where
370370
}
371371
}
372372

373-
pub trait Worker {
373+
pub(crate) trait Worker {
374374
type Context;
375375
type ConnectionContext;
376376
type Stream;

dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::accept;
5+
use super::LazyBoundStream;
56
use crate::{
67
either::Either,
78
event::{self, EndpointPublisher, IntoEvent},
@@ -11,7 +12,6 @@ use crate::{
1112
endpoint,
1213
environment::tokio::{self as env, Environment},
1314
recv, server,
14-
socket::Socket,
1515
},
1616
};
1717
use core::{
@@ -27,7 +27,6 @@ use s2n_quic_core::{
2727
time::{Clock, Timestamp},
2828
};
2929
use std::io;
30-
use tokio::{io::AsyncWrite as _, net::TcpStream};
3130
use tracing::debug;
3231

3332
pub struct Context<Sub>
@@ -54,7 +53,7 @@ where
5453
env: acceptor.env.clone(),
5554
secrets: acceptor.secrets.clone(),
5655
accept_flavor: acceptor.accept_flavor,
57-
local_port: acceptor.socket.local_addr().unwrap().port(),
56+
local_port: acceptor.socket.get_ref().local_addr().unwrap().port(),
5857
}
5958
}
6059
}
@@ -64,7 +63,7 @@ where
6463
Sub: event::Subscriber + Clone,
6564
{
6665
queue_time: Timestamp,
67-
stream: Option<(TcpStream, SocketAddress)>,
66+
stream: Option<(LazyBoundStream, SocketAddress)>,
6867
subscriber_ctx: Option<Sub::ConnectionContext>,
6968
state: WorkerState,
7069
}
@@ -89,14 +88,14 @@ where
8988
Sub: event::Subscriber + Clone,
9089
{
9190
type ConnectionContext = Sub::ConnectionContext;
92-
type Stream = TcpStream;
91+
type Stream = LazyBoundStream;
9392
type Context = Context<Sub>;
9493

9594
#[inline]
9695
fn replace<Pub, C>(
9796
&mut self,
9897
remote_address: SocketAddress,
99-
stream: TcpStream,
98+
stream: LazyBoundStream,
10099
linger: Option<Duration>,
101100
subscriber_ctx: Self::ConnectionContext,
102101
publisher: &Pub,
@@ -232,7 +231,7 @@ impl WorkerState {
232231
&mut self,
233232
cx: &mut task::Context,
234233
context: &mut Context<Sub>,
235-
stream: &mut Option<(TcpStream, SocketAddress)>,
234+
stream: &mut Option<(LazyBoundStream, SocketAddress)>,
236235
subscriber_ctx: &mut Option<Sub::ConnectionContext>,
237236
queue_time: Timestamp,
238237
now: Timestamp,
@@ -396,7 +395,7 @@ impl WorkerState {
396395
#[inline]
397396
fn poll_initial_packet<Pub>(
398397
cx: &mut task::Context,
399-
stream: &mut TcpStream,
398+
stream: &mut LazyBoundStream,
400399
remote_address: &SocketAddress,
401400
recv_buffer: &mut msg::recv::Message,
402401
sojourn_time: Duration,

0 commit comments

Comments
 (0)