From f1035b40328c2e5034e249bb2584838a42c863d1 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 9 May 2025 11:19:54 +0200 Subject: [PATCH 1/6] feat: improve transfer example --- iroh/examples/transfer.rs | 401 ++++++++++++++++++++++---------------- 1 file changed, 231 insertions(+), 170 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index f64047bbcfd..ac024828ce8 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -8,15 +8,34 @@ use bytes::Bytes; use clap::{Parser, Subcommand}; use indicatif::HumanBytes; use iroh::{ - discovery::{dns::DnsDiscovery, pkarr::PkarrPublisher}, - endpoint::{ConnectionError, PathSelection}, - Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, + discovery::{ + dns::DnsDiscovery, + pkarr::{PkarrPublisher, N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING}, + }, + dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, + endpoint::ConnectionError, + Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey, }; use iroh_base::ticket::NodeTicket; -use tracing::info; +use tokio_stream::StreamExt; +use tracing::{info, warn}; +use url::Url; // Transfer ALPN that we are using to communicate over the `Endpoint` const TRANSFER_ALPN: &[u8] = b"n0/iroh/transfer/example/0"; +/// Transfer data between iroh nodes. +/// +/// This is a useful example to test connection establishment and transfer speed. +/// +/// Note that some options are only available with optional features: +/// +/// --relay-only needs the `test-utils` feature +/// +/// --mdns needs the `discovery-local-network` feature +/// +/// To enable all features, run the example with --all-features: +/// +/// cargo run --release --example transfer --all-features -- ARGS #[derive(Parser, Debug)] #[command(name = "transfer")] struct Cli { @@ -24,31 +43,64 @@ struct Cli { command: Commands, } +#[derive(Clone, Copy, Default, Debug, clap::ValueEnum)] +enum Env { + /// Use the production servers hosted by n0. + #[default] + Prod, + /// Use the staging servers hosted by n0. + Staging, +} + +#[derive(Debug, clap::Parser)] +struct EndpointArgs { + /// Set the environment for relay, pkarr, and DNS servers. + /// + /// If other options are set, those will override the environment defaults. + #[clap(short, long, value_enum, default_value_t)] + environment: Env, + /// Set one or more relay servers to use. + #[clap(long)] + relay_url: Vec, + /// Disable relays completely. + #[clap(long)] + no_relay: bool, + /// If set no direct connections will be established. + #[clap(long, default_value = "false")] + relay_only: bool, + /// Use a custom pkarr server. + #[clap(long)] + pkarr_relay_url: Option, + /// Disable publishing node info to pkarr. + #[clap(long)] + no_pkarr_publish: bool, + /// Use a custom domain when resolving node info via DNS. + #[clap(long)] + dns_origin_domain: Option, + /// Do not resolve node info via DNS. + #[clap(long)] + no_dns_resolve: bool, + #[cfg(feature = "discovery-local-network")] + #[clap(long)] + /// Enable mDNS discovery, + mdns: bool, +} + #[derive(Subcommand, Debug)] enum Commands { + /// Provide data. Provide { - #[clap(long, default_value = "1G", value_parser = parse_byte_size)] + #[clap(long, default_value = "100M", value_parser = parse_byte_size)] size: u64, - #[clap(long)] - relay_url: Option, - #[clap(long, default_value = "false")] - relay_only: bool, - #[clap(long)] - pkarr_relay_url: Option, - #[clap(long)] - dns_origin_domain: Option, + #[clap(flatten)] + endpoint_args: EndpointArgs, }, + /// Fetch data. Fetch { #[arg(index = 1)] ticket: String, - #[clap(long)] - relay_url: Option, - #[clap(long, default_value = "false")] - relay_only: bool, - #[clap(long)] - pkarr_relay_url: Option, - #[clap(long)] - dns_origin_domain: Option, + #[clap(flatten)] + endpoint_args: EndpointArgs, }, } @@ -56,99 +108,127 @@ enum Commands { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let cli = Cli::parse(); - - match &cli.command { + match cli.command { Commands::Provide { size, - relay_url, - relay_only, - pkarr_relay_url, - dns_origin_domain, + endpoint_args, } => { - provide( - *size, - relay_url.clone(), - *relay_only, - pkarr_relay_url.clone(), - dns_origin_domain.clone(), - ) - .await? + let endpoint = endpoint_args.into_endpoint().await?; + provide(endpoint, size).await? } Commands::Fetch { ticket, - relay_url, - relay_only, - pkarr_relay_url, - dns_origin_domain, + endpoint_args, } => { - fetch( - ticket, - relay_url.clone(), - *relay_only, - pkarr_relay_url.clone(), - dns_origin_domain.clone(), - ) - .await? + let endpoint = endpoint_args.into_endpoint().await?; + fetch(endpoint, &ticket).await? } } Ok(()) } -async fn provide( - size: u64, - relay_url: Option, - relay_only: bool, - pkarr_relay_url: Option, - dns_origin_domain: Option, -) -> anyhow::Result<()> { - let secret_key = SecretKey::generate(rand::rngs::OsRng); - let relay_mode = match relay_url { - Some(relay_url) => { - let relay_url = RelayUrl::from_str(&relay_url)?; - RelayMode::Custom(relay_url.into()) - } - None => RelayMode::Default, - }; - let path_selection = match relay_only { - true => PathSelection::RelayOnly, - false => PathSelection::default(), - }; - - let mut endpoint_builder = Endpoint::builder(); - - if let Some(pkarr_relay_url) = pkarr_relay_url { - let pkarr_relay_url = pkarr_relay_url - .parse() - .context("Invalid pkarr URL provided")?; - - let pkarr_discovery_closure = move |secret_key: &SecretKey| { - let pkarr_d = PkarrPublisher::new(secret_key.clone(), pkarr_relay_url); - Some(pkarr_d) +impl EndpointArgs { + async fn into_endpoint(self) -> anyhow::Result { + let secret_key = match std::env::var("IROH_SECRET") { + Ok(s) => SecretKey::from_str(&s) + .context("failed to parse IROH_SECRET environment variable as iroh secret key")?, + Err(_) => { + let s = SecretKey::generate(rand::rngs::OsRng); + println!("Generated a new node secret. To reuse, set"); + println!("\tIROH_SECRET={s}"); + s + } + }; + + let relay_mode = if self.no_relay { + RelayMode::Disabled + } else { + match (self.relay_url.is_empty(), self.environment) { + (true, Env::Prod) => RelayMode::Default, + (true, Env::Staging) => RelayMode::Staging, + (false, _) => { + let urls: Result, _> = self + .relay_url + .iter() + .map(|u| RelayUrl::from_str(u)) + .collect(); + let urls = urls.context("failed to parse relay URL")?; + RelayMode::Custom(RelayMap::from_iter(urls)) + } + } }; - endpoint_builder = endpoint_builder.add_discovery(pkarr_discovery_closure); - } - if let Some(dns_origin_domain) = dns_origin_domain { - let dns_discovery_closure = move |_: &SecretKey| Some(DnsDiscovery::new(dns_origin_domain)); + let mut endpoint_builder = Endpoint::builder(); + + if !self.no_pkarr_publish { + let url = match (&self.pkarr_relay_url, self.environment) { + (None, Env::Prod) => N0_DNS_PKARR_RELAY_PROD, + (None, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING, + (Some(url), _) => url, + }; + let url = Url::from_str(url).context("failed to parse pkarr relay url")?; + endpoint_builder = endpoint_builder + .add_discovery(|secret_key| Some(PkarrPublisher::new(secret_key.clone(), url))); + } - endpoint_builder = endpoint_builder.add_discovery(dns_discovery_closure); - } + if !self.no_dns_resolve { + let origin_domain = match (self.dns_origin_domain, self.environment) { + (None, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD.to_string(), + (None, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING.to_string(), + (Some(domain), _) => domain, + }; + endpoint_builder = + endpoint_builder.add_discovery(|_| Some(DnsDiscovery::new(origin_domain))); + } - let endpoint = endpoint_builder - .secret_key(secret_key) - .alpns(vec![TRANSFER_ALPN.to_vec()]) - .relay_mode(relay_mode) - .path_selection(path_selection) - .bind() - .await?; + #[cfg(feature = "discovery-local-network")] + if self.mdns { + endpoint_builder = endpoint_builder.add_discovery(|secret_key| { + Some( + iroh::discovery::mdns::MdnsDiscovery::new(secret_key.public()) + .expect("Failed to create mDNS discovery"), + ) + }); + } - let node_id = endpoint.node_id(); + #[cfg(feature = "test-utils")] + if self.relay_only { + endpoint_builder = + endpoint_builder.path_selection(iroh::endpoint::PathSelection::RelayOnly) + } - for local_endpoint in endpoint.direct_addresses().initialized().await? { - println!("\t{}", local_endpoint.addr) + let endpoint = endpoint_builder + .secret_key(secret_key) + .alpns(vec![TRANSFER_ALPN.to_vec()]) + .relay_mode(relay_mode.clone()) + .bind() + .await?; + + let node_id = endpoint.node_id(); + println!("Our node id:\n\t{node_id}"); + + println!("Our direct addresses:"); + for local_endpoint in endpoint.direct_addresses().initialized().await? { + println!("\t{} (type: {:?})", local_endpoint.addr, local_endpoint.typ) + } + + if !matches!(relay_mode, RelayMode::Disabled) { + let relay_url = endpoint + .home_relay() + .get()? + .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server"); + println!("Our home relay server:\n\t{relay_url}"); + } + + println!(""); + + Ok(endpoint) } +} +async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { + let node_id = endpoint.node_id(); let relay_url = endpoint .home_relay() .get()? @@ -161,17 +241,28 @@ async fn provide( .map(|endpoint| endpoint.addr) .collect::>(); - let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), local_addrs); + let node_addr = NodeAddr::from_parts(node_id, Some(relay_url.clone()), local_addrs); let ticket = NodeTicket::new(node_addr); - - println!("NodeTicket: {}", ticket); + println!( + "Ticket with our home relay and direct addresses:\n{}\n", + ticket + ); + let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), vec![]); + let ticket = NodeTicket::new(node_addr); + println!( + "Ticket with our home relay but no direct addresses:\n{}\n", + ticket + ); + let node_addr = NodeAddr::from_parts(node_id, None, vec![]); + let ticket = NodeTicket::new(node_addr); + println!("Ticket with only our node id:\n{}\n", ticket); // accept incoming connections, returns a normal QUIC connection while let Some(incoming) = endpoint.accept().await { let connecting = match incoming.accept() { Ok(connecting) => connecting, Err(err) => { - tracing::warn!("incoming connection failed: {err:#}"); + warn!("incoming connection failed: {err:#}"); // we can carry on in these cases: // this can be caused by retransmitted datagrams continue; @@ -185,29 +276,54 @@ async fn provide( ); // spawn a task to handle reading and writing off of the connection + let endpoint_clone = endpoint.clone(); tokio::spawn(async move { + let remote = node_id.fmt_short(); + println!("[{remote}] Connected"); + + let mut conn_type_stream = endpoint_clone.conn_type(node_id).unwrap().stream(); + let conn_type_task = tokio::task::spawn(async move { + let remote = node_id.fmt_short(); + while let Some(conn_type) = conn_type_stream.next().await { + println!("[{remote}] Connection type changed to `{conn_type}`"); + } + }); + // accept a bi-directional QUIC connection // use the `quinn` APIs to send and recv content let (mut send, mut recv) = conn.accept_bi().await?; tracing::debug!("accepted bi stream, waiting for data..."); let message = recv.read_to_end(100).await?; let message = String::from_utf8(message)?; - println!("received: {message}"); + println!("[{remote}] Received: \"{message}\""); + let start = Instant::now(); send_data_on_stream(&mut send, size).await?; // We sent the last message, so wait for the client to close the connection once // it received this message. let res = tokio::time::timeout(Duration::from_secs(3), async move { let closed = conn.closed().await; + let remote = node_id.fmt_short(); if !matches!(closed, ConnectionError::ApplicationClosed(_)) { - println!("node {node_id} disconnected with an error: {closed:#}"); + println!("[{remote}] Node disconnected with an error: {closed:#}"); } }) .await; + let duration = start.elapsed(); + + println!( + "[{remote}] Transferred {} in {:.4}, {}/s", + HumanBytes(size as u64), + duration.as_secs_f64(), + HumanBytes((size as f64 / duration.as_secs_f64()) as u64) + ); if res.is_err() { - println!("node {node_id} did not disconnect within 3 seconds"); + println!("[{remote}] Did not disconnect within 3 seconds"); + } else { + println!("[{remote}] Disconnected"); } + conn_type_task.abort(); Ok::<_, anyhow::Error>(()) }); } @@ -216,84 +332,34 @@ async fn provide( Ok(()) } -async fn fetch( - ticket: &str, - relay_url: Option, - relay_only: bool, - pkarr_relay_url: Option, - dns_origin_domain: Option, -) -> anyhow::Result<()> { +async fn fetch(endpoint: Endpoint, ticket: &str) -> anyhow::Result<()> { + let me = endpoint.node_id().fmt_short(); let ticket: NodeTicket = ticket.parse()?; - let secret_key = SecretKey::generate(rand::rngs::OsRng); - let relay_mode = match relay_url { - Some(relay_url) => { - let relay_url = RelayUrl::from_str(&relay_url)?; - RelayMode::Custom(relay_url.into()) - } - None => RelayMode::Default, - }; - let path_selection = match relay_only { - true => PathSelection::RelayOnly, - false => PathSelection::default(), - }; - let mut endpoint_builder = Endpoint::builder(); - - if let Some(pkarr_relay_url) = pkarr_relay_url { - let pkarr_relay_url = pkarr_relay_url - .parse() - .context("Invalid pkarr URL provided")?; - - let pkarr_discovery_closure = move |secret_key: &SecretKey| { - let pkarr_d = PkarrPublisher::new(secret_key.clone(), pkarr_relay_url); - Some(pkarr_d) - }; - endpoint_builder = endpoint_builder.add_discovery(pkarr_discovery_closure); - } - - if let Some(dns_origin_domain) = dns_origin_domain { - let dns_discovery_closure = move |_: &SecretKey| Some(DnsDiscovery::new(dns_origin_domain)); - - endpoint_builder = endpoint_builder.add_discovery(dns_discovery_closure); - } - - let endpoint = endpoint_builder - .secret_key(secret_key) - .alpns(vec![TRANSFER_ALPN.to_vec()]) - .relay_mode(relay_mode) - .path_selection(path_selection) - .bind() - .await?; - let start = Instant::now(); - let me = endpoint.node_id(); - println!("node id: {me}"); - println!("node listening addresses:"); - for local_endpoint in endpoint.direct_addresses().initialized().await? { - println!("\t{}", local_endpoint.addr) - } - - let relay_url = endpoint - .home_relay() - .get()? - .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server"); - println!("node relay server url: {relay_url}\n"); + let remote = ticket.node_addr().node_id; // Attempt to connect, over the given ALPN. // Returns a Quinn connection. let conn = endpoint .connect(ticket.node_addr().clone(), TRANSFER_ALPN) .await?; - info!("connected"); + println!("Connected to {remote}"); + let mut conn_type_stream = endpoint.conn_type(remote).unwrap().stream(); + let conn_type_task = tokio::task::spawn(async move { + while let Some(conn_type) = conn_type_stream.next().await { + println!("Connection type changed to `{conn_type}`"); + } + }); // Use the Quinn API to send and recv content. let (mut send, mut recv) = conn.open_bi().await?; - let message = format!("{me} is saying 'hello!'"); + let message = format!("{me} is saying hello!"); send.write_all(message.as_bytes()).await?; - // Call `finish` to signal no more data will be sent on this stream. send.finish()?; + println!("Sent: \"{message}\""); let (len, time_to_first_byte, chnk) = drain_stream(&mut recv, false).await?; @@ -303,19 +369,14 @@ async fn fetch( let duration = start.elapsed(); println!( - "Received {} in {:.4}s with time to first byte {}s in {} chunks", + "Received {} in {:.4}s ({}/s, time to first byte {}s, {} chunks)", HumanBytes(len as u64), duration.as_secs_f64(), + HumanBytes((len as f64 / duration.as_secs_f64()) as u64), time_to_first_byte.as_secs_f64(), chnk ); - println!( - "Transferred {} in {:.4}, {}/s", - HumanBytes(len as u64), - duration.as_secs_f64(), - HumanBytes((len as f64 / duration.as_secs_f64()) as u64) - ); - + conn_type_task.abort(); Ok(()) } From 638e0ed35b3e9d276ef226abc28cc01a4474dd36 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 9 May 2025 11:29:57 +0200 Subject: [PATCH 2/6] chore: clippy --- iroh/examples/transfer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index ac024828ce8..b25acad8a21 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -221,7 +221,7 @@ impl EndpointArgs { println!("Our home relay server:\n\t{relay_url}"); } - println!(""); + println!(); Ok(endpoint) } @@ -314,7 +314,7 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { println!( "[{remote}] Transferred {} in {:.4}, {}/s", - HumanBytes(size as u64), + HumanBytes(size), duration.as_secs_f64(), HumanBytes((size as f64 / duration.as_secs_f64()) as u64) ); From e22819a6d7887caacb804e3bf48e3c9ebd4d434f Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 9 May 2025 23:15:06 +0200 Subject: [PATCH 3/6] improve publish and resolve examples --- iroh-dns-server/examples/publish.rs | 62 +++++++++++++++++------------ iroh-dns-server/examples/resolve.rs | 47 +++++++++++++++++----- iroh/examples/transfer.rs | 10 ++--- 3 files changed, 77 insertions(+), 42 deletions(-) diff --git a/iroh-dns-server/examples/publish.rs b/iroh-dns-server/examples/publish.rs index 81279553531..8d994a1fbb4 100644 --- a/iroh-dns-server/examples/publish.rs +++ b/iroh-dns-server/examples/publish.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, str::FromStr}; -use anyhow::{bail, Result}; +use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; use iroh::{ discovery::{ @@ -13,8 +13,9 @@ use iroh::{ }; use url::Url; -const LOCALHOST_PKARR: &str = "http://localhost:8080/pkarr"; -const EXAMPLE_ORIGIN: &str = "irohdns.example"; +const DEV_PKARR_RELAY_URL: &str = "http://localhost:8080/pkarr"; +const DEV_DNS_ORIGIN_DOMAIN: &str = "irohdns.example"; +const EXAMPLE_RELAY_URL: &str = "https://relay.iroh.example"; #[derive(ValueEnum, Clone, Debug, Default, Copy, strum::Display)] #[strum(serialize_all = "kebab-case")] @@ -38,19 +39,19 @@ struct Cli { env: Env, /// Pkarr Relay URL. If set, the --env option will be ignored. #[clap(long, conflicts_with = "env")] - pkarr_relay: Option, - /// Home relay server to publish for this node - #[clap(short, long)] + pkarr_relay_url: Option, + /// Home relay server URL to publish. + #[clap(short, long, conflicts_with = "no_relay_url")] relay_url: Option, - /// Direct addresses to publish for this node + /// Do not publish a home relay server URL. + #[clap(long)] + no_relay_url: bool, + /// Direct addresses to publish. #[clap(short, long)] addr: Vec, /// User data to publish for this node #[clap(short, long)] user_data: Option, - /// Create a new node secret if IROH_SECRET is unset. Only for development / debugging. - #[clap(short, long)] - create: bool, } #[tokio::main] @@ -59,28 +60,34 @@ async fn main() -> Result<()> { let args = Cli::parse(); let secret_key = match std::env::var("IROH_SECRET") { - Ok(s) => SecretKey::from_str(&s)?, - Err(_) if args.create => { + Ok(s) => SecretKey::from_str(&s) + .context("failed to parse IROH_SECRET environment variable as iroh secret key")?, + Err(_) => { let s = SecretKey::generate(rand::rngs::OsRng); println!("Generated a new node secret. To reuse, set"); - println!("IROH_SECRET={s}"); + println!("\tIROH_SECRET={s}\n"); s } - Err(_) => { - bail!("Environment variable IROH_SECRET is not set. To create a new secret, use the --create option.") - } }; let node_id = secret_key.public(); - let pkarr_relay = match (args.pkarr_relay, args.env) { - (Some(pkarr_relay), _) => pkarr_relay, + let pkarr_relay_url = match (args.pkarr_relay_url, args.env) { + (Some(url), _) => url, (None, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING.parse().expect("valid url"), (None, Env::Prod) => N0_DNS_PKARR_RELAY_PROD.parse().expect("valid url"), - (None, Env::Dev) => LOCALHOST_PKARR.parse().expect("valid url"), + (None, Env::Dev) => DEV_PKARR_RELAY_URL.parse().expect("valid url"), }; - println!("announce {node_id}:"); - if let Some(relay_url) = &args.relay_url { + let relay_url = if let Some(relay_url) = args.relay_url { + Some(relay_url) + } else if !args.no_relay_url { + Some(EXAMPLE_RELAY_URL.parse().expect("valid url")) + } else { + None + }; + + println!("announce node {node_id}:"); + if let Some(relay_url) = &relay_url { println!(" relay={relay_url}"); } for addr in &args.addr { @@ -90,11 +97,11 @@ async fn main() -> Result<()> { println!(" user-data={user_data}"); } println!(); - println!("publish to {pkarr_relay} ..."); + println!("publish to {pkarr_relay_url} ..."); - let pkarr = PkarrRelayClient::new(pkarr_relay); + let pkarr = PkarrRelayClient::new(pkarr_relay_url); let node_info = NodeInfo::new(node_id) - .with_relay_url(args.relay_url.map(Into::into)) + .with_relay_url(relay_url.map(Into::into)) .with_direct_addresses(args.addr.into_iter().collect()) .with_user_data(args.user_data); let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?; @@ -105,7 +112,10 @@ async fn main() -> Result<()> { match args.env { Env::Staging => { - println!(" cargo run --example resolve -- node {}", node_id); + println!( + " cargo run --example resolve -- --env staging node {}", + node_id + ); println!( " dig {} TXT", fmt_domain(&node_id, N0_DNS_NODE_ORIGIN_STAGING) @@ -128,7 +138,7 @@ async fn main() -> Result<()> { ); println!( " dig @localhost -p 5300 {} TXT", - fmt_domain(&node_id, EXAMPLE_ORIGIN) + fmt_domain(&node_id, DEV_DNS_ORIGIN_DOMAIN) ) } } diff --git a/iroh-dns-server/examples/resolve.rs b/iroh-dns-server/examples/resolve.rs index cf209b9d47f..11299405d85 100644 --- a/iroh-dns-server/examples/resolve.rs +++ b/iroh-dns-server/examples/resolve.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use clap::{Parser, ValueEnum}; use iroh::{ discovery::dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, @@ -5,8 +6,8 @@ use iroh::{ NodeId, }; -const LOCALHOST_DNS: &str = "127.0.0.1:5300"; -const EXAMPLE_ORIGIN: &str = "irohdns.example"; +const DEV_DNS_SERVER: &str = "127.0.0.1:5300"; +const DEV_DNS_ORIGIN_DOMAIN: &str = "irohdns.example"; #[derive(ValueEnum, Clone, Debug, Default)] pub enum Env { @@ -23,6 +24,7 @@ pub enum Env { struct Cli { #[clap(value_enum, short, long, default_value_t = Env::Staging)] env: Env, + dns_server: Option, #[clap(subcommand)] command: Command, } @@ -30,7 +32,13 @@ struct Cli { #[derive(Debug, Parser)] enum Command { /// Resolve node info by node id. - Node { node_id: NodeId }, + Node { + /// The node id to resolve. + node_id: NodeId, + /// Use a custom domain when resolving node info via DNS. + #[clap(long)] + dns_origin_domain: Option, + }, /// Resolve node info by domain. Domain { domain: String }, } @@ -38,16 +46,33 @@ enum Command { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Cli::parse(); - let (resolver, origin) = match args.env { - Env::Staging => (DnsResolver::new(), N0_DNS_NODE_ORIGIN_STAGING), - Env::Prod => (DnsResolver::new(), N0_DNS_NODE_ORIGIN_PROD), - Env::Dev => ( - DnsResolver::with_nameserver(LOCALHOST_DNS.parse()?), - EXAMPLE_ORIGIN, - ), + let resolver = if let Some(host) = args.dns_server { + let addr = tokio::net::lookup_host(host) + .await? + .next() + .context("failed to resolve DNS server address")?; + DnsResolver::with_nameserver(addr) + } else { + match args.env { + Env::Staging | Env::Prod => DnsResolver::new(), + Env::Dev => { + DnsResolver::with_nameserver(DEV_DNS_SERVER.parse().expect("valid address")) + } + } }; let resolved = match args.command { - Command::Node { node_id } => resolver.lookup_node_by_id(&node_id, origin).await?, + Command::Node { + node_id, + dns_origin_domain, + } => { + let origin_domain = match (&dns_origin_domain, args.env) { + (Some(domain), _) => domain, + (None, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD, + (None, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING, + (None, Env::Dev) => DEV_DNS_ORIGIN_DOMAIN, + }; + resolver.lookup_node_by_id(&node_id, origin_domain).await? + } Command::Domain { domain } => resolver.lookup_node_by_domain_name(&domain).await?, }; println!("resolved node {}", resolved.node_id); diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index b25acad8a21..fc9ba22f09d 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -45,10 +45,10 @@ struct Cli { #[derive(Clone, Copy, Default, Debug, clap::ValueEnum)] enum Env { - /// Use the production servers hosted by n0. - #[default] + /// Use the production servers hosted by number0. Prod, - /// Use the staging servers hosted by n0. + /// Use the staging servers hosted by number0. + #[default] Staging, } @@ -63,10 +63,10 @@ struct EndpointArgs { #[clap(long)] relay_url: Vec, /// Disable relays completely. - #[clap(long)] + #[clap(long, conflicts_with = "relay-url")] no_relay: bool, /// If set no direct connections will be established. - #[clap(long, default_value = "false")] + #[clap(long)] relay_only: bool, /// Use a custom pkarr server. #[clap(long)] From 66f2bfc7a7180ca487615c08245d8e5e1d2e1fc2 Mon Sep 17 00:00:00 2001 From: Frando Date: Sat, 10 May 2025 00:04:04 +0200 Subject: [PATCH 4/6] feat(transfer example): add dev env arg to use local dns and relay servers --- iroh/examples/transfer.rs | 81 ++++++++++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index fc9ba22f09d..579d96821ae 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -12,7 +12,7 @@ use iroh::{ dns::DnsDiscovery, pkarr::{PkarrPublisher, N0_DNS_PKARR_RELAY_PROD, N0_DNS_PKARR_RELAY_STAGING}, }, - dns::{N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, + dns::{DnsResolver, N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, endpoint::ConnectionError, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey, }; @@ -20,9 +20,15 @@ use iroh_base::ticket::NodeTicket; use tokio_stream::StreamExt; use tracing::{info, warn}; use url::Url; + // Transfer ALPN that we are using to communicate over the `Endpoint` const TRANSFER_ALPN: &[u8] = b"n0/iroh/transfer/example/0"; +const DEV_RELAY_URL: &str = "http://localhost:3340"; +const DEV_PKARR_RELAY_URL: &str = "http://localhost:8080/pkarr"; +const DEV_DNS_ORIGIN_DOMAIN: &str = "irohdns.example"; +const DEV_DNS_SERVER: &str = "127.0.0.1:5300"; + /// Transfer data between iroh nodes. /// /// This is a useful example to test connection establishment and transfer speed. @@ -43,13 +49,20 @@ struct Cli { command: Commands, } -#[derive(Clone, Copy, Default, Debug, clap::ValueEnum)] +#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, clap::ValueEnum)] enum Env { /// Use the production servers hosted by number0. Prod, /// Use the staging servers hosted by number0. #[default] Staging, + /// Use localhost servers. + /// + /// To run the DNS server: + /// cargo run --bin iroh-dns-server + /// To run the relay server: + /// cargo run --bin iroh-relay --features server -- --dev + Dev, } #[derive(Debug, clap::Parser)] @@ -58,12 +71,12 @@ struct EndpointArgs { /// /// If other options are set, those will override the environment defaults. #[clap(short, long, value_enum, default_value_t)] - environment: Env, + env: Env, /// Set one or more relay servers to use. #[clap(long)] relay_url: Vec, /// Disable relays completely. - #[clap(long, conflicts_with = "relay-url")] + #[clap(long, conflicts_with = "relay_url")] no_relay: bool, /// If set no direct connections will be established. #[clap(long)] @@ -77,6 +90,9 @@ struct EndpointArgs { /// Use a custom domain when resolving node info via DNS. #[clap(long)] dns_origin_domain: Option, + /// Use a custom DNS server for resolving relay and node info domains. + #[clap(long)] + dns_server: Option, /// Do not resolve node info via DNS. #[clap(long)] no_dns_resolve: bool, @@ -141,50 +157,55 @@ impl EndpointArgs { } }; + let mut builder = Endpoint::builder(); + let relay_mode = if self.no_relay { RelayMode::Disabled } else { - match (self.relay_url.is_empty(), self.environment) { + match (self.relay_url.is_empty(), self.env) { (true, Env::Prod) => RelayMode::Default, (true, Env::Staging) => RelayMode::Staging, + (true, Env::Dev) => { + let url: RelayUrl = DEV_RELAY_URL.parse().expect("valid url"); + RelayMode::Custom(RelayMap::from(url)) + } (false, _) => { - let urls: Result, _> = self + let urls = self .relay_url .iter() .map(|u| RelayUrl::from_str(u)) - .collect(); - let urls = urls.context("failed to parse relay URL")?; + .collect::, _>>() + .context("failed to parse relay URL")?; RelayMode::Custom(RelayMap::from_iter(urls)) } } }; - let mut endpoint_builder = Endpoint::builder(); - if !self.no_pkarr_publish { - let url = match (&self.pkarr_relay_url, self.environment) { + let url = match (&self.pkarr_relay_url, self.env) { (None, Env::Prod) => N0_DNS_PKARR_RELAY_PROD, (None, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING, + (None, Env::Dev) => DEV_PKARR_RELAY_URL, (Some(url), _) => url, }; let url = Url::from_str(url).context("failed to parse pkarr relay url")?; - endpoint_builder = endpoint_builder + builder = builder .add_discovery(|secret_key| Some(PkarrPublisher::new(secret_key.clone(), url))); } if !self.no_dns_resolve { - let origin_domain = match (self.dns_origin_domain, self.environment) { + let origin_domain = match (self.dns_origin_domain, self.env) { (None, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD.to_string(), (None, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING.to_string(), + (None, Env::Dev) => DEV_DNS_ORIGIN_DOMAIN.to_string(), (Some(domain), _) => domain, }; - endpoint_builder = - endpoint_builder.add_discovery(|_| Some(DnsDiscovery::new(origin_domain))); + builder = builder.add_discovery(|_| Some(DnsDiscovery::new(origin_domain))); } #[cfg(feature = "discovery-local-network")] if self.mdns { - endpoint_builder = endpoint_builder.add_discovery(|secret_key| { + builder = builder.add_discovery(|secret_key| { Some( iroh::discovery::mdns::MdnsDiscovery::new(secret_key.public()) .expect("Failed to create mDNS discovery"), @@ -194,11 +215,27 @@ impl EndpointArgs { #[cfg(feature = "test-utils")] if self.relay_only { - endpoint_builder = - endpoint_builder.path_selection(iroh::endpoint::PathSelection::RelayOnly) + builder = builder.path_selection(iroh::endpoint::PathSelection::RelayOnly) + } + + let custom_dns_server = if let Some(host) = self.dns_server { + Some( + tokio::net::lookup_host(host) + .await + .context("failed to resolve DNS server address")? + .next() + .context("failed to resolve DNS server address")?, + ) + } else if self.env == Env::Dev { + Some(DEV_DNS_SERVER.parse().expect("valid addr")) + } else { + None + }; + if let Some(addr) = custom_dns_server { + builder = builder.dns_resolver(DnsResolver::with_nameserver(addr)); } - let endpoint = endpoint_builder + let endpoint = builder .secret_key(secret_key) .alpns(vec![TRANSFER_ALPN.to_vec()]) .relay_mode(relay_mode.clone()) @@ -285,7 +322,7 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { let conn_type_task = tokio::task::spawn(async move { let remote = node_id.fmt_short(); while let Some(conn_type) = conn_type_stream.next().await { - println!("[{remote}] Connection type changed to `{conn_type}`"); + println!("[{remote}] Connection type changed to: {conn_type}"); } }); @@ -313,7 +350,7 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { let duration = start.elapsed(); println!( - "[{remote}] Transferred {} in {:.4}, {}/s", + "[{remote}] Transferred {} in {:.4}s, {}/s", HumanBytes(size), duration.as_secs_f64(), HumanBytes((size as f64 / duration.as_secs_f64()) as u64) @@ -348,7 +385,7 @@ async fn fetch(endpoint: Endpoint, ticket: &str) -> anyhow::Result<()> { let mut conn_type_stream = endpoint.conn_type(remote).unwrap().stream(); let conn_type_task = tokio::task::spawn(async move { while let Some(conn_type) = conn_type_stream.next().await { - println!("Connection type changed to `{conn_type}`"); + println!("Connection type changed to: {conn_type}"); } }); From 1cb3dd9d3765f89679e6d3bafd6ccd0b47ff640c Mon Sep 17 00:00:00 2001 From: Frando Date: Sat, 10 May 2025 13:08:52 +0200 Subject: [PATCH 5/6] transfer example: cleanups --- iroh/examples/transfer.rs | 128 +++++++++++++++----------------------- 1 file changed, 50 insertions(+), 78 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 579d96821ae..649717a2311 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -85,7 +85,7 @@ struct EndpointArgs { #[clap(long)] pkarr_relay_url: Option, /// Disable publishing node info to pkarr. - #[clap(long)] + #[clap(long, conflicts_with = "pkarr_relay_url")] no_pkarr_publish: bool, /// Use a custom domain when resolving node info via DNS. #[clap(long)] @@ -98,7 +98,7 @@ struct EndpointArgs { no_dns_resolve: bool, #[cfg(feature = "discovery-local-network")] #[clap(long)] - /// Enable mDNS discovery, + /// Enable mDNS discovery. mdns: bool, } @@ -113,7 +113,6 @@ enum Commands { }, /// Fetch data. Fetch { - #[arg(index = 1)] ticket: String, #[clap(flatten)] endpoint_args: EndpointArgs, @@ -146,9 +145,11 @@ async fn main() -> anyhow::Result<()> { impl EndpointArgs { async fn into_endpoint(self) -> anyhow::Result { + let mut builder = Endpoint::builder(); + let secret_key = match std::env::var("IROH_SECRET") { Ok(s) => SecretKey::from_str(&s) - .context("failed to parse IROH_SECRET environment variable as iroh secret key")?, + .context("Failed to parse IROH_SECRET environment variable as iroh secret key")?, Err(_) => { let s = SecretKey::generate(rand::rngs::OsRng); println!("Generated a new node secret. To reuse, set"); @@ -156,49 +157,48 @@ impl EndpointArgs { s } }; - - let mut builder = Endpoint::builder(); + builder = builder.secret_key(secret_key); let relay_mode = if self.no_relay { RelayMode::Disabled + } else if !self.relay_url.is_empty() { + let urls = self + .relay_url + .iter() + .map(|u| RelayUrl::from_str(u)) + .collect::, _>>() + .context("Failed to parse relay URL")?; + RelayMode::Custom(RelayMap::from_iter(urls)) } else { - match (self.relay_url.is_empty(), self.env) { - (true, Env::Prod) => RelayMode::Default, - (true, Env::Staging) => RelayMode::Staging, - (true, Env::Dev) => { - let url: RelayUrl = DEV_RELAY_URL.parse().expect("valid url"); - RelayMode::Custom(RelayMap::from(url)) - } - (false, _) => { - let urls = self - .relay_url - .iter() - .map(|u| RelayUrl::from_str(u)) - .collect::, _>>() - .context("failed to parse relay URL")?; - RelayMode::Custom(RelayMap::from_iter(urls)) - } + match self.env { + Env::Prod => RelayMode::Default, + Env::Staging => RelayMode::Staging, + Env::Dev => RelayMode::Custom(RelayMap::from( + RelayUrl::from_str(DEV_RELAY_URL).expect("valid url"), + )), } }; + let relay_disabled = matches!(relay_mode, RelayMode::Disabled); + builder = builder.relay_mode(relay_mode); if !self.no_pkarr_publish { let url = match (&self.pkarr_relay_url, self.env) { - (None, Env::Prod) => N0_DNS_PKARR_RELAY_PROD, - (None, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING, - (None, Env::Dev) => DEV_PKARR_RELAY_URL, (Some(url), _) => url, + (_, Env::Prod) => N0_DNS_PKARR_RELAY_PROD, + (_, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING, + (_, Env::Dev) => DEV_PKARR_RELAY_URL, }; - let url = Url::from_str(url).context("failed to parse pkarr relay url")?; + let url = Url::from_str(url).context("Failed to parse pkarr relay URL")?; builder = builder .add_discovery(|secret_key| Some(PkarrPublisher::new(secret_key.clone(), url))); } if !self.no_dns_resolve { let origin_domain = match (self.dns_origin_domain, self.env) { - (None, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD.to_string(), - (None, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING.to_string(), - (None, Env::Dev) => DEV_DNS_ORIGIN_DOMAIN.to_string(), (Some(domain), _) => domain, + (_, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD.to_string(), + (_, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING.to_string(), + (_, Env::Dev) => DEV_DNS_ORIGIN_DOMAIN.to_string(), }; builder = builder.add_discovery(|_| Some(DnsDiscovery::new(origin_domain))); } @@ -218,81 +218,53 @@ impl EndpointArgs { builder = builder.path_selection(iroh::endpoint::PathSelection::RelayOnly) } - let custom_dns_server = if let Some(host) = self.dns_server { - Some( - tokio::net::lookup_host(host) - .await - .context("failed to resolve DNS server address")? - .next() - .context("failed to resolve DNS server address")?, - ) + if let Some(host) = self.dns_server { + let addr = tokio::net::lookup_host(host) + .await + .context("Failed to resolve DNS server address")? + .next() + .context("Failed to resolve DNS server address")?; + builder = builder.dns_resolver(DnsResolver::with_nameserver(addr)); } else if self.env == Env::Dev { - Some(DEV_DNS_SERVER.parse().expect("valid addr")) - } else { - None - }; - if let Some(addr) = custom_dns_server { + let addr = DEV_DNS_SERVER.parse().expect("valid addr"); builder = builder.dns_resolver(DnsResolver::with_nameserver(addr)); } - let endpoint = builder - .secret_key(secret_key) - .alpns(vec![TRANSFER_ALPN.to_vec()]) - .relay_mode(relay_mode.clone()) - .bind() - .await?; + let endpoint = builder.alpns(vec![TRANSFER_ALPN.to_vec()]).bind().await?; let node_id = endpoint.node_id(); println!("Our node id:\n\t{node_id}"); - println!("Our direct addresses:"); for local_endpoint in endpoint.direct_addresses().initialized().await? { println!("\t{} (type: {:?})", local_endpoint.addr, local_endpoint.typ) } - - if !matches!(relay_mode, RelayMode::Disabled) { + if !relay_disabled { let relay_url = endpoint .home_relay() .get()? - .expect("should be connected to a relay server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected relay server"); + .context("Failed to resolve our home relay")?; println!("Our home relay server:\n\t{relay_url}"); } println!(); - Ok(endpoint) } } async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { let node_id = endpoint.node_id(); - let relay_url = endpoint - .home_relay() - .get()? - .expect("should be connected to a relay server"); - let local_addrs = endpoint - .direct_addresses() - .initialized() - .await? - .into_iter() - .map(|endpoint| endpoint.addr) - .collect::>(); - - let node_addr = NodeAddr::from_parts(node_id, Some(relay_url.clone()), local_addrs); - let ticket = NodeTicket::new(node_addr); - println!( - "Ticket with our home relay and direct addresses:\n{}\n", - ticket - ); - let node_addr = NodeAddr::from_parts(node_id, Some(relay_url), vec![]); + + let node_addr = endpoint.node_addr().await?; let ticket = NodeTicket::new(node_addr); - println!( - "Ticket with our home relay but no direct addresses:\n{}\n", - ticket - ); - let node_addr = NodeAddr::from_parts(node_id, None, vec![]); + println!("Ticket with our home relay and direct addresses:\n{ticket}\n",); + + let mut node_addr = endpoint.node_addr().await?; + node_addr.direct_addresses = Default::default(); let ticket = NodeTicket::new(node_addr); - println!("Ticket with only our node id:\n{}\n", ticket); + println!("Ticket with our home relay but no direct addresses:\n{ticket}\n",); + + let ticket = NodeTicket::new(NodeAddr::new(node_id)); + println!("Ticket with only our node id:\n{ticket}\n"); // accept incoming connections, returns a normal QUIC connection while let Some(incoming) = endpoint.accept().await { From 2d7c52867f9824c1d5d3e92528d30ef0098b756f Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 12 May 2025 09:23:06 +0200 Subject: [PATCH 6/6] transfer example: further cleanups --- iroh/examples/transfer.rs | 141 ++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 65 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 649717a2311..a7960a51bdd 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -14,9 +14,10 @@ use iroh::{ }, dns::{DnsResolver, N0_DNS_NODE_ORIGIN_PROD, N0_DNS_NODE_ORIGIN_STAGING}, endpoint::ConnectionError, - Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey, + Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, RelayUrl, SecretKey, }; use iroh_base::ticket::NodeTicket; +use n0_future::task::AbortOnDropHandle; use tokio_stream::StreamExt; use tracing::{info, warn}; use url::Url; @@ -65,6 +66,35 @@ enum Env { Dev, } +impl Env { + fn relay_mode(self) -> RelayMode { + match self { + Env::Prod => RelayMode::Default, + Env::Staging => RelayMode::Staging, + Env::Dev => RelayMode::Custom(RelayMap::from( + RelayUrl::from_str(DEV_RELAY_URL).expect("valid url"), + )), + } + } + + fn pkarr_relay_url(self) -> Url { + match self { + Env::Prod => N0_DNS_PKARR_RELAY_PROD.parse(), + Env::Staging => N0_DNS_PKARR_RELAY_STAGING.parse(), + Env::Dev => DEV_PKARR_RELAY_URL.parse(), + } + .expect("valid url") + } + + fn dns_origin_domain(self) -> String { + match self { + Env::Prod => N0_DNS_NODE_ORIGIN_PROD.to_string(), + Env::Staging => N0_DNS_NODE_ORIGIN_STAGING.to_string(), + Env::Dev => DEV_DNS_ORIGIN_DOMAIN.to_string(), + } + } +} + #[derive(Debug, clap::Parser)] struct EndpointArgs { /// Set the environment for relay, pkarr, and DNS servers. @@ -74,7 +104,7 @@ struct EndpointArgs { env: Env, /// Set one or more relay servers to use. #[clap(long)] - relay_url: Vec, + relay_url: Vec, /// Disable relays completely. #[clap(long, conflicts_with = "relay_url")] no_relay: bool, @@ -83,7 +113,7 @@ struct EndpointArgs { relay_only: bool, /// Use a custom pkarr server. #[clap(long)] - pkarr_relay_url: Option, + pkarr_relay_url: Option, /// Disable publishing node info to pkarr. #[clap(long, conflicts_with = "pkarr_relay_url")] no_pkarr_publish: bool, @@ -120,7 +150,7 @@ enum Commands { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let cli = Cli::parse(); match cli.command { @@ -128,14 +158,14 @@ async fn main() -> anyhow::Result<()> { size, endpoint_args, } => { - let endpoint = endpoint_args.into_endpoint().await?; + let endpoint = endpoint_args.bind_endpoint().await?; provide(endpoint, size).await? } Commands::Fetch { ticket, endpoint_args, } => { - let endpoint = endpoint_args.into_endpoint().await?; + let endpoint = endpoint_args.bind_endpoint().await?; fetch(endpoint, &ticket).await? } } @@ -144,7 +174,7 @@ async fn main() -> anyhow::Result<()> { } impl EndpointArgs { - async fn into_endpoint(self) -> anyhow::Result { + async fn bind_endpoint(self) -> Result { let mut builder = Endpoint::builder(); let secret_key = match std::env::var("IROH_SECRET") { @@ -162,45 +192,25 @@ impl EndpointArgs { let relay_mode = if self.no_relay { RelayMode::Disabled } else if !self.relay_url.is_empty() { - let urls = self - .relay_url - .iter() - .map(|u| RelayUrl::from_str(u)) - .collect::, _>>() - .context("Failed to parse relay URL")?; - RelayMode::Custom(RelayMap::from_iter(urls)) + RelayMode::Custom(RelayMap::from_iter(self.relay_url)) } else { - match self.env { - Env::Prod => RelayMode::Default, - Env::Staging => RelayMode::Staging, - Env::Dev => RelayMode::Custom(RelayMap::from( - RelayUrl::from_str(DEV_RELAY_URL).expect("valid url"), - )), - } + self.env.relay_mode() }; - let relay_disabled = matches!(relay_mode, RelayMode::Disabled); builder = builder.relay_mode(relay_mode); if !self.no_pkarr_publish { - let url = match (&self.pkarr_relay_url, self.env) { - (Some(url), _) => url, - (_, Env::Prod) => N0_DNS_PKARR_RELAY_PROD, - (_, Env::Staging) => N0_DNS_PKARR_RELAY_STAGING, - (_, Env::Dev) => DEV_PKARR_RELAY_URL, - }; - let url = Url::from_str(url).context("Failed to parse pkarr relay URL")?; + let url = self + .pkarr_relay_url + .unwrap_or_else(|| self.env.pkarr_relay_url()); builder = builder .add_discovery(|secret_key| Some(PkarrPublisher::new(secret_key.clone(), url))); } if !self.no_dns_resolve { - let origin_domain = match (self.dns_origin_domain, self.env) { - (Some(domain), _) => domain, - (_, Env::Prod) => N0_DNS_NODE_ORIGIN_PROD.to_string(), - (_, Env::Staging) => N0_DNS_NODE_ORIGIN_STAGING.to_string(), - (_, Env::Dev) => DEV_DNS_ORIGIN_DOMAIN.to_string(), - }; - builder = builder.add_discovery(|_| Some(DnsDiscovery::new(origin_domain))); + let domain = self + .dns_origin_domain + .unwrap_or_else(|| self.env.dns_origin_domain()); + builder = builder.add_discovery(|_| Some(DnsDiscovery::new(domain))); } #[cfg(feature = "discovery-local-network")] @@ -238,7 +248,7 @@ impl EndpointArgs { for local_endpoint in endpoint.direct_addresses().initialized().await? { println!("\t{} (type: {:?})", local_endpoint.addr, local_endpoint.typ) } - if !relay_disabled { + if !self.no_relay { let relay_url = endpoint .home_relay() .get()? @@ -251,7 +261,7 @@ impl EndpointArgs { } } -async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { +async fn provide(endpoint: Endpoint, size: u64) -> Result<()> { let node_id = endpoint.node_id(); let node_addr = endpoint.node_addr().await?; @@ -277,26 +287,21 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { continue; } }; - let conn = connecting.await?; - let node_id = conn.remote_node_id()?; - info!( - "new connection from {node_id} with ALPN {}", - String::from_utf8_lossy(TRANSFER_ALPN), - ); - // spawn a task to handle reading and writing off of the connection let endpoint_clone = endpoint.clone(); tokio::spawn(async move { + let conn = connecting.await?; + let node_id = conn.remote_node_id()?; + info!( + "new connection from {node_id} with ALPN {}", + String::from_utf8_lossy(TRANSFER_ALPN), + ); + let remote = node_id.fmt_short(); println!("[{remote}] Connected"); - let mut conn_type_stream = endpoint_clone.conn_type(node_id).unwrap().stream(); - let conn_type_task = tokio::task::spawn(async move { - let remote = node_id.fmt_short(); - while let Some(conn_type) = conn_type_stream.next().await { - println!("[{remote}] Connection type changed to: {conn_type}"); - } - }); + // Spawn a background task that prints connection type changes. Will be aborted on drop. + let _guard = watch_conn_type(&endpoint_clone, node_id); // accept a bi-directional QUIC connection // use the `quinn` APIs to send and recv content @@ -332,7 +337,6 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { } else { println!("[{remote}] Disconnected"); } - conn_type_task.abort(); Ok::<_, anyhow::Error>(()) }); } @@ -341,25 +345,20 @@ async fn provide(endpoint: Endpoint, size: u64) -> anyhow::Result<()> { Ok(()) } -async fn fetch(endpoint: Endpoint, ticket: &str) -> anyhow::Result<()> { +async fn fetch(endpoint: Endpoint, ticket: &str) -> Result<()> { let me = endpoint.node_id().fmt_short(); let ticket: NodeTicket = ticket.parse()?; + let remote_node_id = ticket.node_addr().node_id; let start = Instant::now(); - let remote = ticket.node_addr().node_id; - // Attempt to connect, over the given ALPN. // Returns a Quinn connection. let conn = endpoint - .connect(ticket.node_addr().clone(), TRANSFER_ALPN) + .connect(NodeAddr::from(ticket), TRANSFER_ALPN) .await?; - println!("Connected to {remote}"); - let mut conn_type_stream = endpoint.conn_type(remote).unwrap().stream(); - let conn_type_task = tokio::task::spawn(async move { - while let Some(conn_type) = conn_type_stream.next().await { - println!("Connection type changed to: {conn_type}"); - } - }); + println!("Connected to {remote_node_id}"); + // Spawn a background task that prints connection type changes. Will be aborted on drop. + let _guard = watch_conn_type(&endpoint, remote_node_id); // Use the Quinn API to send and recv content. let (mut send, mut recv) = conn.open_bi().await?; @@ -385,7 +384,6 @@ async fn fetch(endpoint: Endpoint, ticket: &str) -> anyhow::Result<()> { time_to_first_byte.as_secs_f64(), chnk ); - conn_type_task.abort(); Ok(()) } @@ -474,3 +472,16 @@ fn parse_byte_size(s: &str) -> Result { let cfg = parse_size::Config::new().with_binary(); cfg.parse_size(s).map_err(|e| anyhow::anyhow!(e)) } + +fn watch_conn_type(endpoint: &Endpoint, node_id: NodeId) -> AbortOnDropHandle<()> { + let mut stream = endpoint.conn_type(node_id).unwrap().stream(); + let task = tokio::task::spawn(async move { + while let Some(conn_type) = stream.next().await { + println!( + "[{}] Connection type changed to: {conn_type}", + node_id.fmt_short() + ); + } + }); + AbortOnDropHandle::new(task) +}