Skip to content

Commit ff81604

Browse files
opt(s2n-quic-dc): skip epoll registration in happy path (#2754)
Currently, a dcQUIC stream over TCP is accepted and registered with epoll, and we then attempt a read, which usually fails. Within <1ms the first data packet arrives and the read succeeds, at which point we deregister the socket and hand the stream off to the application for further reading. We'd like to avoid the epoll registration as it uses extra CPU (even if the latency impact is minimal), so this patch uses the Linux-only TCP_DEFER_ACCEPT to only accept sockets with data already available. That's combined with lazy registration of sockets with Tokio's epoll: a socket is only registered if we get WouldBlock after attempting a read or write. The net effect is an 8.8% (relative) drop in overall CPU usage in one of our internal benchmarks, which exercises short streams over loopback, bringing CPU usage in the acceptor from 23% of the workload to 18%.
1 parent bd9ee3e commit ff81604

File tree

9 files changed

+216
-30
lines changed

9 files changed

+216
-30
lines changed

dc/s2n-quic-dc/src/stream/environment/tokio/tcp.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{
66
stream::{
77
environment::{tokio::Environment, Peer, SetupResult, SocketSet},
88
recv::shared::RecvBuffer,
9+
server::tokio::tcp::LazyBoundStream,
910
TransportFeatures,
1011
},
1112
};
@@ -51,7 +52,7 @@ where
5152

5253
/// A socket that should be reregistered with the application runtime
5354
pub struct Reregistered {
54-
pub socket: TcpStream,
55+
pub socket: LazyBoundStream,
5556
pub peer_addr: SocketAddress,
5657
pub local_port: u16,
5758
pub recv_buffer: RecvBuffer,

dc/s2n-quic-dc/src/stream/server/tokio.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
468468
self.server.local_addr = socket.local_addr()?;
469469
}
470470

471-
let socket = tokio::net::TcpListener::from_std(socket)?;
471+
let socket = tokio::io::unix::AsyncFd::new(socket)?;
472472
let id = self.id();
473473

474474
let acceptor = tcp::Acceptor::new(
@@ -480,7 +480,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
480480
self.backlog,
481481
self.accept_flavor,
482482
self.linger,
483-
)
483+
)?
484484
.run();
485485

