Skip to content

Commit ef234f5

Browse files
committed
Avoid allocating large buffer in the accept() loop
Delay it to later. This should help boosting how fast the server can accept now connections.
1 parent 97b951e commit ef234f5

File tree

4 files changed

+59
-37
lines changed

4 files changed

+59
-37
lines changed

.bleep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
d698dfe945ece23e1fda528ade4ca3931951169b
1+
c9b07a00d15151e87fc0ff0ba020a20e2d74de2b

pingora-core/src/listeners/l4.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl ListenerEndpoint {
341341
Ok(())
342342
}
343343

344-
pub async fn accept(&mut self) -> Result<Stream> {
344+
pub async fn accept(&self) -> Result<Stream> {
345345
let mut stream = self
346346
.listener
347347
.accept()
@@ -365,10 +365,10 @@ mod test {
365365
builder.listen_addr(ServerAddress::Tcp(addr.into(), None));
366366

367367
#[cfg(unix)]
368-
let mut listener = builder.listen(None).await.unwrap();
368+
let listener = builder.listen(None).await.unwrap();
369369

370370
#[cfg(windows)]
371-
let mut listener = builder.listen().await.unwrap();
371+
let listener = builder.listen().await.unwrap();
372372

373373
tokio::spawn(async move {
374374
// just try to accept once
@@ -391,10 +391,10 @@ mod test {
391391
builder.listen_addr(ServerAddress::Tcp("[::]:7101".into(), sock_opt));
392392

393393
#[cfg(unix)]
394-
let mut listener = builder.listen(None).await.unwrap();
394+
let listener = builder.listen(None).await.unwrap();
395395

396396
#[cfg(windows)]
397-
let mut listener = builder.listen().await.unwrap();
397+
let listener = builder.listen().await.unwrap();
398398

399399
tokio::spawn(async move {
400400
// just try to accept twice
@@ -418,7 +418,7 @@ mod test {
418418

419419
builder.listen_addr(ServerAddress::Uds(addr.into(), None));
420420

421-
let mut listener = builder.listen(None).await.unwrap();
421+
let listener = builder.listen(None).await.unwrap();
422422

423423
tokio::spawn(async move {
424424
// just try to accept once

pingora-core/src/listeners/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl TransportStack {
9090
self.l4.as_str()
9191
}
9292

93-
pub async fn accept(&mut self) -> Result<UninitializedStream> {
93+
pub async fn accept(&self) -> Result<UninitializedStream> {
9494
let stream = self.l4.accept().await?;
9595
Ok(UninitializedStream {
9696
l4: stream,
@@ -109,7 +109,8 @@ pub(crate) struct UninitializedStream {
109109
}
110110

111111
impl UninitializedStream {
112-
pub async fn handshake(self) -> Result<Stream> {
112+
pub async fn handshake(mut self) -> Result<Stream> {
113+
self.l4.set_buffer();
113114
if let Some(tls) = self.tls {
114115
let tls_stream = tls.tls_handshake(self.l4).await?;
115116
Ok(Box::new(tls_stream))
@@ -248,7 +249,7 @@ mod test {
248249
.unwrap();
249250

250251
assert_eq!(listeners.len(), 2);
251-
for mut listener in listeners {
252+
for listener in listeners {
252253
tokio::spawn(async move {
253254
// just try to accept once
254255
let stream = listener.accept().await.unwrap();
@@ -272,7 +273,7 @@ mod test {
272273
let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR"));
273274
let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR"));
274275
let mut listeners = Listeners::tls(addr, &cert_path, &key_path).unwrap();
275-
let mut listener = listeners
276+
let listener = listeners
276277
.build(
277278
#[cfg(unix)]
278279
None,

pingora-core/src/protocols/l4/stream.rs

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,8 @@ const BUF_WRITE_SIZE: usize = 1460;
351351
/// A concrete type for transport layer connection + extra fields for logging
352352
#[derive(Debug)]
353353
pub struct Stream {
354-
stream: BufStream<RawStreamWrapper>,
354+
// Use `Option` to be able to swap to adjust the buffer size. Always safe to unwrap
355+
stream: Option<BufStream<RawStreamWrapper>>,
355356
// the data put back at the front of the read buffer, in order to replay the read
356357
rewind_read_buf: Vec<u8>,
357358
buffer_write: bool,
@@ -368,9 +369,17 @@ pub struct Stream {
368369
}
369370

370371
impl Stream {
372+
fn stream(&self) -> &BufStream<RawStreamWrapper> {
373+
self.stream.as_ref().expect("stream should always be set")
374+
}
375+
376+
fn stream_mut(&mut self) -> &mut BufStream<RawStreamWrapper> {
377+
self.stream.as_mut().expect("stream should always be set")
378+
}
379+
371380
/// set TCP nodelay for this connection if `self` is TCP
372381
pub fn set_nodelay(&mut self) -> Result<()> {
373-
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
382+
if let RawStream::Tcp(s) = &self.stream_mut().get_mut().stream {
374383
s.set_nodelay(true)
375384
.or_err(ConnectError, "failed to set_nodelay")?;
376385
}
@@ -379,7 +388,7 @@ impl Stream {
379388

380389
/// set TCP keepalive settings for this connection if `self` is TCP
381390
pub fn set_keepalive(&mut self, ka: &TcpKeepalive) -> Result<()> {
382-
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
391+
if let RawStream::Tcp(s) = &self.stream_mut().get_mut().stream {
383392
debug!("Setting tcp keepalive");
384393
set_tcp_keepalive(s, ka)?;
385394
}
@@ -390,12 +399,12 @@ impl Stream {
390399
pub fn set_rx_timestamp(&mut self) -> Result<()> {
391400
use nix::sys::socket::{setsockopt, sockopt, TimestampingFlag};
392401

393-
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
402+
if let RawStream::Tcp(s) = &self.stream_mut().get_mut().stream {
394403
let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
395404
| TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
396405
setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
397406
.or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?;
398-
self.stream.get_mut().enable_rx_ts(true);
407+
self.stream_mut().get_mut().enable_rx_ts(true);
399408
}
400409

401410
Ok(())
@@ -412,16 +421,28 @@ impl Stream {
412421
self.rewind_read_buf.extend_from_slice(data);
413422
}
414423
}
424+
425+
/// Set the buffer of BufStream
426+
/// It is only set later because of the malloc overhead in critical accept() path
427+
pub(crate) fn set_buffer(&mut self) {
428+
use std::mem;
429+
// Since BufStream doesn't provide an API to adjust the buf directly,
430+
// we take the raw stream out of it and put it in a new BufStream with the size we want
431+
let stream = mem::take(&mut self.stream);
432+
let stream =
433+
stream.map(|s| BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, s.into_inner()));
434+
let _ = mem::replace(&mut self.stream, stream);
435+
}
415436
}
416437

417438
impl From<TcpStream> for Stream {
418439
fn from(s: TcpStream) -> Self {
419440
Stream {
420-
stream: BufStream::with_capacity(
421-
BUF_READ_SIZE,
422-
BUF_WRITE_SIZE,
441+
stream: Some(BufStream::with_capacity(
442+
0,
443+
0,
423444
RawStreamWrapper::new(RawStream::Tcp(s)),
424-
),
445+
)),
425446
rewind_read_buf: Vec::new(),
426447
buffer_write: true,
427448
established_ts: SystemTime::now(),
@@ -439,11 +460,11 @@ impl From<TcpStream> for Stream {
439460
impl From<UnixStream> for Stream {
440461
fn from(s: UnixStream) -> Self {
441462
Stream {
442-
stream: BufStream::with_capacity(
443-
BUF_READ_SIZE,
444-
BUF_WRITE_SIZE,
463+
stream: Some(BufStream::with_capacity(
464+
0,
465+
0,
445466
RawStreamWrapper::new(RawStream::Unix(s)),
446-
),
467+
)),
447468
rewind_read_buf: Vec::new(),
448469
buffer_write: true,
449470
established_ts: SystemTime::now(),
@@ -460,14 +481,14 @@ impl From<UnixStream> for Stream {
460481
#[cfg(unix)]
461482
impl AsRawFd for Stream {
462483
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
463-
self.stream.get_ref().as_raw_fd()
484+
self.stream().get_ref().as_raw_fd()
464485
}
465486
}
466487

467488
#[cfg(windows)]
468489
impl AsRawSocket for Stream {
469490
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
470-
self.stream.get_ref().as_raw_socket()
491+
self.stream().get_ref().as_raw_socket()
471492
}
472493
}
473494

@@ -551,7 +572,7 @@ impl Drop for Stream {
551572
t.0.on_disconnected();
552573
}
553574
/* use nodelay/local_addr function to detect socket status */
554-
let ret = match &self.stream.get_ref().stream {
575+
let ret = match &self.stream().get_ref().stream {
555576
RawStream::Tcp(s) => s.nodelay().err(),
556577
#[cfg(unix)]
557578
RawStream::Unix(s) => s.local_addr().err(),
@@ -594,10 +615,10 @@ impl AsyncRead for Stream {
594615
let _ = std::mem::replace(&mut self.rewind_read_buf, remaining_buf);
595616
result
596617
} else {
597-
Pin::new(&mut self.stream).poll_read(cx, buf)
618+
Pin::new(&mut self.stream_mut()).poll_read(cx, buf)
598619
};
599620
self.read_pending_time.poll_time(&result);
600-
self.rx_ts = self.stream.get_ref().rx_ts;
621+
self.rx_ts = self.stream().get_ref().rx_ts;
601622
result
602623
}
603624
}
@@ -609,22 +630,22 @@ impl AsyncWrite for Stream {
609630
buf: &[u8],
610631
) -> Poll<io::Result<usize>> {
611632
let result = if self.buffer_write {
612-
Pin::new(&mut self.stream).poll_write(cx, buf)
633+
Pin::new(&mut self.stream_mut()).poll_write(cx, buf)
613634
} else {
614-
Pin::new(&mut self.stream.get_mut()).poll_write(cx, buf)
635+
Pin::new(&mut self.stream_mut().get_mut()).poll_write(cx, buf)
615636
};
616637
self.write_pending_time.poll_write_time(&result, buf.len());
617638
result
618639
}
619640

620641
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
621-
let result = Pin::new(&mut self.stream).poll_flush(cx);
642+
let result = Pin::new(&mut self.stream_mut()).poll_flush(cx);
622643
self.write_pending_time.poll_time(&result);
623644
result
624645
}
625646

626647
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
627-
Pin::new(&mut self.stream).poll_shutdown(cx)
648+
Pin::new(&mut self.stream_mut()).poll_shutdown(cx)
628649
}
629650

630651
fn poll_write_vectored(
@@ -635,9 +656,9 @@ impl AsyncWrite for Stream {
635656
let total_size = bufs.iter().fold(0, |acc, s| acc + s.len());
636657

637658
let result = if self.buffer_write {
638-
Pin::new(&mut self.stream).poll_write_vectored(cx, bufs)
659+
Pin::new(&mut self.stream_mut()).poll_write_vectored(cx, bufs)
639660
} else {
640-
Pin::new(&mut self.stream.get_mut()).poll_write_vectored(cx, bufs)
661+
Pin::new(&mut self.stream_mut().get_mut()).poll_write_vectored(cx, bufs)
641662
};
642663

643664
self.write_pending_time.poll_write_time(&result, total_size);
@@ -646,9 +667,9 @@ impl AsyncWrite for Stream {
646667

647668
fn is_write_vectored(&self) -> bool {
648669
if self.buffer_write {
649-
self.stream.is_write_vectored() // it is true
670+
self.stream().is_write_vectored() // it is true
650671
} else {
651-
self.stream.get_ref().is_write_vectored()
672+
self.stream().get_ref().is_write_vectored()
652673
}
653674
}
654675
}

0 commit comments

Comments
 (0)