Skip to content
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_jcs = "0.1"
serde_json = "1"
sha2 = "0.10"
tabled = "0.15"
tempfile = "3"
time = { version = "0.3.21", features = ["serde-well-known"] }
tokio = { version = "1", features = ["full"] }
toml = "0.8"
Expand Down
4 changes: 2 additions & 2 deletions core/src/crp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cid::Cid;
use futures::Stream;
use serde::{Deserialize, Serialize};

use crate::{cid_filter::CidFilter, routes::Route, Context};
use crate::{cid_filter::CidFilter, routes::Route, Context, Url};

/// Set of all supported CID Route Providers (CRPs) throughout the system
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -113,5 +113,5 @@ pub trait BlobWriter: Send + Sync {
auth: Option<bytes::Bytes>,
cid: &Cid,
data: &[u8],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
) -> Result<Url, Box<dyn std::error::Error + Send + Sync>>;
}
7 changes: 3 additions & 4 deletions core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ impl Db {
creator BLOB,
signature BLOB,
multicodec TEXT,
UNIQUE(provider_id, provider_type, cid),
UNIQUE(provider_id, provider_type, url)
)",
[],
Expand All @@ -104,7 +103,7 @@ impl Db {
let conn = self.conn.lock().await;

let mut stmt = conn.prepare(
"INSERT INTO routes (id, created_at, verified_at, provider_id, provider_type, url, cid, size, multicodec, creator, signature)
"INSERT OR IGNORE INTO routes (id, created_at, verified_at, provider_id, provider_type, url, cid, size, multicodec, creator, signature)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"
)?;

Expand All @@ -116,7 +115,7 @@ impl Db {
route.id.to_string(),
created,
verified_at,
route.provider_type.to_string(),
route.provider_id.to_string(),
route.provider_type.to_string(),
route.url,
route.cid.to_bytes(),
Expand Down Expand Up @@ -296,7 +295,7 @@ impl Db {
pub async fn insert_stub(&self, stub: &RouteStub) -> Result<()> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"INSERT INTO routes (id, created_at, verified_at, provider_id, provider_type, url, cid, size, multicodec, creator, signature)
"INSERT OR IGNORE INTO routes (id, created_at, verified_at, provider_id, provider_type, url, cid, size, multicodec, creator, signature)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
)?;

Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ pub mod repo;
pub mod routes;

pub use context::Context;
// Re-export Url from reqwest
pub use reqwest::Url;
2 changes: 2 additions & 0 deletions crps/azure/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub struct ContainerConfig {
pub container: String,
pub credentials: Option<Credentials>,
pub filter: ContainerBlobFilter,
#[serde(default)]
pub writeable: bool,
}

#[derive(Clone, Serialize, Deserialize)]
Expand Down
103 changes: 97 additions & 6 deletions crps/azure/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use azure_storage_blobs::{blob::Blob, prelude::*};
use bytes::Bytes;
use cid::Cid;
use cid_router_core::{
Context,
Context, Url,
cid::{Codec, blake3_hash_to_cid},
cid_filter::CidFilter,
crp::{Crp, CrpCapabilities, ProviderType, RouteResolver},
crp::{BlobWriter, Crp, CrpCapabilities, ProviderType, RouteResolver},
db::{Direction, OrderBy},
routes::{Route, RouteStub},
};
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Crp for Container {
fn capabilities<'a>(&'a self) -> CrpCapabilities<'a> {
CrpCapabilities {
route_resolver: Some(self),
blob_writer: None, // TODO
blob_writer: if self.cfg.writeable { Some(self) } else { None },
}
}

Expand All @@ -57,6 +57,30 @@ impl Crp for Container {
}
}

#[async_trait]
impl BlobWriter for Container {
async fn put_blob(
&self,
_auth: Option<bytes::Bytes>,
cid: &Cid,
data: &[u8],
) -> Result<Url, Box<dyn std::error::Error + Send + Sync>> {
if !self.cfg.writeable {
return Err("Container is not writeable".into());
}
info!("Uploading blob for cid {}...", cid);
let name = cid.to_string();
let blob_client = self.client.blob_client(&name);
blob_client
.put_block_blob(data.to_vec())
.content_type("application/octet-stream")
.await?;
let url = self.name_to_route_url(&name);
info!("Upload successful! Blob URL: {}", url);
Ok(Url::parse(&url).unwrap())
}
}

#[async_trait]
impl RouteResolver for Container {
async fn get_bytes(
Expand Down Expand Up @@ -121,7 +145,13 @@ impl Container {
));
StorageCredentials::token_credential(credential)
}
None => StorageCredentials::anonymous(),
None => {
if let Ok(key) = std::env::var("AZURE_STORAGE_ACCESS_KEY") {
StorageCredentials::access_key(account.as_str(), key)
} else {
StorageCredentials::anonymous()
}
}
};
let client = BlobServiceClient::new(account, credentials);
let client = client.container_client(container);
Expand Down Expand Up @@ -165,13 +195,17 @@ impl Container {
Ok(())
}