486486
if self.span.is_disabled() {

dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,23 @@ use crate::{
1111
};
1212
use core::{future::poll_fn, task::Poll};
1313
use s2n_quic_core::{inet::SocketAddress, time::Clock};
14-
use std::time::Duration;
15-
use tokio::net::TcpListener;
14+
use std::{net::TcpListener, os::fd::AsRawFd, time::Duration};
15+
use tokio::io::unix::AsyncFd;
1616
use tracing::debug;
1717

1818
mod fresh;
19+
mod lazy;
1920
mod manager;
2021
mod worker;
2122

23+
pub(crate) use lazy::LazyBoundStream;
24+
2225
pub struct Acceptor<Sub>
2326
where
2427
Sub: Subscriber + Clone,
2528
{
2629
sender: accept::Sender<Sub>,
27-
socket: TcpListener,
30+
socket: AsyncFd<TcpListener>,
2831
env: Environment<Sub>,
2932
secrets: secret::Map,
3033
backlog: usize,
@@ -39,14 +42,14 @@ where
3942
#[inline]
4043
pub fn new(
4144
id: usize,
42-
socket: TcpListener,
45+
socket: AsyncFd<TcpListener>,
4346
sender: &accept::Sender<Sub>,
4447
env: &Environment<Sub>,
4548
secrets: &secret::Map,
4649
backlog: usize,
4750
accept_flavor: accept::Flavor,
4851
linger: Option<Duration>,
49-
) -> Self {
52+
) -> std::io::Result<Self> {
5053
let acceptor = Self {
5154
sender: sender.clone(),
5255
socket,
@@ -57,7 +60,27 @@ where
5760
linger,
5861
};
5962

60-
if let Ok(addr) = acceptor.socket.local_addr() {
63+
#[cfg(target_os = "linux")]
64+
{
65+
let res = unsafe {
66+
libc::setsockopt(
67+
acceptor.socket.get_ref().as_raw_fd(),
68+
libc::SOL_TCP,
69+
libc::TCP_DEFER_ACCEPT,
70+
// This is how many seconds elapse before the kernel will accept a stream
71+
// without any data and return it to userspace. Any number of seconds is
72+
// arguably too many in our domain (we'd expect data in milliseconds) but in
73+
// practice this value shouldn't matter much.
74+
&1u32 as *const _ as *const _,
75+
std::mem::size_of::<u32>() as libc::socklen_t,
76+
)
77+
};
78+
if res != 0 {
79+
return Err(std::io::Error::last_os_error());
80+
}
81+
}
82+
83+
if let Ok(addr) = acceptor.socket.get_ref().local_addr() {
6184
let local_address: SocketAddress = addr.into();
6285
acceptor.env.endpoint_publisher().on_acceptor_tcp_started(
6386
event::builder::AcceptorTcpStarted {
@@ -68,7 +91,7 @@ where
6891
);
6992
}
7093

71-
acceptor
94+
Ok(acceptor)
7295
}
7396

7497
pub async fn run(mut self) {
@@ -103,7 +126,7 @@ where
103126

104127
workers.insert(
105128
remote_address,
106-
socket,
129+
LazyBoundStream::Std(socket),
107130
self.linger,
108131
&mut context,
109132
subscriber_ctx,

dc/s2n-quic-dc/src/stream/server/tokio/tcp/fresh.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use crate::event::{self, EndpointPublisher};
55
use core::task::{Context, Poll};
66
use s2n_quic_core::inet::SocketAddress;
7-
use std::{collections::VecDeque, io};
7+
use std::{collections::VecDeque, io, task::ready};
88

99
/// Converts the kernel's TCP FIFO accept queue to LIFO
1010
///
@@ -107,13 +107,20 @@ pub trait Listener {
107107
fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(Self::Stream, SocketAddress)>>;
108108
}
109109

110-
impl Listener for tokio::net::TcpListener {
111-
type Stream = tokio::net::TcpStream;
110+
impl Listener for tokio::io::unix::AsyncFd<std::net::TcpListener> {
111+
type Stream = std::net::TcpStream;
112112

113113
#[inline]
114114
fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(Self::Stream, SocketAddress)>> {
115-
(*self)
116-
.poll_accept(cx)
117-
.map_ok(|(socket, remote_address)| (socket, remote_address.into()))
115+
loop {
116+
let mut guard = ready!(self.poll_read_ready(cx))?;
117+
let (socket, remote_addr) = match guard.try_io(|listener| listener.get_ref().accept()) {
118+
Ok(v) => v?,
119+
// arm the waker via poll_read_ready if WouldBlock returned.
120+
Err(_) => continue,
121+
};
122+
socket.set_nonblocking(true)?;
123+
return Poll::Ready(Ok((socket, remote_addr.into())));
124+
}
118125
}
119126
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{
5+
msg,
6+
stream::socket::{fd::tcp, Flags, Socket},
7+
};
8+
use s2n_quic_core::inet::ExplicitCongestionNotification;
9+
use std::{
10+
io::{self, ErrorKind, Write},
11+
net::TcpStream as StdTcpStream,
12+
os::fd::AsRawFd,
13+
pin::Pin,
14+
task::Poll,
15+
time::Duration,
16+
};
17+
use tokio::{io::AsyncWrite as _, net::TcpStream as TokioTcpStream};
18+
19+
pub enum LazyBoundStream {
20+
Tokio(TokioTcpStream),
21+
Std(StdTcpStream),
22+
// needed for moving between the previous two while only having &mut access.
23+
TempEmpty,
24+
}
25+
26+
impl LazyBoundStream {
27+
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
28+
match self {
29+
LazyBoundStream::Tokio(s) => s.set_nodelay(nodelay),
30+
LazyBoundStream::Std(s) => s.set_nodelay(nodelay),
31+
LazyBoundStream::TempEmpty => unreachable!(),
32+
}
33+
}
34+
35+
pub fn set_linger(&self, linger: Option<Duration>) -> io::Result<()> {
36+
match self {
37+
LazyBoundStream::Tokio(s) => s.set_linger(linger),
38+
LazyBoundStream::Std(s) => {
39+
// Once it stabilizes we can switch to the std function
40+
// https://github.com/rust-lang/rust/issues/88494
41+
let res = unsafe {
42+
libc::setsockopt(
43+
s.as_raw_fd(),
44+
libc::SOL_SOCKET,
45+
libc::SO_LINGER,
46+
&libc::linger {
47+
l_onoff: linger.is_some() as libc::c_int,
48+
l_linger: linger.unwrap_or_default().as_secs() as libc::c_int,
49+
} as *const _ as *const _,
50+
std::mem::size_of::<libc::linger>() as libc::socklen_t,
51+
)
52+
};
53+
if res != 0 {
54+
return Err(std::io::Error::last_os_error());
55+
}
56+
57+
Ok(())
58+
}
59+
LazyBoundStream::TempEmpty => unreachable!(),
60+
}
61+
}
62+
63+
pub fn into_std(self) -> io::Result<StdTcpStream> {
64+
match self {
65+
LazyBoundStream::Tokio(s) => s.into_std(),
66+
LazyBoundStream::Std(s) => Ok(s),
67+
LazyBoundStream::TempEmpty => unreachable!(),
68+
}
69+
}
70+
71+
pub fn poll_write(
72+
&mut self,
73+
cx: &mut std::task::Context,
74+
buffer: &[u8],
75+
) -> std::task::Poll<io::Result<usize>> {
76+
loop {
77+
match self {
78+
LazyBoundStream::Tokio(stream) => return Pin::new(stream).poll_write(cx, buffer),
79+
LazyBoundStream::Std(stream) => match stream.write(buffer) {
80+
Ok(v) => return Poll::Ready(Ok(v)),
81+
Err(e) => {
82+
if e.kind() == ErrorKind::WouldBlock {
83+
let LazyBoundStream::Std(stream) =
84+
std::mem::replace(self, LazyBoundStream::TempEmpty)
85+
else {
86+
unreachable!();
87+
};
88+
*self = LazyBoundStream::Tokio(TokioTcpStream::from_std(stream)?);
89+
} else {
90+
return Poll::Ready(Err(e));
91+
}
92+
}
93+
},
94+
LazyBoundStream::TempEmpty => unreachable!(),
95+
}
96+
}
97+
}
98+
99+
pub fn poll_recv_buffer(
100+
&mut self,
101+
cx: &mut std::task::Context,
102+
buffer: &mut msg::recv::Message,
103+
) -> std::task::Poll<io::Result<usize>> {
104+
loop {
105+
match self {
106+
LazyBoundStream::Tokio(stream) => {
107+
return Pin::new(stream).poll_recv_buffer(cx, buffer)
108+
}
109+
LazyBoundStream::Std(stream) => {
110+
let res = buffer.recv_with(|_addr, cmsg, buffer| {
111+
loop {
112+
let flags = Flags::default();
113+
let res = tcp::recv(&*stream, buffer, flags);
114+
115+
match res {
116+
Ok(len) => {
117+
// we don't need ECN markings from TCP since it handles that logic for us
118+
cmsg.set_ecn(ExplicitCongestionNotification::NotEct);
119+
120+
// TCP doesn't have segments so just set it to 0 (which will indicate a single
121+
// stream of bytes)
122+
cmsg.set_segment_len(0);
123+
124+
return Ok(len);
125+
}
126+
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
127+
// try the operation again if we were interrupted
128+
continue;
129+
}
130+
Err(err) => return Err(err),
131+
}
132+
}
133+
});
134+
match res {
135+
Ok(v) => return Poll::Ready(Ok(v)),
136+
Err(e) => {
137+
if e.kind() == ErrorKind::WouldBlock {
138+
let LazyBoundStream::Std(stream) =
139+
std::mem::replace(self, LazyBoundStream::TempEmpty)
140+
else {
141+
unreachable!();
142+
};
143+
*self = LazyBoundStream::Tokio(TokioTcpStream::from_std(stream)?);
144+
} else {
145+
return Poll::Ready(Err(e));
146+
}
147+
}
148+
}
149+
}
150+
LazyBoundStream::TempEmpty => unreachable!(),
151+
}
152+
}
153+
}
154+
}

dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ where
370370
}
371371
}
372372

373-
pub trait Worker {
373+
pub(crate) trait Worker {
374374
type Context;
375375
type ConnectionContext;
376376
type Stream;

0 commit comments

Comments
 (0)