From b37c1b45d7ffde8f96382a83069d5e431c80650c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 5 May 2025 14:45:59 +0300 Subject: [PATCH 1/4] Add example how one would implement expiring tags --- examples/expiring-tags.rs | 155 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 examples/expiring-tags.rs diff --git a/examples/expiring-tags.rs b/examples/expiring-tags.rs new file mode 100644 index 00000000..b736bcf6 --- /dev/null +++ b/examples/expiring-tags.rs @@ -0,0 +1,155 @@ +use std::time::{Duration, SystemTime}; + +use chrono::Utc; +use futures_lite::StreamExt; +use iroh::endpoint; +use iroh_blobs::store::GcConfig; +use iroh_blobs::{hashseq::HashSeq, BlobFormat, HashAndFormat}; +use iroh_blobs::Hash; + +use iroh_blobs::rpc::client::blobs::MemClient as BlobsClient; +use tokio::signal::ctrl_c; + +/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes. +/// +/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`). +/// +async fn create_expiring_tag( + iroh: &BlobsClient, + hashes: &[Hash], + prefix: &str, + expiry: SystemTime, +) -> anyhow::Result<()> { + let expiry = chrono::DateTime::::from(expiry); + let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let tagname = format!("{}-{}", prefix, expiry); + let batch = iroh.batch().await?; + let tt = if hashes.is_empty() { + return Ok(()); + } else if hashes.len() == 1 { + let hash = hashes[0]; + batch.temp_tag(HashAndFormat::raw(hash)).await? + } else { + let hs = hashes.into_iter().copied().collect::(); + batch + .add_bytes_with_opts(hs.into_inner(), BlobFormat::HashSeq) + .await? + }; + batch.persist_to(tt, tagname.as_str().into()).await?; + println!("Created tag {}", tagname); + Ok(()) +} + +async fn delete_expired_tags(iroh: &BlobsClient, prefix: &str) -> anyhow::Result<()> { + let mut tags = iroh.tags().list().await?; + let prefix = format!("{}-", prefix); + let now = chrono::Utc::now(); + let mut to_delete = Vec::new(); + while let Some(tag) = tags.next().await { + let tag = tag?.name; + if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) { + let Ok(expiry) = std::str::from_utf8(rest) else { + tracing::warn!("Tag {} does have non utf8 expiry", tag); + continue; + }; + let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else { + tracing::warn!("Tag {} does have invalid expiry date", tag); + continue; + }; + let expiry = expiry.with_timezone(&Utc); + if expiry < now { + to_delete.push(tag); + } + } + } + for tag in to_delete { + println!("Deleting expired tag {}", tag); + iroh.tags().delete(tag).await?; + } + Ok(()) +} + +async fn print_tags_task(blobs: BlobsClient) -> anyhow::Result<()> { + loop { + let now = chrono::Utc::now(); + let mut tags = blobs.tags().list().await?; + println!("Tags at {}:\n", now); + while let Some(tag) = tags.next().await { + let tag = tag?; + println!(" {:?}", tag); + } + println!(); + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +async fn print_blobs_task(blobs: BlobsClient) -> anyhow::Result<()> { + loop { + let now = chrono::Utc::now(); + let mut blobs = blobs.list().await?; + println!("Blobs at {}:\n", now); + while let Some(info) = blobs.next().await { + println!(" {:?}", info?); + } + println!(); + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +async fn delete_expired_tags_task(blobs: BlobsClient, prefix: &str, ) -> anyhow::Result<()> { + loop { + delete_expired_tags(&blobs, prefix).await?; + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let endpoint = endpoint::Endpoint::builder().bind().await?; + let store = iroh_blobs::store::fs::Store::load("blobs").await?; + let blobs = iroh_blobs::net_protocol::Blobs::builder(store) + .build(&endpoint); + // enable gc with a short period + blobs.start_gc(GcConfig { + period: Duration::from_secs(1), + done_callback: None, + })?; + // create a router and add blobs as a service + // + // You can skip this if you don't want to serve the data over the network. + let router = iroh::protocol::Router::builder(endpoint) + .accept(iroh_blobs::ALPN, blobs.clone()) + .spawn().await?; + + // setup: add some data and tag it + { + // add several blobs and tag them with an expiry date 10 seconds in the future + let batch = blobs.client().batch().await?; + let a = batch.add_bytes("blob 1".as_bytes()).await?; + let b = batch.add_bytes("blob 2".as_bytes()).await?; + let expires_at = SystemTime::now().checked_add(Duration::from_secs(10)).unwrap(); + create_expiring_tag(blobs.client(), &[*a.hash(), *b.hash()], "expiring", expires_at).await?; + + // add a single blob and tag it with an expiry date 60 seconds in the future + let c = batch.add_bytes("blob 3".as_bytes()).await?; + let expires_at = SystemTime::now().checked_add(Duration::from_secs(60)).unwrap(); + create_expiring_tag(blobs.client(), &[*c.hash()], "expiring", expires_at).await?; + // batch goes out of scope, so data is only protected by the tags we created + } + let client = blobs.client().clone(); + + // delete expired tags every 5 seconds + let check_task = tokio::spawn(delete_expired_tags_task(client.clone(), "expiring")); + // print tags every 5 seconds + let print_tags_task = tokio::spawn(print_tags_task(client.clone())); + // print blobs every 5 seconds + let print_blobs_task = tokio::spawn(print_blobs_task(client)); + + ctrl_c().await?; + router.shutdown().await?; + check_task.abort(); + print_tags_task.abort(); + print_blobs_task.abort(); + Ok(()) +} \ No newline at end of file From 0600dded42feb655157fc82e14d36afde483c95a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 5 May 2025 15:03:47 +0300 Subject: [PATCH 2/4] prettify the info output --- examples/expiring-tags.rs | 137 ++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 56 deletions(-) diff --git a/examples/expiring-tags.rs b/examples/expiring-tags.rs index b736bcf6..9feb3e78 100644 --- a/examples/expiring-tags.rs +++ b/examples/expiring-tags.rs @@ -1,19 +1,24 @@ +//! This example shows how to create tags that expire after a certain time. +//! +//! We use a prefix so we can distinguish between expiring and normal tags, and +//! then encode the expiry date in the tag name after the prefix, in a format +//! that sorts in the same order as the expiry date. +//! +//! Then we can just use use std::time::{Duration, SystemTime}; use chrono::Utc; use futures_lite::StreamExt; use iroh::endpoint; -use iroh_blobs::store::GcConfig; -use iroh_blobs::{hashseq::HashSeq, BlobFormat, HashAndFormat}; -use iroh_blobs::Hash; - -use iroh_blobs::rpc::client::blobs::MemClient as BlobsClient; +use iroh_blobs::{ + hashseq::HashSeq, rpc::client::blobs::MemClient as BlobsClient, store::GcConfig, BlobFormat, + Hash, HashAndFormat, Tag, +}; use tokio::signal::ctrl_c; /// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes. /// /// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`). -/// async fn create_expiring_tag( iroh: &BlobsClient, hashes: &[Hash], @@ -40,65 +45,78 @@ async fn create_expiring_tag( Ok(()) } -async fn delete_expired_tags(iroh: &BlobsClient, prefix: &str) -> anyhow::Result<()> { - let mut tags = iroh.tags().list().await?; +async fn delete_expired_tags(blobs: &BlobsClient, prefix: &str, bulk: bool) -> anyhow::Result<()> { + let mut tags = blobs.tags().list().await?; let prefix = format!("{}-", prefix); let now = chrono::Utc::now(); - let mut to_delete = Vec::new(); - while let Some(tag) = tags.next().await { - let tag = tag?.name; - if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) { - let Ok(expiry) = std::str::from_utf8(rest) else { - tracing::warn!("Tag {} does have non utf8 expiry", tag); - continue; - }; - let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else { - tracing::warn!("Tag {} does have invalid expiry date", tag); - continue; - }; - let expiry = expiry.with_timezone(&Utc); - if expiry < now { - to_delete.push(tag); + let end = format!( + "{}-{}", + prefix, + now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) + ); + if bulk { + // delete all tags with the prefix and an expiry date before now + // + // this should be very efficient, since it is just a single database operation + blobs + .tags() + .delete_range(Tag::from(prefix.clone())..Tag::from(end)) + .await?; + } else { + // find tags to delete one by one and then delete them + // + // this allows us to print the tags before deleting them + let mut to_delete = Vec::new(); + while let Some(tag) = tags.next().await { + let tag = tag?.name; + if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) { + let Ok(expiry) = std::str::from_utf8(rest) else { + tracing::warn!("Tag {} does have non utf8 expiry", tag); + continue; + }; + let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else { + tracing::warn!("Tag {} does have invalid expiry date", tag); + continue; + }; + let expiry = expiry.with_timezone(&Utc); + if expiry < now { + to_delete.push(tag); + } } } - } - for tag in to_delete { - println!("Deleting expired tag {}", tag); - iroh.tags().delete(tag).await?; + for tag in to_delete { + println!("Deleting expired tag {}", tag); + blobs.tags().delete(tag).await?; + } } Ok(()) } -async fn print_tags_task(blobs: BlobsClient) -> anyhow::Result<()> { +async fn info_task(blobs: BlobsClient) -> anyhow::Result<()> { + tokio::time::sleep(Duration::from_secs(1)).await; loop { let now = chrono::Utc::now(); let mut tags = blobs.tags().list().await?; - println!("Tags at {}:\n", now); + println!("Current time: {}", now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)); + println!("Tags:"); while let Some(tag) = tags.next().await { let tag = tag?; println!(" {:?}", tag); } - println!(); - tokio::time::sleep(Duration::from_secs(5)).await; - } -} - -async fn print_blobs_task(blobs: BlobsClient) -> anyhow::Result<()> { - loop { - let now = chrono::Utc::now(); let mut blobs = blobs.list().await?; - println!("Blobs at {}:\n", now); + println!("Blobs:"); while let Some(info) = blobs.next().await { - println!(" {:?}", info?); + let info = info?; + println!(" {} {} bytes", info.hash, info.size); } println!(); tokio::time::sleep(Duration::from_secs(5)).await; } } -async fn delete_expired_tags_task(blobs: BlobsClient, prefix: &str, ) -> anyhow::Result<()> { +async fn delete_expired_tags_task(blobs: BlobsClient, prefix: &str) -> anyhow::Result<()> { loop { - delete_expired_tags(&blobs, prefix).await?; + delete_expired_tags(&blobs, prefix, false).await?; tokio::time::sleep(Duration::from_secs(5)).await; } } @@ -108,8 +126,7 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let endpoint = endpoint::Endpoint::builder().bind().await?; let store = iroh_blobs::store::fs::Store::load("blobs").await?; - let blobs = iroh_blobs::net_protocol::Blobs::builder(store) - .build(&endpoint); + let blobs = iroh_blobs::net_protocol::Blobs::builder(store).build(&endpoint); // enable gc with a short period blobs.start_gc(GcConfig { period: Duration::from_secs(1), @@ -120,7 +137,8 @@ async fn main() -> anyhow::Result<()> { // You can skip this if you don't want to serve the data over the network. let router = iroh::protocol::Router::builder(endpoint) .accept(iroh_blobs::ALPN, blobs.clone()) - .spawn().await?; + .spawn() + .await?; // setup: add some data and tag it { @@ -128,28 +146,35 @@ async fn main() -> anyhow::Result<()> { let batch = blobs.client().batch().await?; let a = batch.add_bytes("blob 1".as_bytes()).await?; let b = batch.add_bytes("blob 2".as_bytes()).await?; - let expires_at = SystemTime::now().checked_add(Duration::from_secs(10)).unwrap(); - create_expiring_tag(blobs.client(), &[*a.hash(), *b.hash()], "expiring", expires_at).await?; + let expires_at = SystemTime::now() + .checked_add(Duration::from_secs(10)) + .unwrap(); + create_expiring_tag( + blobs.client(), + &[*a.hash(), *b.hash()], + "expiring", + expires_at, + ) + .await?; // add a single blob and tag it with an expiry date 60 seconds in the future let c = batch.add_bytes("blob 3".as_bytes()).await?; - let expires_at = SystemTime::now().checked_add(Duration::from_secs(60)).unwrap(); + let expires_at = SystemTime::now() + .checked_add(Duration::from_secs(60)) + .unwrap(); create_expiring_tag(blobs.client(), &[*c.hash()], "expiring", expires_at).await?; // batch goes out of scope, so data is only protected by the tags we created } let client = blobs.client().clone(); // delete expired tags every 5 seconds - let check_task = tokio::spawn(delete_expired_tags_task(client.clone(), "expiring")); - // print tags every 5 seconds - let print_tags_task = tokio::spawn(print_tags_task(client.clone())); - // print blobs every 5 seconds - let print_blobs_task = tokio::spawn(print_blobs_task(client)); + let delete_task = tokio::spawn(delete_expired_tags_task(client.clone(), "expiring")); + // print all tags and blobs every 5 seconds + let info_task = tokio::spawn(info_task(client.clone())); ctrl_c().await?; + delete_task.abort(); + info_task.abort(); router.shutdown().await?; - check_task.abort(); - print_tags_task.abort(); - print_blobs_task.abort(); Ok(()) -} \ No newline at end of file +} From ed89a3045f30faa0f69ebd487d7fa66b309b5c94 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 5 May 2025 15:11:43 +0300 Subject: [PATCH 3/4] fmt --- examples/expiring-tags.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/expiring-tags.rs b/examples/expiring-tags.rs index 9feb3e78..88d17b1f 100644 --- a/examples/expiring-tags.rs +++ b/examples/expiring-tags.rs @@ -97,7 +97,10 @@ async fn info_task(blobs: BlobsClient) -> anyhow::Result<()> { loop { let now = chrono::Utc::now(); let mut tags = blobs.tags().list().await?; - println!("Current time: {}", now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)); + println!( + "Current time: {}", + now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true) + ); println!("Tags:"); while let Some(tag) = tags.next().await { let tag = tag?; From b6380fbf912914fbf5a59bc1793bd46cf97d4202 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 5 May 2025 15:12:09 +0300 Subject: [PATCH 4/4] clippy --- examples/expiring-tags.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/expiring-tags.rs b/examples/expiring-tags.rs index 88d17b1f..bda3b6bc 100644 --- a/examples/expiring-tags.rs +++ b/examples/expiring-tags.rs @@ -35,7 +35,7 @@ async fn create_expiring_tag( let hash = hashes[0]; batch.temp_tag(HashAndFormat::raw(hash)).await? } else { - let hs = hashes.into_iter().copied().collect::(); + let hs = hashes.iter().copied().collect::(); batch .add_bytes_with_opts(hs.into_inner(), BlobFormat::HashSeq) .await?