fn blob_to_route_url(&self, blob: &Blob) -> String {
fn name_to_route_url(&self, name: &str) -> String {
format!(
"https://{}.blob.core.windows.net/{}/{}",
self.cfg.account, self.cfg.container, blob.name
self.cfg.account, self.cfg.container, name
)
}

fn blob_to_route_url(&self, blob: &Blob) -> String {
self.name_to_route_url(&blob.name)
}

fn route_url_to_name(url: &str) -> Result<String> {
// Split by '/' and take everything after the container (4th segment onwards)
let parts: Vec<&str> = url.split('/').collect();
Expand Down Expand Up @@ -243,3 +277,60 @@ impl Container {
Ok(cid)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use cid::Cid;

#[tokio::test]
#[ignore]
async fn create_data() -> anyhow::Result<()> {
let account = std::env::var("AZURE_STORAGE_ACCOUNT")
.expect("Set AZURE_STORAGE_ACCOUNT env var for this test");
let access_key = std::env::var("AZURE_STORAGE_ACCESS_KEY")
.expect("Set AZURE_STORAGE_ACCESS_KEY env var for this test");

let credentials = StorageCredentials::access_key(&account, access_key);
let service_client = BlobServiceClient::new(&account, credentials);
let client = Arc::new(service_client);

let container_name = "blobs";
let container_client = client.container_client(container_name);

// Create container (idempotent)
let _ = container_client.create().await.or_else(|e| {
match e.kind() {
azure_core::error::ErrorKind::HttpResponse { status, .. } if *status == 409 => {
// Container already exists
Ok(())
}
_ => Err(e),
}
})?;

// Some test data
let data: &[u8] = b"Hello from a quick Rust test! This is blob content.";
let cid_str = "bafkreigh2akiscaildcqabs2mfomphc4i7w3oecxi4n3go3ch3gaov2mqa"; // any valid CID
let cid = Cid::try_from(cid_str).unwrap();

let blob_name = cid.to_string();
let blob_client = container_client.blob_client(&blob_name);

println!("Uploading blob '{}' ({} bytes)...", blob_name, data.len());
blob_client
.put_block_blob(data)
.content_type("text/plain; charset=utf-8")
.await?;

println!("Upload successful!");
println!(
"Blob URL: https://{}.blob.core.windows.net/{}/{}",
account, container_name, blob_name
);
Ok(())
}
}
6 changes: 3 additions & 3 deletions crps/iroh/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use cid_router_core::{
cid_filter::{CidFilter, CodeFilter},
crp::{BlobWriter, Crp, CrpCapabilities, ProviderType, RouteResolver},
routes::Route,
Context,
Context, Url,
};
use futures::Stream;
use iroh_blobs::Hash;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl BlobWriter for IrohCrp {
_auth: Option<Bytes>,
cid: &Cid,
data: &[u8],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
) -> Result<Url, Box<dyn std::error::Error + Send + Sync>> {
if !self.writeable {
// this should not happen because we don't hand out the BlobWriter
//capability if not writable.
Expand All @@ -93,7 +93,7 @@ impl BlobWriter for IrohCrp {
return Err("Unsupported CID hash code; only blake3 is supported".into());
}
blobs.add_bytes(data).with_tag().await.map_err(Box::new)?;
Ok(())
Ok(Url::parse(&cid.to_string()).unwrap())
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ tower-http.workspace = true
utoipa.workspace = true
utoipa-axum.workspace = true
utoipa-swagger-ui.workspace = true

[dev-dependencies]
env_logger.workspace = true
log.workspace = true
rand.workspace = true
tempfile.workspace = true
9 changes: 9 additions & 0 deletions server/config.azure.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port = 3080
auth = "none"

[[providers]]
type = "azure"
account = "eueast"
container = "blobs"
filter = "all"
writeable = true
File renamed without changes.
4 changes: 2 additions & 2 deletions server/src/api/v1/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ pub async fn create_data(
}

for (provider, res) in &outcome {
if res.is_ok() {
if let Ok(url) = res {
let route = cid_router_core::routes::Route::builder(*provider)
.cid(cid)
.multicodec(cid_router_core::cid::Codec::Raw)
.size(data.len() as u64)
.url(cid.to_string())
.url(url.to_string())
.build(&ctx.core)?;
ctx.core.db().insert_route(&route).await?;
}
Expand Down
12 changes: 11 additions & 1 deletion server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ pub struct Context {

impl Context {
pub async fn init_from_repo(repo: Repo, config: Config) -> Result<Self> {
let core = cid_router_core::context::Context::from_repo(repo).await?;
Self::init_inner(core, config).await
}

/// Initialize context with an in-memory database (for testing).
pub async fn init_in_memory(config: Config) -> Result<Self> {
let core = cid_router_core::context::Context::mem().await?;
Self::init_inner(core, config).await
}

async fn init_inner(core: cid_router_core::context::Context, config: Config) -> Result<Self> {
let start_time = chrono::Utc::now().timestamp();
let port = config.port;

let auth = config.auth.clone();
let core = cid_router_core::context::Context::from_repo(repo).await?;

let providers = future::join_all(config.providers.into_iter().map(
|provider_config| async move {
Expand Down
Loading