Skip to content

Commit f75b275

Browse files
committed
WIP add fn open
1 parent 78c06ec commit f75b275

File tree

7 files changed

+90
-15
lines changed

7 files changed

+90
-15
lines changed

Cargo.lock

+4-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ async-channel = "2.3.1"
1818
bao-tree = { version = "0.13", features = [
1919
"tokio_fsm",
2020
"validate",
21+
"experimental-mixed",
2122
], default-features = false }
2223
blake3 = { version = "1.4.5", package = "iroh-blake3" }
2324
bytes = { version = "1.7", features = ["serde"] }
@@ -77,6 +78,7 @@ walkdir = { version = "2.5.0", optional = true }
7778
# Examples
7879
console = { version = "0.15.8", optional = true }
7980
tracing-test = "0.2.5"
81+
positioned-io = "0.3.3"
8082

8183
[dev-dependencies]
8284
http-body = "1.0"
@@ -189,3 +191,4 @@ incremental = false
189191
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
190192
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
191193
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
194+
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "read_and_seek" }

src/rpc.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ use proto::{
2323
AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, AddStreamUpdate,
2424
BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, BatchAddStreamResponse,
2525
BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest,
26-
BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest,
27-
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadResponse,
28-
ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest,
29-
ReadAtResponse, ValidateRequest,
26+
BatchUpdate, BlobEntryInfoRequest, BlobStatusRequest, BlobStatusResponse,
27+
ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteRequest,
28+
DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest,
29+
ReadAtRequest, ReadAtResponse, ValidateRequest,
3030
},
3131
tags::{
3232
CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest,
@@ -51,7 +51,9 @@ use crate::{
5151
},
5252
net_protocol::{BlobDownloadRequest, Blobs, BlobsInner},
5353
provider::{AddProgress, BatchAddPathProgress},
54-
store::{ConsistencyCheckProgress, ImportProgress, MapEntry, ValidateProgress},
54+
store::{
55+
ConsistencyCheckProgress, EntryPathOrData, ImportProgress, MapEntry, ValidateProgress,
56+
},
5557
util::{
5658
local_pool::LocalPoolHandle,
5759
progress::{AsyncChannelProgressSender, ProgressSender},
@@ -203,6 +205,7 @@ impl<D: crate::store::Store> Handler<D> {
203205
.await
204206
}
205207
BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await,
208+
EntryInfo(msg) => chan.rpc(msg, self, Self::blob_entry_info).await,
206209
}
207210
}
208211

@@ -309,6 +312,17 @@ impl<D: crate::store::Store> Handler<D> {
309312
Ok(())
310313
}
311314

315+
async fn blob_entry_info(
316+
self,
317+
msg: BlobEntryInfoRequest,
318+
) -> RpcResult<Option<EntryPathOrData>> {
319+
Ok(self
320+
.store()
321+
.entry_path_or_data(msg.hash)
322+
.await
323+
.map_err(|e| RpcError::new(&e))?)
324+
}
325+
312326
fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
313327
tracing::info!("blob_list_tags");
314328
let blobs = self;

src/rpc/client/blobs.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ use std::{
6666
};
6767

6868
use anyhow::{anyhow, Context as _, Result};
69+
use bao_tree::{
70+
io::{baofile::BaoFile, outboard::PreOrderOutboard},
71+
BaoTree,
72+
};
6973
use bytes::Bytes;
7074
use futures_lite::{Stream, StreamExt};
7175
use futures_util::SinkExt;
@@ -87,10 +91,10 @@ use crate::{
8791
format::collection::{Collection, SimpleStore},
8892
get::db::DownloadProgress as BytesDownloadProgress,
8993
net_protocol::BlobDownloadRequest,
90-
rpc::proto::RpcService,
94+
rpc::proto::{blobs::BlobEntryInfoRequest, RpcService},
9195
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
92-
util::SetTagOption,
93-
BlobFormat, Hash, Tag,
96+
util::{MemOrFile, SetTagOption},
97+
BlobFormat, Hash, Tag, IROH_BLOCK_SIZE,
9498
};
9599

96100
mod batch;
@@ -380,6 +384,31 @@ where
380384
))
381385
}
382386

