From 4b577a873a4e274a364a47787afcc781a49d5570 Mon Sep 17 00:00:00 2001 From: hatoo Date: Thu, 30 Oct 2025 16:42:47 +0900 Subject: [PATCH 01/10] wip --- Cargo.lock | 21 ++++++++++++++ Cargo.toml | 1 + src/client.rs | 69 +++++++++++++++++++++++----------------------- src/db.rs | 8 +++--- src/lib.rs | 9 +++--- src/monitor.rs | 4 +-- src/printer.rs | 10 +++++-- src/result_data.rs | 11 ++++++-- 8 files changed, 83 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6ce941c..c37813a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -986,6 +986,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "darling" version = "0.20.11" @@ -2264,6 +2274,16 @@ dependencies = [ "adler2", ] +[[package]] +name = "minstant" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db" +dependencies = [ + "ctor", + "web-time", +] + [[package]] name = "mio" version = "1.0.4" @@ -2486,6 +2506,7 @@ dependencies = [ "kanal", "lazy_static", "libc", + "minstant", "native-tls", "num_cpus", "predicates", diff --git a/Cargo.toml b/Cargo.toml index 1493ca74..73050b6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ tokio-vsock = { version = "0.7.2", optional = true } rusqlite = { version = "0.37.0", features = ["bundled"] } num_cpus = "1.16.0" tokio-util = "0.7.13" +minstant = "0.1.7" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/client.rs b/src/client.rs index 6020e80b..e6a130f3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -43,8 +43,8 @@ fn format_host_port(host: &str, port: u16) -> String { #[derive(Debug, Clone, Copy)] pub struct ConnectionTime { - pub dns_lookup: std::time::Instant, - pub dialup: std::time::Instant, + pub dns_lookup: minstant::Instant, + pub dialup: minstant::Instant, } #[derive(Debug, Clone)] @@ -52,16 +52,16 @@ pub struct ConnectionTime { pub struct RequestResult { pub rng: Pcg64Si, // When the query should started - pub start_latency_correction: Option, + pub start_latency_correction: Option, /// When the query started - pub start: std::time::Instant, + pub start: minstant::Instant, /// DNS + dialup /// None when reuse connection pub connection_time: Option, /// First body byte received - pub first_byte: Option, + pub first_byte: Option, /// When the query ends - pub end: std::time::Instant, + pub end: minstant::Instant, /// HTTP status pub status: http::StatusCode, /// Length of body @@ -497,13 +497,13 @@ impl Client { url: &Url, rng: &mut R, http_version: http::Version, - ) -> Result<(Instant, Stream), ClientError> { + ) -> Result<(minstant::Instant, Stream), ClientError> { let timeout_duration = self.connect_timeout; #[cfg(feature = "http3")] if http_version == http::Version::HTTP_3 { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = minstant::Instant::now(); let stream = tokio::time::timeout(timeout_duration, self.quic_client(addr, url)).await; return match stream { Ok(Ok(stream)) => Ok((dns_lookup, stream)), @@ -513,7 +513,7 @@ impl Client { } if url.scheme() == "https" { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = minstant::Instant::now(); // If we do not put a timeout here then the connections attempts will // linger long past the configured timeout let stream = @@ -527,7 +527,7 @@ impl Client { } #[cfg(unix)] if let Some(socket_path) = &self.unix_socket { - let dns_lookup = Instant::now(); + let dns_lookup = minstant::Instant::now(); let stream = tokio::time::timeout( timeout_duration, tokio::net::UnixStream::connect(socket_path), @@ -541,7 +541,7 @@ impl Client { } #[cfg(feature = "vsock")] if let Some(addr) = self.vsock_addr { - let dns_lookup = Instant::now(); + let dns_lookup = minstant::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio_vsock::VsockStream::connect(addr)) .await; @@ -553,7 +553,7 @@ impl Client { } // HTTP let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = Instant::now(); + let dns_lookup = minstant::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; match stream { @@ -624,7 +624,7 @@ impl Client { &self, url: &Url, rng: &mut R, - ) -> Result<(Instant, SendRequestHttp1), ClientError> { + ) -> Result<(minstant::Instant, SendRequestHttp1), ClientError> { if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -687,8 +687,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let mut start = std::time::Instant::now(); - let mut first_byte: Option = None; + let mut start = minstant::Instant::now(); + let mut first_byte: Option = None; let mut connection_time: Option = None; let mut send_request = if let Some(send_request) = client_state.send_request.take() { @@ -696,7 +696,7 @@ impl Client { } else { let (dns_lookup, send_request) = self.client_http1(&self.url, &mut client_state.rng).await?; - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup, dialup }); send_request @@ -704,11 +704,11 @@ impl Client { while send_request.ready().await.is_err() { // This gets hit when the connection for HTTP/1.1 faults // This re-connects - start = std::time::Instant::now(); + start = minstant::Instant::now(); let (dns_lookup, send_request_) = self.client_http1(&self.url, &mut client_state.rng).await?; send_request = send_request_; - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup, dialup }); } match send_request.send_request(request).await { @@ -719,7 +719,7 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(minstant::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } @@ -742,7 +742,7 @@ impl Client { } } - let end = std::time::Instant::now(); + let end = minstant::Instant::now(); let result = RequestResult { rng, @@ -836,18 +836,18 @@ impl Client { .handshake(TokioIo::new(stream)) .await?; tokio::spawn(conn); - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } else { let send_request = stream.handshake_http2().await?; - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } } else { let (dns_lookup, stream) = self.client(url, rng, self.http_version).await?; let send_request = stream.handshake_http2().await?; - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } } @@ -858,8 +858,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = std::time::Instant::now(); - let mut first_byte: Option = None; + let start = minstant::Instant::now(); + let mut first_byte: Option = None; let connection_time: Option = None; match client_state.send_request.send_request(request).await { @@ -870,12 +870,12 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(minstant::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } - let end = std::time::Instant::now(); + let end = minstant::Instant::now(); let result = RequestResult { rng, @@ -1041,7 +1041,7 @@ async fn work_http2_once( client_state: &mut ClientStateHttp2, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http2(client_state).await; let is_cancel = is_cancel_error(&res); @@ -1065,7 +1065,7 @@ pub(crate) fn set_connection_time( pub(crate) fn set_start_latency_correction( res: &mut Result, - start_latency_correction: std::time::Instant, + start_latency_correction: minstant::Instant, ) { if let Ok(res) = res { res.start_latency_correction = Some(start_latency_correction); @@ -1441,7 +1441,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); tx.send(now)?; } } @@ -1450,7 +1450,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); for _ in 0..rate { tx.send(now)?; } @@ -1459,7 +1459,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); for _ in 0..n_tasks - n { tx.send(now)?; } @@ -1993,11 +1993,12 @@ pub async fn work_until_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; + // TODO: use minstant only let now = std::time::Instant::now(); if now > dead_line { break; } - let _ = tx.send(now); + let _ = tx.send(minstant::Instant::now()); } // tx gone }); @@ -2013,7 +2014,7 @@ pub async fn work_until_with_qps_latency_correction( } for _ in 0..rate { - let _ = tx.send(now); + let _ = tx.send(minstant::Instant::now()); } } // tx gone diff --git a/src/db.rs b/src/db.rs index a021721c..c44363d7 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ fn create_db(conn: &Connection) -> Result { pub fn store( client: &Client, db_url: &str, - start: std::time::Instant, + start: minstant::Instant, request_records: &[RequestResult], run: u64, ) -> Result { @@ -66,16 +66,16 @@ mod test_db { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - let start = std::time::Instant::now(); + let start = minstant::Instant::now(); let test_val = RequestResult { rng: SeedableRng::seed_from_u64(0), status: hyper::StatusCode::OK, len_bytes: 100, start_latency_correction: None, - start: std::time::Instant::now(), + start: minstant::Instant::now(), connection_time: None, first_byte: None, - end: std::time::Instant::now(), + end: minstant::Instant::now(), }; let test_vec = vec![test_val.clone(), test_val.clone()]; let client = Client::default(); diff --git a/src/lib.rs b/src/lib.rs index d27a3392..d0c69ee3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -646,6 +646,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { .duration_since(std::time::UNIX_EPOCH)? .as_secs(); let start = std::time::Instant::now(); + let start_minstant = minstant::Instant::now(); let data_collect_future: Pin>> = match work_mode { @@ -727,7 +728,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { for res in buf { all.push(res); } - let _ = printer::print_result(print_config, start, &all, start.elapsed()); + let _ = printer::print_result(print_config, start_minstant, &all, start_minstant.elapsed()); std::process::exit(libc::EXIT_SUCCESS); } _ = token_ctrl_c.cancelled() => { @@ -757,7 +758,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { .map(|d| monitor::EndLine::Duration(d.into())) .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), report_receiver: result_rx, - start, + start: start_minstant, fps: opts.fps, disable_color: opts.disable_color, time_unit: opts.time_unit, @@ -866,11 +867,11 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { let duration = start.elapsed(); let (res, print_config) = data_collect_future.await; - printer::print_result(print_config, start, &res, duration)?; + printer::print_result(print_config, start_minstant, &res, duration)?; if let Some(db_url) = opts.db_url { eprintln!("Storing results to {db_url}"); - db::store(&client, &db_url, start, res.success(), run)?; + db::store(&client, &db_url, start_minstant, res.success(), run)?; } Ok(()) diff --git a/src/monitor.rs b/src/monitor.rs index eb6fd3f1..545228b6 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -53,7 +53,7 @@ pub struct Monitor { /// All workers sends each result to this channel pub report_receiver: kanal::Receiver>, // When started - pub start: std::time::Instant, + pub start: minstant::Instant, // Frame per second of TUI pub fps: usize, pub disable_color: bool, @@ -114,7 +114,7 @@ impl Monitor { break; } - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); let progress = match &self.end_line { EndLine::Duration(d) => { ((now - self.start).as_secs_f64() / d.as_secs_f64()).clamp(0.0, 1.0) diff --git a/src/printer.rs b/src/printer.rs index 5e707384..2469c2f5 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -113,7 +113,7 @@ pub struct PrintConfig { pub fn print_result( mut config: PrintConfig, - start: Instant, + start: minstant::Instant, res: &ResultData, total_duration: Duration, ) -> anyhow::Result<()> { @@ -141,7 +141,7 @@ pub fn print_result( /// Print all summary as JSON fn print_json( w: &mut W, - start: Instant, + start: minstant::Instant, res: &ResultData, total_duration: Duration, stats_success_breakdown: bool, @@ -378,7 +378,11 @@ fn print_json( ) } -fn print_csv(w: &mut W, start: Instant, res: &ResultData) -> std::io::Result<()> { +fn print_csv( + w: &mut W, + start: minstant::Instant, + res: &ResultData, +) -> std::io::Result<()> { // csv header writeln!( w, diff --git a/src/result_data.rs b/src/result_data.rs index a2906855..664a1ad9 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -107,7 +107,10 @@ impl ResultData { &self.error_distribution } - pub fn end_times_from_start(&self, start: Instant) -> impl Iterator + '_ { + pub fn end_times_from_start( + &self, + start: minstant::Instant, + ) -> impl Iterator + '_ { self.success.iter().map(move |result| result.end - start) } @@ -184,6 +187,7 @@ impl ResultData { } } +/* #[cfg(test)] mod tests { use float_cmp::assert_approx_eq; @@ -191,7 +195,7 @@ mod tests { use super::*; use crate::client::{ClientError, ConnectionTime, RequestResult}; - use std::time::{Duration, Instant}; + use std::time::Duration; fn build_mock_request_result( status: StatusCode, @@ -201,7 +205,7 @@ mod tests { first_byte: u64, size: usize, ) -> Result { - let now = Instant::now(); + let now = minstant::Instant::now(); Ok(RequestResult { rng: SeedableRng::seed_from_u64(0), start_latency_correction: None, @@ -319,3 +323,4 @@ mod tests { assert_approx_eq!(f64, res.dns_lookup_stat().max(), 0.3); } } +*/ From 072e3a3a08e854db11df93437b6e0508a3a1d334 Mon Sep 17 00:00:00 2001 From: hatoo Date: Thu, 30 Oct 2025 16:43:10 +0900 Subject: [PATCH 02/10] fix --- src/client.rs | 1 - src/printer.rs | 6 +----- src/result_data.rs | 5 +---- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index e6a130f3..f7bb5feb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,6 @@ use std::{ Arc, atomic::{AtomicBool, Ordering::Relaxed}, }, - time::Instant, }; use thiserror::Error; use tokio::{ diff --git a/src/printer.rs b/src/printer.rs index 2469c2f5..00dd64b6 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -4,11 +4,7 @@ use byte_unit::Byte; use crossterm::style::{StyledContent, Stylize}; use hyper::http::{self, StatusCode}; use ratatui::crossterm; -use std::{ - collections::BTreeMap, - io::Write, - time::{Duration, Instant}, -}; +use std::{collections::BTreeMap, io::Write, time::Duration}; #[derive(Clone, Copy)] struct StyleScheme { diff --git a/src/result_data.rs b/src/result_data.rs index 664a1ad9..9af40ed1 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -1,7 +1,4 @@ -use std::{ - collections::BTreeMap, - time::{Duration, Instant}, -}; +use std::{collections::BTreeMap, time::Duration}; use average::{Estimate, Max, Mean, Min, concatenate}; use hyper::StatusCode; From 6393ecbe63d8c7018cd6018b4ea05a574fb5c156 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 1 Nov 2025 16:27:48 +0900 Subject: [PATCH 03/10] h3 --- src/client_h3.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/client_h3.rs b/src/client_h3.rs index 2f8cf70c..d5982893 100644 --- a/src/client_h3.rs +++ b/src/client_h3.rs @@ -11,7 +11,6 @@ use std::net::UdpSocket; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicIsize; -use std::time::Instant; use tokio::sync::Semaphore; use url::Url; @@ -70,7 +69,7 @@ impl Client { ) -> Result<(ConnectionTime, SendRequestHttp3), ClientError> { let (dns_lookup, stream) = self.client(url, rng, http::Version::HTTP_3).await?; let send_request = stream.handshake_http3().await?; - let dialup = std::time::Instant::now(); + let dialup = minstant::Instant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } @@ -113,9 +112,9 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = std::time::Instant::now(); + let start = minstant::Instant::now(); let connection_time: Option = None; - let mut first_byte: Option = None; + let mut first_byte: Option = None; // if we implement http_body::Body on our H3 SendRequest, we can do some nice streaming stuff // with the response here. However as we don't really use the response we can get away @@ -143,11 +142,11 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.recv_data().await.map_err(Http3Error::from)? { if first_byte.is_none() { - first_byte = Some(std::time::Instant::now()) + first_byte = Some(minstant::Instant::now()) } len_bytes += chunk.remaining(); } - let end = std::time::Instant::now(); + let end = minstant::Instant::now(); let result = RequestResult { rng, @@ -235,7 +234,7 @@ pub(crate) async fn send_debug_request_http3( pub(crate) async fn parallel_work_http3( n_connections: usize, n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, deadline: Option, @@ -274,7 +273,7 @@ pub(crate) async fn parallel_work_http3( */ async fn create_and_load_up_single_connection_http3( n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, s: Arc, @@ -408,7 +407,7 @@ pub(crate) async fn work_http3_once( client_state: &mut ClientStateHttp3, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http3(client_state).await; let is_cancel = is_cancel_error(&res); @@ -565,7 +564,7 @@ pub async fn work( n_connections: usize, n_http2_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let rx = rx.to_async(); let n_tasks_emitter = async move { @@ -592,7 +591,7 @@ pub async fn work_with_qps( n_connections: usize, n_http_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let work_queue = async move { match query_limit { @@ -659,7 +658,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); tx.send(Some(now))?; } } @@ -668,7 +667,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); for _ in 0..rate { tx.send(Some(now))?; } @@ -677,7 +676,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = minstant::Instant::now(); for _ in 0..n_tasks - n { tx.send(Some(now))?; } @@ -707,7 +706,7 @@ pub async fn work_until( n_http_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { - let (tx, rx) = kanal::bounded_async::>(5000); + let (tx, rx) = kanal::bounded_async::>(5000); // This emitter is used for H3 to give it unlimited tokens to emit work. let cancel_token = tokio_util::sync::CancellationToken::new(); let emitter_handle = endless_emitter(cancel_token.clone(), tx).await; @@ -743,7 +742,7 @@ pub async fn work_until_with_qps( ) { let rx = match query_limit { QueryLimit::Qps(qps) => { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); tokio::spawn(async move { for i in 0.. { if std::time::Instant::now() > dead_line { @@ -818,7 +817,7 @@ pub async fn work_until_with_qps_latency_correction( if now > dead_line { break; } - let _ = tx.send(Some(now)); + let _ = tx.send(Some(minstant::Instant::now())); } // tx gone }); @@ -833,6 +832,7 @@ pub async fn work_until_with_qps_latency_correction( break; } + let now = minstant::Instant::now(); for _ in 0..rate { let _ = tx.send(Some(now)); } @@ -860,7 +860,7 @@ pub async fn work_until_with_qps_latency_correction( #[cfg(feature = "http3")] async fn endless_emitter( cancellation_token: tokio_util::sync::CancellationToken, - tx: kanal::AsyncSender>, + tx: kanal::AsyncSender>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { From d106d26febd4a41faa32c61388a566bd20d29544 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 1 Nov 2025 17:45:16 +0900 Subject: [PATCH 04/10] save --- Cargo.lock | 77 +++++++++++++++++++++++++++++++++----------- Cargo.toml | 1 + src/client.rs | 68 +++++++++++++++++++------------------- src/client_h3.rs | 36 ++++++++++----------- src/db.rs | 8 ++--- src/lib.rs | 3 +- src/monitor.rs | 4 +-- src/printer.rs | 6 ++-- src/result_data.rs | 4 +-- src/small_instant.rs | 43 +++++++++++++++++++++++++ tests/tests.rs | 19 ++++++++++- 11 files changed, 186 insertions(+), 83 deletions(-) create mode 100644 src/small_instant.rs diff --git a/Cargo.lock b/Cargo.lock index 6fdd5b6e..68eb9964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-core", "futures-sink", @@ -30,7 +30,7 @@ dependencies = [ "actix-service", "actix-utils", "base64", - "bitflags", + "bitflags 2.10.0", "brotli", "bytes", "bytestring", @@ -495,7 +495,7 @@ version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cexpr", "clang-sys", "itertools", @@ -524,6 +524,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -564,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" dependencies = [ "borsh-derive", - "cfg_aliases", + "cfg_aliases 0.2.1", ] [[package]] @@ -720,6 +726,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -920,7 +932,7 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "crossterm_winapi", "mio", "parking_lot", @@ -2221,9 +2233,9 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "memoffset", ] @@ -2393,6 +2405,7 @@ dependencies = [ "rustls-pki-types", "serde", "serde_json", + "static_init", "tempfile", "thiserror 2.0.17", "tikv-jemallocator", @@ -2426,7 +2439,7 @@ version = "0.10.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24ad14dd45412269e1a30f52ad8f0664f0f4f4a89ee8fe28c3b3527021ebb654" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -2648,7 +2661,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", - "cfg_aliases", + "cfg_aliases 0.2.1", "futures-io", "pin-project-lite", "quinn-proto", @@ -2692,7 +2705,7 @@ version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" dependencies = [ - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2 0.6.1", @@ -2796,7 +2809,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cassowary", "compact_str", "crossterm", @@ -2831,7 +2844,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -3056,7 +3069,7 @@ version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" dependencies = [ - "bitflags", + "bitflags 2.10.0", "fallible-iterator", "fallible-streaming-iterator", "hashlink", @@ -3101,7 +3114,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -3114,7 +3127,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.11.0", @@ -3255,7 +3268,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -3268,7 +3281,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -3490,6 +3503,34 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "static_init" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bae1df58c5fea7502e8e352ec26b5579f6178e1fdb311e088580c980dee25ed" +dependencies = [ + "bitflags 1.3.2", + "cfg_aliases 0.2.1", + "libc", + "parking_lot", + "parking_lot_core", + "static_init_macro", + "winapi", +] + +[[package]] +name = "static_init_macro" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1389c88ddd739ec6d3f8f83343764a0e944cd23cfbf126a9796a714b0b6edd6f" +dependencies = [ + "cfg_aliases 0.1.1", + "memchr", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "strsim" version = "0.11.1" @@ -3839,7 +3880,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-util", "http 1.3.1", diff --git a/Cargo.toml b/Cargo.toml index 6b9d0c97..7362f70c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ rusqlite = { version = "0.37.0", features = ["bundled"] } num_cpus = "1.16.0" tokio-util = "0.7.13" minstant = "0.1.7" +static_init = "1.0.4" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/client.rs b/src/client.rs index f7bb5feb..51bcc4b0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -42,8 +42,8 @@ fn format_host_port(host: &str, port: u16) -> String { #[derive(Debug, Clone, Copy)] pub struct ConnectionTime { - pub dns_lookup: minstant::Instant, - pub dialup: minstant::Instant, + pub dns_lookup: crate::small_instant::SmallInstant, + pub dialup: crate::small_instant::SmallInstant, } #[derive(Debug, Clone)] @@ -51,16 +51,16 @@ pub struct ConnectionTime { pub struct RequestResult { pub rng: Pcg64Si, // When the query should started - pub start_latency_correction: Option, + pub start_latency_correction: Option, /// When the query started - pub start: minstant::Instant, + pub start: crate::small_instant::SmallInstant, /// DNS + dialup /// None when reuse connection pub connection_time: Option, /// First body byte received - pub first_byte: Option, + pub first_byte: Option, /// When the query ends - pub end: minstant::Instant, + pub end: crate::small_instant::SmallInstant, /// HTTP status pub status: http::StatusCode, /// Length of body @@ -496,13 +496,13 @@ impl Client { url: &Url, rng: &mut R, http_version: http::Version, - ) -> Result<(minstant::Instant, Stream), ClientError> { + ) -> Result<(crate::small_instant::SmallInstant, Stream), ClientError> { let timeout_duration = self.connect_timeout; #[cfg(feature = "http3")] if http_version == http::Version::HTTP_3 { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = minstant::Instant::now(); + let dns_lookup = crate::small_instant::SmallInstant::now(); let stream = tokio::time::timeout(timeout_duration, self.quic_client(addr, url)).await; return match stream { Ok(Ok(stream)) => Ok((dns_lookup, stream)), @@ -512,7 +512,7 @@ impl Client { } if url.scheme() == "https" { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = minstant::Instant::now(); + let dns_lookup = crate::small_instant::SmallInstant::now(); // If we do not put a timeout here then the connections attempts will // linger long past the configured timeout let stream = @@ -526,7 +526,7 @@ impl Client { } #[cfg(unix)] if let Some(socket_path) = &self.unix_socket { - let dns_lookup = minstant::Instant::now(); + let dns_lookup = crate::small_instant::SmallInstant::now(); let stream = tokio::time::timeout( timeout_duration, tokio::net::UnixStream::connect(socket_path), @@ -540,7 +540,7 @@ impl Client { } #[cfg(feature = "vsock")] if let Some(addr) = self.vsock_addr { - let dns_lookup = minstant::Instant::now(); + let dns_lookup = crate::small_instant::SmallInstant::now(); let stream = tokio::time::timeout(timeout_duration, tokio_vsock::VsockStream::connect(addr)) .await; @@ -552,7 +552,7 @@ impl Client { } // HTTP let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = minstant::Instant::now(); + let dns_lookup = crate::small_instant::SmallInstant::now(); let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; match stream { @@ -623,7 +623,7 @@ impl Client { &self, url: &Url, rng: &mut R, - ) -> Result<(minstant::Instant, SendRequestHttp1), ClientError> { + ) -> Result<(crate::small_instant::SmallInstant, SendRequestHttp1), ClientError> { if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -686,8 +686,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let mut start = minstant::Instant::now(); - let mut first_byte: Option = None; + let mut start = crate::small_instant::SmallInstant::now(); + let mut first_byte: Option = None; let mut connection_time: Option = None; let mut send_request = if let Some(send_request) = client_state.send_request.take() { @@ -695,7 +695,7 @@ impl Client { } else { let (dns_lookup, send_request) = self.client_http1(&self.url, &mut client_state.rng).await?; - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); connection_time = Some(ConnectionTime { dns_lookup, dialup }); send_request @@ -703,11 +703,11 @@ impl Client { while send_request.ready().await.is_err() { // This gets hit when the connection for HTTP/1.1 faults // This re-connects - start = minstant::Instant::now(); + start = crate::small_instant::SmallInstant::now(); let (dns_lookup, send_request_) = self.client_http1(&self.url, &mut client_state.rng).await?; send_request = send_request_; - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); connection_time = Some(ConnectionTime { dns_lookup, dialup }); } match send_request.send_request(request).await { @@ -718,7 +718,7 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(minstant::Instant::now()) + first_byte = Some(crate::small_instant::SmallInstant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } @@ -741,7 +741,7 @@ impl Client { } } - let end = minstant::Instant::now(); + let end = crate::small_instant::SmallInstant::now(); let result = RequestResult { rng, @@ -835,18 +835,18 @@ impl Client { .handshake(TokioIo::new(stream)) .await?; tokio::spawn(conn); - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } else { let send_request = stream.handshake_http2().await?; - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } } else { let (dns_lookup, stream) = self.client(url, rng, self.http_version).await?; let send_request = stream.handshake_http2().await?; - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } } @@ -857,8 +857,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = minstant::Instant::now(); - let mut first_byte: Option = None; + let start = crate::small_instant::SmallInstant::now(); + let mut first_byte: Option = None; let connection_time: Option = None; match client_state.send_request.send_request(request).await { @@ -869,12 +869,12 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(minstant::Instant::now()) + first_byte = Some(crate::small_instant::SmallInstant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } - let end = minstant::Instant::now(); + let end = crate::small_instant::SmallInstant::now(); let result = RequestResult { rng, @@ -1040,7 +1040,7 @@ async fn work_http2_once( client_state: &mut ClientStateHttp2, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http2(client_state).await; let is_cancel = is_cancel_error(&res); @@ -1064,7 +1064,7 @@ pub(crate) fn set_connection_time( pub(crate) fn set_start_latency_correction( res: &mut Result, - start_latency_correction: minstant::Instant, + start_latency_correction: crate::small_instant::SmallInstant, ) { if let Ok(res) = res { res.start_latency_correction = Some(start_latency_correction); @@ -1440,7 +1440,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); tx.send(now)?; } } @@ -1449,7 +1449,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); for _ in 0..rate { tx.send(now)?; } @@ -1458,7 +1458,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); for _ in 0..n_tasks - n { tx.send(now)?; } @@ -1997,7 +1997,7 @@ pub async fn work_until_with_qps_latency_correction( if now > dead_line { break; } - let _ = tx.send(minstant::Instant::now()); + let _ = tx.send(crate::small_instant::SmallInstant::now()); } // tx gone }); @@ -2013,7 +2013,7 @@ pub async fn work_until_with_qps_latency_correction( } for _ in 0..rate { - let _ = tx.send(minstant::Instant::now()); + let _ = tx.send(crate::small_instant::SmallInstant::now()); } } // tx gone diff --git a/src/client_h3.rs b/src/client_h3.rs index d5982893..213b7bc8 100644 --- a/src/client_h3.rs +++ b/src/client_h3.rs @@ -69,7 +69,7 @@ impl Client { ) -> Result<(ConnectionTime, SendRequestHttp3), ClientError> { let (dns_lookup, stream) = self.client(url, rng, http::Version::HTTP_3).await?; let send_request = stream.handshake_http3().await?; - let dialup = minstant::Instant::now(); + let dialup = crate::small_instant::SmallInstant::now(); Ok((ConnectionTime { dns_lookup, dialup }, send_request)) } @@ -112,9 +112,9 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = minstant::Instant::now(); + let start = crate::small_instant::SmallInstant::now(); let connection_time: Option = None; - let mut first_byte: Option = None; + let mut first_byte: Option = None; // if we implement http_body::Body on our H3 SendRequest, we can do some nice streaming stuff // with the response here. However as we don't really use the response we can get away @@ -142,11 +142,11 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.recv_data().await.map_err(Http3Error::from)? { if first_byte.is_none() { - first_byte = Some(minstant::Instant::now()) + first_byte = Some(crate::small_instant::SmallInstant::now()) } len_bytes += chunk.remaining(); } - let end = minstant::Instant::now(); + let end = crate::small_instant::SmallInstant::now(); let result = RequestResult { rng, @@ -234,7 +234,7 @@ pub(crate) async fn send_debug_request_http3( pub(crate) async fn parallel_work_http3( n_connections: usize, n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, deadline: Option, @@ -273,7 +273,7 @@ pub(crate) async fn parallel_work_http3( */ async fn create_and_load_up_single_connection_http3( n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, s: Arc, @@ -407,7 +407,7 @@ pub(crate) async fn work_http3_once( client_state: &mut ClientStateHttp3, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http3(client_state).await; let is_cancel = is_cancel_error(&res); @@ -564,7 +564,7 @@ pub async fn work( n_connections: usize, n_http2_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let rx = rx.to_async(); let n_tasks_emitter = async move { @@ -591,7 +591,7 @@ pub async fn work_with_qps( n_connections: usize, n_http_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let work_queue = async move { match query_limit { @@ -658,7 +658,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); tx.send(Some(now))?; } } @@ -667,7 +667,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); for _ in 0..rate { tx.send(Some(now))?; } @@ -676,7 +676,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); for _ in 0..n_tasks - n { tx.send(Some(now))?; } @@ -706,7 +706,7 @@ pub async fn work_until( n_http_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { - let (tx, rx) = kanal::bounded_async::>(5000); + let (tx, rx) = kanal::bounded_async::>(5000); // This emitter is used for H3 to give it unlimited tokens to emit work. let cancel_token = tokio_util::sync::CancellationToken::new(); let emitter_handle = endless_emitter(cancel_token.clone(), tx).await; @@ -742,7 +742,7 @@ pub async fn work_until_with_qps( ) { let rx = match query_limit { QueryLimit::Qps(qps) => { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); tokio::spawn(async move { for i in 0.. { if std::time::Instant::now() > dead_line { @@ -817,7 +817,7 @@ pub async fn work_until_with_qps_latency_correction( if now > dead_line { break; } - let _ = tx.send(Some(minstant::Instant::now())); + let _ = tx.send(Some(crate::small_instant::SmallInstant::now())); } // tx gone }); @@ -832,7 +832,7 @@ pub async fn work_until_with_qps_latency_correction( break; } - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); for _ in 0..rate { let _ = tx.send(Some(now)); } @@ -860,7 +860,7 @@ pub async fn work_until_with_qps_latency_correction( #[cfg(feature = "http3")] async fn endless_emitter( cancellation_token: tokio_util::sync::CancellationToken, - tx: kanal::AsyncSender>, + tx: kanal::AsyncSender>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { diff --git a/src/db.rs b/src/db.rs index c44363d7..7967511f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ fn create_db(conn: &Connection) -> Result { pub fn store( client: &Client, db_url: &str, - start: minstant::Instant, + start: crate::small_instant::SmallInstant, request_records: &[RequestResult], run: u64, ) -> Result { @@ -66,16 +66,16 @@ mod test_db { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - let start = minstant::Instant::now(); + let start = crate::small_instant::SmallInstant::now(); let test_val = RequestResult { rng: SeedableRng::seed_from_u64(0), status: hyper::StatusCode::OK, len_bytes: 100, start_latency_correction: None, - start: minstant::Instant::now(), + start: crate::small_instant::SmallInstant::now(), connection_time: None, first_byte: None, - end: minstant::Instant::now(), + end: crate::small_instant::SmallInstant::now(), }; let test_vec = vec![test_val.clone(), test_val.clone()]; let client = Client::default(); diff --git a/src/lib.rs b/src/lib.rs index d0c69ee3..cb1d4695 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ mod pcg64si; mod printer; mod request_generator; mod result_data; +mod small_instant; mod timescale; mod tls_config; mod url_generator; @@ -646,7 +647,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { .duration_since(std::time::UNIX_EPOCH)? .as_secs(); let start = std::time::Instant::now(); - let start_minstant = minstant::Instant::now(); + let start_minstant = crate::small_instant::SmallInstant::now(); let data_collect_future: Pin>> = match work_mode { diff --git a/src/monitor.rs b/src/monitor.rs index 545228b6..30df1475 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -53,7 +53,7 @@ pub struct Monitor { /// All workers sends each result to this channel pub report_receiver: kanal::Receiver>, // When started - pub start: minstant::Instant, + pub start: crate::small_instant::SmallInstant, // Frame per second of TUI pub fps: usize, pub disable_color: bool, @@ -114,7 +114,7 @@ impl Monitor { break; } - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); let progress = match &self.end_line { EndLine::Duration(d) => { ((now - self.start).as_secs_f64() / d.as_secs_f64()).clamp(0.0, 1.0) diff --git a/src/printer.rs b/src/printer.rs index 00dd64b6..cb647f30 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -109,7 +109,7 @@ pub struct PrintConfig { pub fn print_result( mut config: PrintConfig, - start: minstant::Instant, + start: crate::small_instant::SmallInstant, res: &ResultData, total_duration: Duration, ) -> anyhow::Result<()> { @@ -137,7 +137,7 @@ pub fn print_result( /// Print all summary as JSON fn print_json( w: &mut W, - start: minstant::Instant, + start: crate::small_instant::SmallInstant, res: &ResultData, total_duration: Duration, stats_success_breakdown: bool, @@ -376,7 +376,7 @@ fn print_json( fn print_csv( w: &mut W, - start: minstant::Instant, + start: crate::small_instant::SmallInstant, res: &ResultData, ) -> std::io::Result<()> { // csv header diff --git a/src/result_data.rs b/src/result_data.rs index 9af40ed1..a0c2c70d 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -106,7 +106,7 @@ impl ResultData { pub fn end_times_from_start( &self, - start: minstant::Instant, + start: crate::small_instant::SmallInstant, ) -> impl Iterator + '_ { self.success.iter().map(move |result| result.end - start) } @@ -202,7 +202,7 @@ mod tests { first_byte: u64, size: usize, ) -> Result { - let now = minstant::Instant::now(); + let now = crate::small_instant::SmallInstant::now(); Ok(RequestResult { rng: SeedableRng::seed_from_u64(0), start_latency_correction: None, diff --git a/src/small_instant.rs b/src/small_instant.rs new file mode 100644 index 00000000..dd41e3a9 --- /dev/null +++ b/src/small_instant.rs @@ -0,0 +1,43 @@ +use std::{num::NonZeroU64, ops::Sub}; + +#[static_init::dynamic] +static START_INSTANT: std::time::Instant = std::time::Instant::now(); + +#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SmallInstant { + pub nanos: NonZeroU64, +} + +impl SmallInstant { + pub fn now() -> Self { + let now = std::time::Instant::now(); + let nanos = now.duration_since(*START_INSTANT).as_nanos() as u64; + + SmallInstant { + nanos: NonZeroU64::new(nanos).unwrap(), + } + } + + pub fn elapsed(&self) -> std::time::Duration { + let now = Self::now(); + + now - *self + } +} + +impl Into for SmallInstant { + fn into(self) -> std::time::Instant { + *START_INSTANT + std::time::Duration::from_nanos(self.nanos.get()) + } +} + +impl Sub for SmallInstant { + type Output = std::time::Duration; + + fn sub(self, rhs: SmallInstant) -> Self::Output { + let duration_nanos = self.nanos.get() - rhs.nanos.get(); + + std::time::Duration::from_nanos(duration_nanos) + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 0c7cf1b6..4bfd6394 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -11,6 +11,7 @@ use std::{ use axum::{Router, extract::Path, response::Redirect, routing::get}; use bytes::Bytes; +use clap::Parser; use http::{HeaderMap, Request, Response}; use http_body_util::BodyExt; use http_mitm_proxy::MitmProxy; @@ -25,6 +26,17 @@ use rstest_reuse::{self, *}; #[cfg(feature = "http3")] mod common; +fn run<'a>(args: impl Iterator) { + let opts = oha::Opts::parse_from(["oha", "--no-tui"].into_iter().chain(args)); + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { oha::run(opts).await }) + .unwrap(); +} + // Port 5111- is reserved for testing static PORT: AtomicU16 = AtomicU16::new(5111); @@ -350,7 +362,11 @@ async fn test_request_count(args: &[&str]) -> usize { let (listener, port) = bind_port_and_increment().await; tokio::spawn(async { axum::serve(listener, app).await }); - let args: Vec = args.iter().map(|s| s.to_string()).collect(); + // let args: Vec = args.iter().map(|s| s.to_string()).collect(); + let url = format!("http://127.0.0.1:{port}"); + + run(args.iter().map(|s| *s).chain(std::iter::once(url.as_str()))); + /* tokio::task::spawn_blocking(move || { assert_cmd::cargo::cargo_bin_cmd!() .args(["--no-tui"]) @@ -361,6 +377,7 @@ async fn test_request_count(args: &[&str]) -> usize { }) .await .unwrap(); + */ let mut count = 0; while let Ok(Some(())) = rx.try_recv() { From 959109d7a26e6d409d4df1a2732c8fbb5c9b32f7 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 3 Nov 2025 14:55:07 +0900 Subject: [PATCH 05/10] wip --- Cargo.lock | 21 --------------------- Cargo.toml | 1 - tests/tests.rs | 2 +- 3 files changed, 1 insertion(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da7e818e..2e04af70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -935,16 +935,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctor" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" -dependencies = [ - "quote", - "syn 1.0.109", -] - [[package]] name = "darling" version = "0.20.11" @@ -2144,16 +2134,6 @@ dependencies = [ "simd-adler32", ] -[[package]] -name = "minstant" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db" -dependencies = [ - "ctor", - "web-time", -] - [[package]] name = "mio" version = "1.1.0" @@ -2356,7 +2336,6 @@ dependencies = [ "kanal", "lazy_static", "libc", - "minstant", "native-tls", "num_cpus", "predicates", diff --git a/Cargo.toml b/Cargo.toml index 02d8158b..23d3f8da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,6 @@ tokio-vsock = { version = "0.7.2", optional = true } rusqlite = { version = "0.37.0", features = ["bundled"] } num_cpus = "1.16.0" tokio-util = "0.7.13" -minstant = "0.1.7" static_init = "1.0.4" [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/tests/tests.rs b/tests/tests.rs index 9195882d..57b2796f 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1242,7 +1242,7 @@ async fn test_csv_output() { assert!(current_start >= latest_start); latest_start = current_start; assert!(f64::from_str(parts[1]).unwrap() >= 0f64); - assert!(f64::from_str(parts[2]).unwrap() > 0f64); + assert!(f64::from_str(parts[2]).unwrap() >= 0f64); assert!(f64::from_str(parts[3]).unwrap() > 0f64); assert!(f64::from_str(parts[4]).unwrap() > 0f64); assert_eq!(usize::from_str(parts[5]).unwrap(), 11); From 4675eb962c11b88f4ce4773c452b5e03542f61fc Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 3 Nov 2025 15:21:10 +0900 Subject: [PATCH 06/10] save --- src/client.rs | 51 +++++++++++++++++++++++++------------------- src/client_h3.rs | 41 ++++++++++++++++++++--------------- src/lib.rs | 11 +++++----- src/small_instant.rs | 17 ++++++++++++++- 4 files changed, 74 insertions(+), 46 deletions(-) diff --git a/src/client.rs b/src/client.rs index 51bcc4b0..9cb85124 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,6 +23,7 @@ use crate::{ ConnectToEntry, pcg64si::Pcg64Si, request_generator::{RequestGenerationError, RequestGenerator}, + small_instant, url_generator::UrlGeneratorError, }; @@ -1585,7 +1586,7 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: std::time::Instant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1695,7 +1696,7 @@ pub async fn work_until( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -1724,7 +1725,7 @@ pub async fn work_until( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -1751,8 +1752,8 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: small_instant::SmallInstant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1778,11 +1779,14 @@ pub async fn work_until_with_qps( let (tx, rx) = kanal::unbounded::<()>(); tokio::spawn(async move { for i in 0.. { - if std::time::Instant::now() > dead_line { + if small_instant::SmallInstant::now() > dead_line { break; } tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; let _ = tx.send(()); @@ -1796,7 +1800,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if std::time::Instant::now() > dead_line { + if small_instant::SmallInstant::now() > dead_line { break; } @@ -1904,7 +1908,7 @@ pub async fn work_until_with_qps( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -1934,7 +1938,7 @@ pub async fn work_until_with_qps( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -1961,8 +1965,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: small_instant::SmallInstant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1989,15 +1993,17 @@ pub async fn work_until_with_qps_latency_correction( tokio::spawn(async move { for i in 0.. { tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; - // TODO: use minstant only - let now = std::time::Instant::now(); + let now = small_instant::SmallInstant::now(); if now > dead_line { break; } - let _ = tx.send(crate::small_instant::SmallInstant::now()); + let _ = tx.send(now); } // tx gone }); @@ -2007,13 +2013,13 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = small_instant::SmallInstant::now(); if now > dead_line { break; } for _ in 0..rate { - let _ = tx.send(crate::small_instant::SmallInstant::now()); + let _ = tx.send(now); } } // tx gone @@ -2113,7 +2119,7 @@ pub async fn work_until_with_qps_latency_correction( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; s.close(); for f in futures { @@ -2144,7 +2150,7 @@ pub async fn work_until_with_qps_latency_correction( }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + tokio::time::sleep_until(Into::::into(dead_line).into()).await; is_end.store(true, Relaxed); if wait_ongoing_requests_after_deadline { @@ -2181,6 +2187,7 @@ pub mod fast { }, pcg64si::Pcg64Si, result_data::ResultData, + small_instant, }; use super::Client; @@ -2412,7 +2419,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: std::time::Instant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -2617,7 +2624,7 @@ pub mod fast { .collect::>(), }; tokio::select! { - _ = tokio::time::sleep_until(dead_line.into()) => { + _ = tokio::time::sleep_until(Into::::into(dead_line).into()) => { } _ = tokio::signal::ctrl_c() => { } diff --git a/src/client_h3.rs b/src/client_h3.rs index 213b7bc8..e50521af 100644 --- a/src/client_h3.rs +++ b/src/client_h3.rs @@ -44,6 +44,7 @@ use crate::client::{ }; use crate::pcg64si::Pcg64Si; use crate::result_data::ResultData; +use crate::small_instant; use rand::SeedableRng; use rand::prelude::Rng; @@ -237,7 +238,7 @@ pub(crate) async fn parallel_work_http3( rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, - deadline: Option, + deadline: Option, ) -> Vec> { let s = Arc::new(tokio::sync::Semaphore::new(0)); let has_deadline = deadline.is_some(); @@ -259,7 +260,7 @@ pub(crate) async fn parallel_work_http3( .collect::>(); if has_deadline { - tokio::time::sleep_until(deadline.unwrap().into()).await; + tokio::time::sleep_until(Into::::into(deadline.unwrap()).into()).await; s.close(); } @@ -701,7 +702,7 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: std::time::Instant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http_parallel: usize, _wait_ongoing_requests_after_deadline: bool, @@ -734,8 +735,8 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: small_instant::SmallInstant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, @@ -745,11 +746,14 @@ pub async fn work_until_with_qps( let (tx, rx) = kanal::unbounded::>(); tokio::spawn(async move { for i in 0.. { - if std::time::Instant::now() > dead_line { + if small_instant::SmallInstant::now() > dead_line { break; } tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; let _ = tx.send(None); @@ -763,7 +767,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if std::time::Instant::now() > dead_line { + if small_instant::SmallInstant::now() > dead_line { break; } @@ -798,8 +802,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, + start: small_instant::SmallInstant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, @@ -810,14 +814,17 @@ pub async fn work_until_with_qps_latency_correction( tokio::spawn(async move { for i in 0.. { tokio::time::sleep_until( - (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), + Into::::into( + start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps), + ) + .into(), ) .await; - let now = std::time::Instant::now(); + let now = small_instant::SmallInstant::now(); if now > dead_line { break; } - let _ = tx.send(Some(crate::small_instant::SmallInstant::now())); + let _ = tx.send(Some(now)); } // tx gone }); @@ -827,12 +834,11 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = std::time::Instant::now(); + let now = small_instant::SmallInstant::now(); if now > dead_line { break; } - let now = crate::small_instant::SmallInstant::now(); for _ in 0..rate { let _ = tx.send(Some(now)); } @@ -886,6 +892,7 @@ pub mod fast { use crate::{ client::Client, client_h3::http3_connection_fast_work_until, result_data::ResultData, + small_instant, }; /// Run n tasks by m workers @@ -954,7 +961,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: std::time::Instant, + dead_line: small_instant::SmallInstant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1001,7 +1008,7 @@ pub mod fast { }) .collect::>(); tokio::select! { - _ = tokio::time::sleep_until(dead_line.into()) => { + _ = tokio::time::sleep_until(Into::::into(dead_line).into()) => { } _ = tokio::signal::ctrl_c() => { } diff --git a/src/lib.rs b/src/lib.rs index cb1d4695..3238702c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -646,8 +646,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { let run = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_secs(); - let start = std::time::Instant::now(); - let start_minstant = crate::small_instant::SmallInstant::now(); + let start = crate::small_instant::SmallInstant::now(); let data_collect_future: Pin>> = match work_mode { @@ -729,7 +728,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { for res in buf { all.push(res); } - let _ = printer::print_result(print_config, start_minstant, &all, start_minstant.elapsed()); + let _ = printer::print_result(print_config, start, &all, start.elapsed()); std::process::exit(libc::EXIT_SUCCESS); } _ = token_ctrl_c.cancelled() => { @@ -759,7 +758,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { .map(|d| monitor::EndLine::Duration(d.into())) .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), report_receiver: result_rx, - start: start_minstant, + start, fps: opts.fps, disable_color: opts.disable_color, time_unit: opts.time_unit, @@ -868,11 +867,11 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { let duration = start.elapsed(); let (res, print_config) = data_collect_future.await; - printer::print_result(print_config, start_minstant, &res, duration)?; + printer::print_result(print_config, start, &res, duration)?; if let Some(db_url) = opts.db_url { eprintln!("Storing results to {db_url}"); - db::store(&client, &db_url, start_minstant, res.success(), run)?; + db::store(&client, &db_url, start, res.success(), run)?; } Ok(()) diff --git a/src/small_instant.rs b/src/small_instant.rs index dd41e3a9..154e1f85 100644 --- a/src/small_instant.rs +++ b/src/small_instant.rs @@ -1,4 +1,7 @@ -use std::{num::NonZeroU64, ops::Sub}; +use std::{ + num::NonZeroU64, + ops::{Add, Sub}, +}; #[static_init::dynamic] static START_INSTANT: std::time::Instant = std::time::Instant::now(); @@ -32,6 +35,18 @@ impl Into for SmallInstant { } } +impl Add for SmallInstant { + type Output = SmallInstant; + + fn add(self, rhs: std::time::Duration) -> Self::Output { + let duration_nanos = self.nanos.get() + rhs.as_nanos() as u64; + + SmallInstant { + nanos: NonZeroU64::new(duration_nanos).unwrap(), + } + } +} + impl Sub for SmallInstant { type Output = std::time::Duration; From 314af18038232716e8755b0f787e41590b204131 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 3 Nov 2025 21:28:41 +0900 Subject: [PATCH 07/10] fix --- src/small_instant.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/small_instant.rs b/src/small_instant.rs index 154e1f85..7f70c283 100644 --- a/src/small_instant.rs +++ b/src/small_instant.rs @@ -29,9 +29,9 @@ impl SmallInstant { } } -impl Into for SmallInstant { - fn into(self) -> std::time::Instant { - *START_INSTANT + std::time::Duration::from_nanos(self.nanos.get()) +impl From for std::time::Instant { + fn from(val: SmallInstant) -> Self { + *START_INSTANT + std::time::Duration::from_nanos(val.nanos.get()) } } From 48d26cce117b87ec86bcbc668f27caee8ee19e3a Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 3 Nov 2025 21:36:08 +0900 Subject: [PATCH 08/10] a --- src/db.rs | 4 ++-- src/small_instant.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 7967511f..7446c534 100644 --- a/src/db.rs +++ b/src/db.rs @@ -72,10 +72,10 @@ mod test_db { status: hyper::StatusCode::OK, len_bytes: 100, start_latency_correction: None, - start: crate::small_instant::SmallInstant::now(), + start: start + std::time::Duration::from_millis(50), connection_time: None, first_byte: None, - end: crate::small_instant::SmallInstant::now(), + end: start + std::time::Duration::from_millis(150), }; let test_vec = vec![test_val.clone(), test_val.clone()]; let client = Client::default(); diff --git a/src/small_instant.rs b/src/small_instant.rs index 7f70c283..4481e6ec 100644 --- a/src/small_instant.rs +++ b/src/small_instant.rs @@ -14,8 +14,9 @@ pub struct SmallInstant { impl SmallInstant { pub fn now() -> Self { + let start = *START_INSTANT; let now = std::time::Instant::now(); - let nanos = now.duration_since(*START_INSTANT).as_nanos() as u64; + let nanos = now.duration_since(start).as_nanos() as u64; SmallInstant { nanos: NonZeroU64::new(nanos).unwrap(), From bba593ba3e35fd29a0b40b6f83e89f4d5738272f Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 4 Nov 2025 21:59:49 +0900 Subject: [PATCH 09/10] update test --- tests/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests.rs b/tests/tests.rs index 57b2796f..99c02ebb 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1241,8 +1241,8 @@ async fn test_csv_output() { let current_start = f64::from_str(parts[0]).unwrap(); assert!(current_start >= latest_start); latest_start = current_start; - assert!(f64::from_str(parts[1]).unwrap() >= 0f64); - assert!(f64::from_str(parts[2]).unwrap() >= 0f64); + assert!(f64::from_str(parts[1]).unwrap() > 0f64); + assert!(f64::from_str(parts[2]).unwrap() > 0f64); assert!(f64::from_str(parts[3]).unwrap() > 0f64); assert!(f64::from_str(parts[4]).unwrap() > 0f64); assert_eq!(usize::from_str(parts[5]).unwrap(), 11); From cf3ebad99a6f4321886aca53f856eae8f493e379 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 9 Nov 2025 16:17:03 +0900 Subject: [PATCH 10/10] feature gate --- Cargo.toml | 3 +- src/client.rs | 84 ++++++++++++++++++++++------------------------ src/client_h3.rs | 58 ++++++++++++++++---------------- src/db.rs | 4 +-- src/lib.rs | 8 ++++- src/monitor.rs | 4 +-- src/printer.rs | 10 ++---- src/result_data.rs | 4 +-- 8 files changed, 87 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23d3f8da..6d7ba66c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ rustls = [ ] vsock = ["dep:tokio-vsock"] http3 = ["dep:h3", "dep:h3-quinn", "dep:quinn-proto", "dep:quinn", "dep:http"] +small_instant = ["dep:static_init"] [dependencies] anyhow = "1.0.86" @@ -83,7 +84,7 @@ tokio-vsock = { version = "0.7.2", optional = true } rusqlite = { version = "0.37.0", features = ["bundled"] } num_cpus = "1.16.0" tokio-util = "0.7.13" -static_init = "1.0.4" +static_init = { version = "1.0.4", optional = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/client.rs b/src/client.rs index 1f5bbf2f..51eb3f86 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,7 +23,6 @@ use crate::{ ConnectToEntry, pcg64si::Pcg64Si, request_generator::{RequestGenerationError, RequestGenerator}, - small_instant, url_generator::UrlGeneratorError, }; @@ -52,16 +51,16 @@ pub struct ConnectionTime { pub struct RequestResult { pub rng: Pcg64Si, // When the query should started - pub start_latency_correction: Option, + pub start_latency_correction: Option, /// When the query started - pub start: crate::small_instant::SmallInstant, + pub start: crate::Instant, /// DNS + dialup /// None when reuse connection pub connection_time: Option, /// First body byte received - pub first_byte: Option, + pub first_byte: Option, /// When the query ends - pub end: crate::small_instant::SmallInstant, + pub end: crate::Instant, /// HTTP status pub status: http::StatusCode, /// Length of body @@ -497,13 +496,13 @@ impl Client { url: &Url, rng: &mut R, http_version: http::Version, - ) -> Result<(crate::small_instant::SmallInstant, Stream), ClientError> { + ) -> Result<(crate::Instant, Stream), ClientError> { let timeout_duration = self.connect_timeout; #[cfg(feature = "http3")] if http_version == http::Version::HTTP_3 { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = crate::small_instant::SmallInstant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, self.quic_client(addr, url)).await; return match stream { Ok(Ok(stream)) => Ok((dns_lookup, stream)), @@ -513,7 +512,7 @@ impl Client { } if url.scheme() == "https" { let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = crate::small_instant::SmallInstant::now(); + let dns_lookup = crate::Instant::now(); // If we do not put a timeout here then the connections attempts will // linger long past the configured timeout let stream = @@ -527,7 +526,7 @@ impl Client { } #[cfg(unix)] if let Some(socket_path) = &self.unix_socket { - let dns_lookup = crate::small_instant::SmallInstant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout( timeout_duration, tokio::net::UnixStream::connect(socket_path), @@ -541,7 +540,7 @@ impl Client { } #[cfg(feature = "vsock")] if let Some(addr) = self.vsock_addr { - let dns_lookup = crate::small_instant::SmallInstant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio_vsock::VsockStream::connect(addr)) .await; @@ -553,7 +552,7 @@ impl Client { } // HTTP let addr = self.dns.lookup(url, rng).await?; - let dns_lookup = crate::small_instant::SmallInstant::now(); + let dns_lookup = crate::Instant::now(); let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; match stream { @@ -624,7 +623,7 @@ impl Client { &self, url: &Url, rng: &mut R, - ) -> Result<(crate::small_instant::SmallInstant, SendRequestHttp1), ClientError> { + ) -> Result<(crate::Instant, SendRequestHttp1), ClientError> { if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -687,8 +686,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let mut start = crate::small_instant::SmallInstant::now(); - let mut first_byte: Option = None; + let mut start = crate::Instant::now(); + let mut first_byte: Option = None; let mut connection_time: Option = None; let mut send_request = if let Some(send_request) = client_state.send_request.take() { @@ -696,7 +695,7 @@ impl Client { } else { let (dns_lookup, send_request) = self.client_http1(&self.url, &mut client_state.rng).await?; - let dialup = crate::small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup: dns_lookup - start, @@ -707,11 +706,11 @@ impl Client { while send_request.ready().await.is_err() { // This gets hit when the connection for HTTP/1.1 faults // This re-connects - start = crate::small_instant::SmallInstant::now(); + start = crate::Instant::now(); let (dns_lookup, send_request_) = self.client_http1(&self.url, &mut client_state.rng).await?; send_request = send_request_; - let dialup = small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); connection_time = Some(ConnectionTime { dns_lookup: dns_lookup - start, dialup: dialup - start, @@ -725,7 +724,7 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(crate::small_instant::SmallInstant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } @@ -748,7 +747,7 @@ impl Client { } } - let end = crate::small_instant::SmallInstant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -793,7 +792,7 @@ impl Client { url: &Url, rng: &mut R, ) -> Result<(ConnectionTime, SendRequestHttp2), ClientError> { - let start = small_instant::SmallInstant::now(); + let start = crate::Instant::now(); if let Some(proxy_url) = &self.proxy_url { let http_proxy_version = if self.is_proxy_http2() { http::Version::HTTP_2 @@ -843,7 +842,7 @@ impl Client { .handshake(TokioIo::new(stream)) .await?; tokio::spawn(conn); - let dialup = crate::small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { @@ -854,7 +853,7 @@ impl Client { )) } else { let send_request = stream.handshake_http2().await?; - let dialup = small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -866,7 +865,7 @@ impl Client { } else { let (dns_lookup, stream) = self.client(url, rng, self.http_version).await?; let send_request = stream.handshake_http2().await?; - let dialup = small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -883,8 +882,8 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = crate::small_instant::SmallInstant::now(); - let mut first_byte: Option = None; + let start = crate::Instant::now(); + let mut first_byte: Option = None; let connection_time: Option = None; match client_state.send_request.send_request(request).await { @@ -895,12 +894,12 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.frame().await { if first_byte.is_none() { - first_byte = Some(crate::small_instant::SmallInstant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk?.data_ref().map(|d| d.len()).unwrap_or_default(); } - let end = crate::small_instant::SmallInstant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -1066,7 +1065,7 @@ async fn work_http2_once( client_state: &mut ClientStateHttp2, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http2(client_state).await; let is_cancel = is_cancel_error(&res); @@ -1090,7 +1089,7 @@ pub(crate) fn set_connection_time( pub(crate) fn set_start_latency_correction( res: &mut Result, - start_latency_correction: crate::small_instant::SmallInstant, + start_latency_correction: crate::Instant, ) { if let Ok(res) = res { res.start_latency_correction = Some(start_latency_correction); @@ -1466,7 +1465,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); tx.send(now)?; } } @@ -1475,7 +1474,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); for _ in 0..rate { tx.send(now)?; } @@ -1484,7 +1483,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); for _ in 0..n_tasks - n { tx.send(now)?; } @@ -1611,7 +1610,7 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: small_instant::SmallInstant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1777,8 +1776,8 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: small_instant::SmallInstant, - dead_line: small_instant::SmallInstant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -1804,7 +1803,7 @@ pub async fn work_until_with_qps( let (tx, rx) = kanal::unbounded::<()>(); tokio::spawn(async move { for i in 0.. { - if small_instant::SmallInstant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } tokio::time::sleep_until( @@ -1825,7 +1824,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if small_instant::SmallInstant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } @@ -1990,8 +1989,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: small_instant::SmallInstant, - dead_line: small_instant::SmallInstant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, @@ -2024,7 +2023,7 @@ pub async fn work_until_with_qps_latency_correction( .into(), ) .await; - let now = small_instant::SmallInstant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -2038,7 +2037,7 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = small_instant::SmallInstant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -2212,7 +2211,6 @@ pub mod fast { }, pcg64si::Pcg64Si, result_data::ResultData, - small_instant, }; use super::Client; @@ -2444,7 +2442,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: small_instant::SmallInstant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, diff --git a/src/client_h3.rs b/src/client_h3.rs index 02a420f1..a8298927 100644 --- a/src/client_h3.rs +++ b/src/client_h3.rs @@ -44,7 +44,6 @@ use crate::client::{ }; use crate::pcg64si::Pcg64Si; use crate::result_data::ResultData; -use crate::small_instant; use rand::SeedableRng; use rand::prelude::Rng; @@ -68,10 +67,10 @@ impl Client { url: &Url, rng: &mut R, ) -> Result<(ConnectionTime, SendRequestHttp3), ClientError> { - let start = small_instant::SmallInstant::now(); + let start = crate::Instant::now(); let (dns_lookup, stream) = self.client(url, rng, http::Version::HTTP_3).await?; let send_request = stream.handshake_http3().await?; - let dialup = small_instant::SmallInstant::now(); + let dialup = crate::Instant::now(); Ok(( ConnectionTime { dns_lookup: dns_lookup - start, @@ -120,9 +119,9 @@ impl Client { ) -> Result { let do_req = async { let (request, rng) = self.generate_request(&mut client_state.rng)?; - let start = crate::small_instant::SmallInstant::now(); + let start = crate::Instant::now(); let connection_time: Option = None; - let mut first_byte: Option = None; + let mut first_byte: Option = None; // if we implement http_body::Body on our H3 SendRequest, we can do some nice streaming stuff // with the response here. However as we don't really use the response we can get away @@ -150,11 +149,11 @@ impl Client { let mut len_bytes = 0; while let Some(chunk) = stream.recv_data().await.map_err(Http3Error::from)? { if first_byte.is_none() { - first_byte = Some(crate::small_instant::SmallInstant::now()) + first_byte = Some(crate::Instant::now()) } len_bytes += chunk.remaining(); } - let end = crate::small_instant::SmallInstant::now(); + let end = crate::Instant::now(); let result = RequestResult { rng, @@ -242,10 +241,10 @@ pub(crate) async fn send_debug_request_http3( pub(crate) async fn parallel_work_http3( n_connections: usize, n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, - deadline: Option, + deadline: Option, ) -> Vec> { let s = Arc::new(tokio::sync::Semaphore::new(0)); let has_deadline = deadline.is_some(); @@ -281,7 +280,7 @@ pub(crate) async fn parallel_work_http3( */ async fn create_and_load_up_single_connection_http3( n_http_parallel: usize, - rx: AsyncReceiver>, + rx: AsyncReceiver>, report_tx: kanal::Sender>, client: Arc, s: Arc, @@ -415,7 +414,7 @@ pub(crate) async fn work_http3_once( client_state: &mut ClientStateHttp3, report_tx: &kanal::Sender>, connection_time: ConnectionTime, - start_latency_correction: Option, + start_latency_correction: Option, ) -> (bool, bool) { let mut res = client.work_http3(client_state).await; let is_cancel = is_cancel_error(&res); @@ -572,7 +571,7 @@ pub async fn work( n_connections: usize, n_http2_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let rx = rx.to_async(); let n_tasks_emitter = async move { @@ -599,7 +598,7 @@ pub async fn work_with_qps( n_connections: usize, n_http_parallel: usize, ) { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); let work_queue = async move { match query_limit { @@ -666,7 +665,7 @@ pub async fn work_with_qps_latency_correction( (start + std::time::Duration::from_secs_f64(i as f64 * 1f64 / qps)).into(), ) .await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); tx.send(Some(now))?; } } @@ -675,7 +674,7 @@ pub async fn work_with_qps_latency_correction( // Handle via rate till n_tasks out of bound while n + rate < n_tasks { tokio::time::sleep(duration).await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); for _ in 0..rate { tx.send(Some(now))?; } @@ -684,7 +683,7 @@ pub async fn work_with_qps_latency_correction( // Handle the remaining tasks if n_tasks > n { tokio::time::sleep(duration).await; - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); for _ in 0..n_tasks - n { tx.send(Some(now))?; } @@ -709,12 +708,12 @@ pub async fn work_with_qps_latency_correction( pub async fn work_until( client: Arc, report_tx: kanal::Sender>, - dead_line: small_instant::SmallInstant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { - let (tx, rx) = kanal::bounded_async::>(5000); + let (tx, rx) = kanal::bounded_async::>(5000); // This emitter is used for H3 to give it unlimited tokens to emit work. let cancel_token = tokio_util::sync::CancellationToken::new(); let emitter_handle = endless_emitter(cancel_token.clone(), tx).await; @@ -742,18 +741,18 @@ pub async fn work_until_with_qps( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: small_instant::SmallInstant, - dead_line: small_instant::SmallInstant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, ) { let rx = match query_limit { QueryLimit::Qps(qps) => { - let (tx, rx) = kanal::unbounded::>(); + let (tx, rx) = kanal::unbounded::>(); tokio::spawn(async move { for i in 0.. { - if small_instant::SmallInstant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } tokio::time::sleep_until( @@ -774,7 +773,7 @@ pub async fn work_until_with_qps( tokio::spawn(async move { // Handle via rate till deadline is reached for _ in 0.. { - if small_instant::SmallInstant::now() > dead_line { + if crate::Instant::now() > dead_line { break; } @@ -809,8 +808,8 @@ pub async fn work_until_with_qps_latency_correction( client: Arc, report_tx: kanal::Sender>, query_limit: QueryLimit, - start: small_instant::SmallInstant, - dead_line: small_instant::SmallInstant, + start: crate::Instant, + dead_line: crate::Instant, n_connections: usize, n_http2_parallel: usize, _wait_ongoing_requests_after_deadline: bool, @@ -827,7 +826,7 @@ pub async fn work_until_with_qps_latency_correction( .into(), ) .await; - let now = small_instant::SmallInstant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -841,7 +840,7 @@ pub async fn work_until_with_qps_latency_correction( // Handle via rate till deadline is reached loop { tokio::time::sleep(duration).await; - let now = small_instant::SmallInstant::now(); + let now = crate::Instant::now(); if now > dead_line { break; } @@ -873,7 +872,7 @@ pub async fn work_until_with_qps_latency_correction( #[cfg(feature = "http3")] async fn endless_emitter( cancellation_token: tokio_util::sync::CancellationToken, - tx: kanal::AsyncSender>, + tx: kanal::AsyncSender>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { @@ -899,7 +898,6 @@ pub mod fast { use crate::{ client::Client, client_h3::http3_connection_fast_work_until, result_data::ResultData, - small_instant, }; /// Run n tasks by m workers @@ -968,7 +966,7 @@ pub mod fast { pub async fn work_until( client: Arc, report_tx: kanal::Sender, - dead_line: small_instant::SmallInstant, + dead_line: crate::Instant, n_connections: usize, n_http_parallel: usize, wait_ongoing_requests_after_deadline: bool, diff --git a/src/db.rs b/src/db.rs index 7446c534..cdb43ef3 100644 --- a/src/db.rs +++ b/src/db.rs @@ -21,7 +21,7 @@ fn create_db(conn: &Connection) -> Result { pub fn store( client: &Client, db_url: &str, - start: crate::small_instant::SmallInstant, + start: crate::Instant, request_records: &[RequestResult], run: u64, ) -> Result { @@ -66,7 +66,7 @@ mod test_db { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - let start = crate::small_instant::SmallInstant::now(); + let start = crate::Instant::now(); let test_val = RequestResult { rng: SeedableRng::seed_from_u64(0), status: hyper::StatusCode::OK, diff --git a/src/lib.rs b/src/lib.rs index 3238702c..959fda13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ mod pcg64si; mod printer; mod request_generator; mod result_data; +#[cfg(feature = "small_instant")] mod small_instant; mod timescale; mod tls_config; @@ -51,6 +52,11 @@ use crate::{ request_generator::{BodyGenerator, Proxy, RequestGenerator}, }; +#[cfg(feature = "small_instant")] +pub type Instant = small_instant::SmallInstant; +#[cfg(not(feature = "small_instant"))] +pub type Instant = std::time::Instant; + #[cfg(not(target_env = "msvc"))] #[global_allocator] static GLOBAL: Jemalloc = Jemalloc; @@ -646,7 +652,7 @@ pub async fn run(mut opts: Opts) -> anyhow::Result<()> { let run = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_secs(); - let start = crate::small_instant::SmallInstant::now(); + let start = crate::Instant::now(); let data_collect_future: Pin>> = match work_mode { diff --git a/src/monitor.rs b/src/monitor.rs index 30df1475..8581209a 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -53,7 +53,7 @@ pub struct Monitor { /// All workers sends each result to this channel pub report_receiver: kanal::Receiver>, // When started - pub start: crate::small_instant::SmallInstant, + pub start: crate::Instant, // Frame per second of TUI pub fps: usize, pub disable_color: bool, @@ -114,7 +114,7 @@ impl Monitor { break; } - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); let progress = match &self.end_line { EndLine::Duration(d) => { ((now - self.start).as_secs_f64() / d.as_secs_f64()).clamp(0.0, 1.0) diff --git a/src/printer.rs b/src/printer.rs index 9012801f..97bd4f68 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -110,7 +110,7 @@ pub struct PrintConfig { pub fn print_result( mut config: PrintConfig, - start: crate::small_instant::SmallInstant, + start: crate::Instant, res: &ResultData, total_duration: Duration, ) -> anyhow::Result<()> { @@ -139,7 +139,7 @@ pub fn print_result( /// Print all summary as JSON fn print_json( w: &mut W, - start: crate::small_instant::SmallInstant, + start: crate::Instant, res: &ResultData, total_duration: Duration, stats_success_breakdown: bool, @@ -376,11 +376,7 @@ fn print_json( ) } -fn print_csv( - w: &mut W, - start: crate::small_instant::SmallInstant, - res: &ResultData, -) -> std::io::Result<()> { +fn print_csv(w: &mut W, start: crate::Instant, res: &ResultData) -> std::io::Result<()> { // csv header writeln!( w, diff --git a/src/result_data.rs b/src/result_data.rs index 05eb136d..f6520b98 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -106,7 +106,7 @@ impl ResultData { pub fn end_times_from_start( &self, - start: crate::small_instant::SmallInstant, + start: crate::Instant, ) -> impl Iterator + '_ { self.success.iter().map(move |result| result.end - start) } @@ -196,7 +196,7 @@ mod tests { first_byte: u64, size: usize, ) -> Result { - let now = crate::small_instant::SmallInstant::now(); + let now = crate::Instant::now(); Ok(RequestResult { rng: SeedableRng::seed_from_u64(0), start_latency_correction: None,