Skip to content

Conversation

@zijiren233
Copy link

I wrote a simple test, but I won't put it in the source code because the benchmark framework hasn't been introduced yet.

use std::time::Instant;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

const BUFFER_SIZE: usize = 16 * 1024; // 16 KiB per copy direction
const TEST_DATA_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB (printed as 10240 MB by main; previous "100 MB" note was wrong)
const NUM_ITERATIONS: usize = 10000; // reported in main's header; not consumed by the concurrent benchmark
const CONCURRENT_CONNECTIONS: usize = 1_000_000; // simultaneous duplex connections spawned per benchmark run

/// Stack-based version (current implementation).
///
/// Bidirectionally shuttles bytes between `a` and `b` until either side hits
/// EOF or an I/O error occurs. The two 16 KiB buffers are plain arrays, so
/// they live inline in the generated future's state — making each future
/// large, which is exactly what this benchmark measures.
///
/// Returns `(bytes a->b, bytes b->a, first error if any)`.
async fn copy_io_stack<A, B>(a: &mut A, b: &mut B) -> (usize, usize, Option<std::io::Error>)
where
	A: AsyncRead + AsyncWrite + Unpin + ?Sized,
	B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
	let mut buf_ab = [0u8; BUFFER_SIZE];
	let mut buf_ba = [0u8; BUFFER_SIZE];

	let mut bytes_ab = 0;
	let mut bytes_ba = 0;
	let mut failure = None;

	loop {
		// `read` is cancel-safe, so whichever branch loses the race can be
		// retried on the next loop iteration without losing data.
		tokio::select! {
			read_ab = a.read(&mut buf_ab) => match read_ab {
				Ok(0) => break, // EOF on `a`
				Ok(n) => {
					bytes_ab += n;
					if let Err(e) = b.write_all(&buf_ab[..n]).await {
						failure = Some(e);
						break;
					}
				}
				Err(e) => {
					failure = Some(e);
					break;
				}
			},
			read_ba = b.read(&mut buf_ba) => match read_ba {
				Ok(0) => break, // EOF on `b`
				Ok(n) => {
					bytes_ba += n;
					if let Err(e) = a.write_all(&buf_ba[..n]).await {
						failure = Some(e);
						break;
					}
				}
				Err(e) => {
					failure = Some(e);
					break;
				}
			}
		}
	}

	(bytes_ab, bytes_ba, failure)
}

/// Box-based version (heap allocation — 2 separate allocations).
///
/// Behaviorally identical to `copy_io_stack`, but the buffers are boxed
/// slices: the future only holds two pointers instead of 32 KiB of inline
/// array, so millions of concurrent copies keep future sizes (and the
/// memcpys that moving them implies) small.
///
/// Returns `(bytes a->b, bytes b->a, first error if any)`.
async fn copy_io_box<A, B>(a: &mut A, b: &mut B) -> (usize, usize, Option<std::io::Error>)
where
	A: AsyncRead + AsyncWrite + Unpin + ?Sized,
	B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
	let mut buf_ab = vec![0u8; BUFFER_SIZE].into_boxed_slice();
	let mut buf_ba = vec![0u8; BUFFER_SIZE].into_boxed_slice();

	let mut bytes_ab = 0;
	let mut bytes_ba = 0;
	let mut failure = None;

	loop {
		// Same select-driven pump as the stack variant; only the buffer
		// storage differs.
		tokio::select! {
			read_ab = a.read(&mut buf_ab) => match read_ab {
				Ok(0) => break, // EOF on `a`
				Ok(n) => {
					bytes_ab += n;
					if let Err(e) = b.write_all(&buf_ab[..n]).await {
						failure = Some(e);
						break;
					}
				}
				Err(e) => {
					failure = Some(e);
					break;
				}
			},
			read_ba = b.read(&mut buf_ba) => match read_ba {
				Ok(0) => break, // EOF on `b`
				Ok(n) => {
					bytes_ba += n;
					if let Err(e) = a.write_all(&buf_ba[..n]).await {
						failure = Some(e);
						break;
					}
				}
				Err(e) => {
					failure = Some(e);
					break;
				}
			}
		}
	}

	(bytes_ab, bytes_ba, failure)
}

// Concurrent benchmark functions
/// Drives one simulated connection through `copy_io_stack`: a background
/// task pushes 1 KiB through a duplex pipe and shuts down, while the copier
/// drains the server end into an empty sink.
async fn spawn_copy_stack() {
	let (mut client, mut server) = tokio::io::duplex(8 * 1024);

	// Producer side: write a fixed payload, then signal EOF via shutdown.
	tokio::spawn(async move {
		let _ = client.write_all(&[0u8; 1024]).await;
		let _ = client.shutdown().await;
	});

	let mut sink = tokio::io::empty();
	let _ = copy_io_stack(&mut server, &mut sink).await;
}