387+
/// Open a blob as an independent bao file
388+
pub async fn open(&self, hash: Hash) -> Result<impl std::io::Read + std::io::Seek> {
389+
let Some(info) = self.rpc.rpc(BlobEntryInfoRequest { hash }).await?? else {
390+
return Err(anyhow!("Blob not found"));
391+
};
392+
let (data, size) = match info.data {
393+
MemOrFile::Mem(data) => (MemOrFile::Mem(data.clone()), data.len() as u64),
394+
MemOrFile::File((path, size)) => (MemOrFile::File(std::fs::File::open(path)?), size),
395+
};
396+
let outboard = match info.outboard {
397+
MemOrFile::Mem(data) => MemOrFile::Mem(data.clone()),
398+
MemOrFile::File(path) => MemOrFile::File(std::fs::File::open(path)?),
399+
};
400+
let file = BaoFile {
401+
data,
402+
outboard: PreOrderOutboard {
403+
tree: BaoTree::new(size, IROH_BLOCK_SIZE),
404+
root: hash.into(),
405+
data: outboard,
406+
},
407+
};
408+
let file = positioned_io::Cursor::new(file);
409+
Ok(file)
410+
}
411+
383412
/// Export a blob from the internal blob store to a path on the node's filesystem.
384413
///
385414
/// `destination` should be an writeable, absolute path on the local node's filesystem.

src/rpc/proto/blobs.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::{
1515
provider::{AddProgress, BatchAddPathProgress},
1616
rpc::client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption},
1717
store::{
18-
BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode,
19-
ValidateProgress,
18+
BaoBlobSize, ConsistencyCheckProgress, EntryPathOrData, ExportFormat, ExportMode,
19+
ImportMode, ValidateProgress,
2020
},
2121
util::SetTagOption,
2222
BlobFormat, Hash, HashAndFormat, Tag,
@@ -63,6 +63,8 @@ pub enum Request {
6363
BatchAddPath(BatchAddPathRequest),
6464
#[rpc(response = RpcResult<()>)]
6565
BatchCreateTempTag(BatchCreateTempTagRequest),
66+
#[rpc(response = RpcResult<Option<EntryPathOrData>>)]
67+
EntryInfo(BlobEntryInfoRequest),
6668
}
6769

6870
#[allow(missing_docs)]
@@ -83,6 +85,7 @@ pub enum Response {
8385
BatchCreate(BatchCreateResponse),
8486
BatchAddStream(BatchAddStreamResponse),
8587
BatchAddPath(BatchAddPathResponse),
88+
EntryInfo(RpcResult<Option<EntryPathOrData>>),
8689
}
8790

8891
/// A request to the node to provide the data at the given path
@@ -313,6 +316,13 @@ pub struct BatchAddPathRequest {
313316
pub batch: BatchId,
314317
}
315318

319+
/// Write a blob from a byte stream
320+
#[derive(Serialize, Deserialize, Debug)]
321+
pub struct BlobEntryInfoRequest {
322+
/// The hash of the blob
323+
pub hash: Hash,
324+
}
325+
316326
/// Response to a batch add path request
317327
#[derive(Serialize, Deserialize, Debug)]
318328
pub struct BatchAddPathResponse(pub BatchAddPathProgress);

src/store/traits.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub enum EntryStatus {
4343
}
4444

4545
/// Get the path or data for an entry
46-
#[derive(Debug)]
46+
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
4747
pub struct EntryPathOrData {
4848
/// The path to the data file or the inline data
4949
pub data: MemOrFile<Bytes, (PathBuf, u64)>,

src/util/mem_or_file.rs

+19-2
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
11
use std::{fs::File, io};
22

3-
use bao_tree::io::sync::{ReadAt, Size};
3+
use bao_tree::io::{
4+
mixed::ReadBytesAt,
5+
sync::{ReadAt, Size},
6+
};
47
use bytes::Bytes;
8+
use serde::{Deserialize, Serialize};
59

610
/// This is a general purpose Either, just like Result, except that the two cases
711
/// are Mem for something that is in memory, and File for something that is somewhere
812
/// external and only available via io.
9-
#[derive(Debug)]
13+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1014
pub enum MemOrFile<M, F> {
1115
/// We got it all in memory
1216
Mem(M),
1317
/// A file
1418
File(F),
1519
}
1620

21+
impl<M, F> ReadBytesAt for MemOrFile<M, F>
22+
where
23+
M: ReadBytesAt,
24+
F: ReadBytesAt,
25+
{
26+
fn read_bytes_at(&self, offset: u64, size: usize) -> io::Result<Bytes> {
27+
match self {
28+
MemOrFile::Mem(mem) => mem.read_bytes_at(offset, size),
29+
MemOrFile::File(file) => file.read_bytes_at(offset, size),
30+
}
31+
}
32+
}
33+
1734
/// Helper methods for a common way to use MemOrFile, where the memory part is something
1835
/// like a slice, and the file part is a tuple consisiting of path or file and size.
1936
impl<M, F> MemOrFile<M, (F, u64)>

0 commit comments

Comments
 (0)