Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions protocol/examples/bufreader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// SPDX-License-Identifier: CC0-1.0

//! Benchmark exploring the performance impact of using a `BufReader` with BIP-324 protocol streams.
//!
//! The BIP-324 protocol requires many relatively small read operations. For every packet, first
//! the 3-byte length is read and then the rest of the packet. Bitcoin p2p messages are also
//! relatively small in size. These characteristics can lead to inefficient system calls.
//!
//! This example does not model real life very well, because the write half just dumps
//! all the messages at once. In reality, a bitcoin p2p connection is bursty or even
//! quite. And this doesn't model any sort of network latency or partially written
//! packets. But this example *does* highlight how during heavy write periods, a bufreader
//! improves performance by ironing out some of BIP-324 characteristics.
//!
//! # Usage
//!
//! ```bash
//! cargo run --release --example bufreader --features tokio
//! ```

use bip324::futures::Protocol;
use bip324::{Network, Role};
use std::fmt;
use std::time::{Duration, Instant};
use tokio::io::BufReader;
use tokio::net::{TcpListener, TcpStream};

/// Test scenario configuration.
#[derive(Clone)]
struct Scenario {
name: &'static str,
/// Message traffic pattern set by the sizes of messages to send.
message_sizes: Vec<usize>,
/// Number of times to repeat the message traffic pattern.
iterations: usize,
}

impl Scenario {
fn bitcoin_typical() -> Self {
Self {
name: "Bitcoin Traffic",
// Some common bitcoin message sizes.
//
// * ping/pong: ~10 bytes
// * inv: ~37 bytes per item
// * addr: ~30 bytes per address
// * tx: 200-500 bytes
// * block header: ~80 bytes
message_sizes: vec![10, 37, 30, 250, 80, 500, 37, 30, 10, 10],
iterations: 10000,
}
}

fn large_messages() -> Self {
Self {
name: "Large Messages",
message_sizes: vec![8192, 16384, 65536],
iterations: 1000,
}
}

fn small_messages() -> Self {
Self {
name: "Small Messages",
message_sizes: vec![1, 2, 3, 4, 5],
iterations: 20000,
}
}

fn total_messages(&self) -> usize {
self.message_sizes.len() * self.iterations
}

fn total_bytes(&self) -> usize {
self.message_sizes.iter().sum::<usize>() * self.iterations
}

/// Display benchmark results for this scenario.
fn display_results(&self, without_buf: Duration, with_buf: Duration) {
let improvement = ((without_buf.as_secs_f64() - with_buf.as_secs_f64())
/ without_buf.as_secs_f64())
* 100.0;

println!("{self}");
println!(" Without BufReader: {} ms", without_buf.as_millis());
println!(" With BufReader: {} ms", with_buf.as_millis());
println!(" Improvement: {improvement:.1}%");
}
}

impl fmt::Display for Scenario {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}: {} total messages, {} bytes",
self.name,
self.total_messages(),
self.total_bytes()
)
}
}

/// Run benchmark for a specific scenario.
async fn benchmark_scenario(scenario: &Scenario) -> Result<(), Box<dyn std::error::Error>> {
let (server_addr, _server_handle) = start_server(scenario.clone()).await?;

let without_buf = Client::NonBuffered.run(&server_addr, scenario).await?;
let with_buf = Client::Buffered.run(&server_addr, scenario).await?;

scenario.display_results(without_buf, with_buf);

Ok(())
}

/// Start the server which write out all the messages of a scenario.
async fn start_server(
scenario: Scenario,
) -> Result<(String, tokio::task::JoinHandle<()>), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?.to_string();

let handle = tokio::spawn(async move {
// Handle two connections per scenario, one with buffer and one without buffer.
for _ in 0..2 {
let (stream, _) = listener.accept().await.unwrap();
let (reader, writer) = stream.into_split();

let garbage = vec![0x88u8; 512];
let mut protocol = Protocol::new(
Network::Bitcoin,
Role::Responder,
Some(&garbage),
None,
reader,
writer,
)
.await
.unwrap();

// Pre-allocate messages to send.
let messages: Vec<Vec<u8>> = scenario
.message_sizes
.iter()
.map(|&size| vec![0x42u8; size])
.collect();

// Dump them all at once. This is not very realistic,
// but the test is trying trying to measure the read
// syscalls. Don't want to introduce write performance.
for _ in 0..scenario.iterations {
for message in &messages {
protocol.write(message).await.unwrap();
}
}
}
});

Ok((addr, handle))
}

/// Client reads all the messages.
enum Client {
Buffered,
NonBuffered,
}

impl Client {
/// Run the client for a scenario and return the duration to read all the messages.
async fn run(
&self,
server_addr: &str,
scenario: &Scenario,
) -> Result<Duration, Box<dyn std::error::Error>> {
let start = Instant::now();

let stream = TcpStream::connect(server_addr).await?;
let (reader, writer) = stream.into_split();

match self {
Client::Buffered => {
let buffered_reader = BufReader::new(reader);
let mut protocol = Protocol::new(
Network::Bitcoin,
Role::Initiator,
None,
None,
buffered_reader,
writer,
)
.await?;

// Read all messages
for _ in 0..scenario.total_messages() {
let _payload = protocol.read().await?;
}
}
Client::NonBuffered => {
let mut protocol = Protocol::new(
Network::Bitcoin,
Role::Initiator,
None,
None,
reader,
writer,
)
.await?;

// Read all messages
for _ in 0..scenario.total_messages() {
let _payload = protocol.read().await?;
}
}
};

Ok(start.elapsed())
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let scenarios = vec![
Scenario::bitcoin_typical(),
Scenario::large_messages(),
Scenario::small_messages(),
];

for scenario in scenarios {
benchmark_scenario(&scenario).await?;
}

Ok(())
}
Loading