/// Drives one simulated connection through `copy_io_box` — identical setup
/// to `spawn_copy_stack`, differing only in which copy implementation runs.
async fn spawn_copy_box() {
	let (mut client, mut server) = tokio::io::duplex(8 * 1024);

	// Producer side: write a fixed payload, then signal EOF via shutdown.
	tokio::spawn(async move {
		let _ = client.write_all(&[0u8; 1024]).await;
		let _ = client.shutdown().await;
	});

	let mut sink = tokio::io::empty();
	let _ = copy_io_box(&mut server, &mut sink).await;
}

/// Spawns `CONCURRENT_CONNECTIONS` instances of the future produced by
/// `spawn_fn`, waits for all of them to finish, prints and returns the total
/// wall-clock duration.
async fn benchmark_concurrent<F, Fut>(name: &str, spawn_fn: F) -> std::time::Duration
where
	F: Fn() -> Fut,
	Fut: std::future::Future<Output = ()> + Send + 'static,
{
	let start = Instant::now();

	// Launch every task up front (the range's size_hint preallocates the Vec),
	// then join them all so `elapsed` covers the full batch.
	let handles: Vec<_> = (0..CONCURRENT_CONNECTIONS)
		.map(|_| tokio::spawn(spawn_fn()))
		.collect();

	for handle in handles {
		let _ = handle.await;
	}

	let duration = start.elapsed();
	println!("  [{name}] {CONCURRENT_CONNECTIONS} concurrent connections completed in {duration:?}");
	duration
}

#[tokio::main]
async fn main() {
	// Print the benchmark configuration header. NOTE(review): TEST_DATA_SIZE
	// and NUM_ITERATIONS are only reported here — the concurrent benchmark
	// below does not actually consume them.
	println!("=== Copy IO Benchmark: Stack vs Box Allocation ===");
	println!("Buffer size: {} KB", BUFFER_SIZE / 1024);
	println!("Test data size: {} MB", TEST_DATA_SIZE / (1024 * 1024));
	println!("Iterations: {NUM_ITERATIONS}\n");

	// Run concurrent benchmarks
	println!("Running concurrent benchmarks...\n");

	// Run the stack variant first, then the box variant; each spawns
	// CONCURRENT_CONNECTIONS tasks and returns total wall-clock time.
	let stack_concurrent = benchmark_concurrent("Stack", spawn_copy_stack).await;
	let box_concurrent = benchmark_concurrent("Box", spawn_copy_box).await;

	println!("\n=== Concurrent Results ===");
	println!("Stack:          {stack_concurrent:?}");
	println!("Box: {box_concurrent:?}");
}

result:

=== Copy IO Benchmark: Stack vs Box Allocation ===
Buffer size: 16 KB
Test data size: 10240 MB
Iterations: 10000

Running concurrent benchmarks...

  [Stack] 1000000 concurrent connections completed in 1.457541375s
  [Box] 1000000 concurrent connections completed in 818.383292ms

=== Concurrent Results ===
Stack:          1.457541375s
Box: 818.383292ms

@zijiren233
Copy link
Author

Actually, tokio::io::copy_bidirectional does something similar.

@Itsusinn Itsusinn requested a review from Copilot November 3, 2025 15:44
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR optimizes the copy_io function by moving buffer allocations from the stack to the heap using boxed slices. The change addresses memory efficiency concerns when handling a large number of concurrent connections, reducing the future size and associated memory copying overhead.

Key Changes:

  • Modified buffer allocation strategy from stack arrays to heap-allocated boxed slices

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 10 to 11
let mut a2b = vec![0u8; BUFFER_SIZE].into_boxed_slice();
let mut b2a = vec![0u8; BUFFER_SIZE].into_boxed_slice();
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocation pattern vec![0u8; BUFFER_SIZE].into_boxed_slice() performs unnecessary zero-initialization. Consider using Box::new_uninit_slice(BUFFER_SIZE) followed by assume_init() after the first read, or allocate with Vec::with_capacity(BUFFER_SIZE) and manually set the length, to avoid the overhead of zeroing memory that will be immediately overwritten by the read operations.

Copilot uses AI. Check for mistakes.
Copy link
Owner

@Itsusinn Itsusinn Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the bytes crate?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary to use the bytes crate here, since the buffer is used in a very simple way.
What's more worth optimizing is what Copilot mentioned: there's no need to zero-initialize the slice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result after using uninit is as follows:

  [Stack] 1000000 concurrent connections completed in 1.846773625s
  [Box] 1000000 concurrent connections completed in 603.313875ms
  [Box uninit] 1000000 concurrent connections completed in 566.827834ms

@Itsusinn
Copy link
Owner

Itsusinn commented Nov 4, 2025

Could you add several simple unit tests to copy_io ?

@Itsusinn Itsusinn merged commit fc2abaa into Itsusinn:main Nov 4, 2025
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants