diff --git a/Cargo.lock b/Cargo.lock index 526486e43..c387222a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2086,7 +2086,7 @@ dependencies = [ [[package]] name = "iroh-blobs" -version = "0.34.1" +version = "0.35.1" dependencies = [ "anyhow", "async-channel", diff --git a/Cargo.toml b/Cargo.toml index 345f56d13..c1e48c2e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-blobs" -version = "0.34.1" +version = "0.35.1" edition = "2021" readme = "README.md" description = "blob and collection transfer support for iroh" diff --git a/src/lib.rs b/src/lib.rs index fc9e397bb..3c4db795f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ #![deny(missing_docs, rustdoc::broken_intra_doc_links)] #![recursion_limit = "256"] #![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] +#![feature(async_closure)] #[cfg(feature = "cli")] pub mod cli; diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 9fc8ba7ee..fa4a72113 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -7,6 +7,7 @@ use std::{ collections::BTreeSet, fmt::Debug, ops::{Deref, DerefMut}, + path::PathBuf, sync::Arc, }; @@ -20,7 +21,7 @@ use tracing::debug; use crate::{ downloader::{ConcurrencyLimits, Downloader, RetryConfig}, provider::EventSender, - store::GcConfig, + store::{fs::Persistence, GcConfig}, util::{ local_pool::{self, LocalPool, LocalPoolHandle}, SetTagOption, @@ -221,12 +222,19 @@ impl Blobs { } } -impl Blobs { +impl Blobs> +where + T: Persistence, +{ /// Load a persistent Blobs protocol handler from a path. pub async fn persistent( path: impl AsRef, - ) -> anyhow::Result> { - Ok(Self::builder(crate::store::fs::Store::load(path).await?)) + db_path: PathBuf, + backend: T, + ) -> anyhow::Result>> { + Ok(Self::builder( + crate::store::fs::Store::load_with_backend(path, db_path, backend).await?, + )) } } diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index e5f5acee6..b213fefcf 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -1028,6 +1028,7 @@ mod tests { net_protocol::Blobs, provider::{CustomEventSender, EventSender}, rpc::client::{blobs, tags}, + store::fs::FileSystemPersistence, }; type RpcClient = quic_rpc::RpcClient; @@ -1113,7 +1114,8 @@ mod tests { /// Creates a new node with persistent storage pub async fn persistent( path: impl AsRef, - ) -> anyhow::Result> { + ) -> anyhow::Result>> + { Ok(Builder { store: crate::store::fs::Store::load(path).await?, events: Default::default(), diff --git a/src/store/bao_file.rs b/src/store/bao_file.rs index c06328669..cbe388687 100644 --- a/src/store/bao_file.rs +++ b/src/store/bao_file.rs @@ -14,7 +14,7 @@ use std::{ io, ops::{Deref, DerefMut}, path::{Path, PathBuf}, - sync::{Arc, RwLock, Weak}, + sync::{Arc, Weak}, }; use bao_tree::{ @@ -32,7 +32,7 @@ use iroh_io::AsyncSliceReader; use super::mutable_mem_storage::{MutableMemStorage, SizeInfo}; use crate::{ store::BaoBatchWriter, - util::{get_limited_slice, MemOrFile, SparseMemFile}, + util::{callback_lock::CallbackLock, get_limited_slice, FileAndSize, MemOrFile, SparseMemFile}, Hash, IROH_BLOCK_SIZE, }; @@ -81,22 +81,36 @@ struct DataPaths { /// /// For the memory variant, it does reading in a zero copy way, since storage /// is already a `Bytes`. -#[derive(Default, derive_more::Debug)] -pub struct CompleteStorage { +#[derive(derive_more::Debug)] +pub struct CompleteStorage { /// data part, which can be in memory or on disk. 
- #[debug("{:?}", data.as_ref().map_mem(|x| x.len()))] - pub data: MemOrFile, + #[debug("{:?}", data.as_ref().map_mem(|x| x.len()).map_file(|f| f.size))] + pub data: MemOrFile>, /// outboard part, which can be in memory or on disk. - #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()))] - pub outboard: MemOrFile, + #[debug("{:?}", outboard.as_ref().map_mem(|x| x.len()).map_file(|f| f.size))] + pub outboard: MemOrFile>, } -impl CompleteStorage { +impl Default for CompleteStorage { + fn default() -> Self { + Self { + data: Default::default(), + outboard: Default::default(), + } + } +} + +impl CompleteStorage +where + T: bao_tree::io::sync::ReadAt, +{ /// Read from the data file at the given offset, until end of file or max bytes. pub fn read_data_at(&self, offset: u64, len: usize) -> Bytes { match &self.data { MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len), - MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(), + MemOrFile::File(FileAndSize { file, size: _ }) => { + read_to_end(file, offset, len).unwrap() + } } } @@ -104,7 +118,9 @@ impl CompleteStorage { pub fn read_outboard_at(&self, offset: u64, len: usize) -> Bytes { match &self.outboard { MemOrFile::Mem(mem) => get_limited_slice(mem, offset, len), - MemOrFile::File((file, _size)) => read_to_end(file, offset, len).unwrap(), + MemOrFile::File(FileAndSize { file, size: _ }) => { + read_to_end(file, offset, len).unwrap() + } } } @@ -112,7 +128,7 @@ impl CompleteStorage { pub fn data_size(&self) -> u64 { match &self.data { MemOrFile::Mem(mem) => mem.len() as u64, - MemOrFile::File((_file, size)) => *size, + MemOrFile::File(FileAndSize { file: _, size }) => *size, } } @@ -120,7 +136,7 @@ impl CompleteStorage { pub fn outboard_size(&self) -> u64 { match &self.outboard { MemOrFile::Mem(mem) => mem.len() as u64, - MemOrFile::File((_file, size)) => *size, + MemOrFile::File(FileAndSize { file: _, size }) => *size, } } } @@ -244,7 +260,7 @@ impl FileStorage { /// The storage for a bao file. This can be either in memory or on disk. #[derive(Debug)] -pub(crate) enum BaoFileStorage { +pub(crate) enum BaoFileStorage { /// The entry is incomplete and in memory. /// /// Since it is incomplete, it must be writeable. @@ -261,16 +277,16 @@ pub(crate) enum BaoFileStorage { /// (memory or file). /// /// Writing to this is a no-op, since it is already complete. - Complete(CompleteStorage), + Complete(CompleteStorage), } -impl Default for BaoFileStorage { +impl Default for BaoFileStorage { fn default() -> Self { BaoFileStorage::Complete(Default::default()) } } -impl BaoFileStorage { +impl BaoFileStorage { /// Take the storage out, leaving an empty storage in its place. /// /// Be careful to put something back in its place, or you will lose data. @@ -310,11 +326,11 @@ impl BaoFileStorage { /// A weak reference to a bao file handle. #[derive(Debug, Clone)] -pub struct BaoFileHandleWeak(Weak); +pub struct BaoFileHandleWeak(Weak>); -impl BaoFileHandleWeak { +impl BaoFileHandleWeak { /// Upgrade to a strong reference if possible. - pub fn upgrade(&self) -> Option { + pub fn upgrade(&self) -> Option> { self.0.upgrade().map(BaoFileHandle) } @@ -324,17 +340,36 @@ impl BaoFileHandleWeak { } } +/// a type alias which represents the callback which is executed after +/// the write guard is dropped +type AfterLockWriteCb = Box) + Send + Sync + 'static>; + /// The inner part of a bao file handle. 
#[derive(Debug)] -pub struct BaoFileHandleInner { - pub(crate) storage: RwLock, +pub struct BaoFileHandleInner { + pub(crate) storage: CallbackLock, AfterLockWriteCb>, config: Arc, hash: Hash, + rx: tokio::sync::watch::Receiver, } /// A cheaply cloneable handle to a bao file, including the hash and the configuration. -#[derive(Debug, Clone, derive_more::Deref)] -pub struct BaoFileHandle(Arc); +#[derive(Debug)] +pub struct BaoFileHandle(Arc>); + +impl Deref for BaoFileHandle { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Clone for BaoFileHandle { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} pub(crate) type CreateCb = Arc io::Result<()> + Send + Sync>; @@ -375,13 +410,18 @@ impl BaoFileConfig { /// A reader for a bao file, reading just the data. #[derive(Debug)] -pub struct DataReader(Option); +pub struct DataReader(Option>); -async fn with_storage(opt: &mut Option, no_io: P, f: F) -> io::Result +async fn with_storage( + opt: &mut Option>, + no_io: P, + f: F, +) -> io::Result where - P: Fn(&BaoFileStorage) -> bool + Send + 'static, - F: FnOnce(&BaoFileStorage) -> io::Result + Send + 'static, + P: Fn(&BaoFileStorage) -> bool + Send + 'static, + F: FnOnce(&BaoFileStorage) -> io::Result + Send + 'static, T: Send + 'static, + H: Send + Sync + 'static, { let handle = opt .take() @@ -397,20 +437,17 @@ where return res; } }; - // otherwise, we have to spawn a task. - let (handle, res) = tokio::task::spawn_blocking(move || { - let storage = handle.storage.read().unwrap(); - let res = f(storage.deref()); - drop(storage); - (handle, res) - }) - .await - .expect("spawn_blocking failed"); + let storage_guard = handle.storage.read().await; + let res = f(storage_guard.deref()); + drop(storage_guard); *opt = Some(handle); res } -impl AsyncSliceReader for DataReader { +impl AsyncSliceReader for DataReader +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { with_storage( &mut self.0, @@ -440,9 +477,12 @@ impl AsyncSliceReader for DataReader { /// A reader for the outboard part of a bao file. 
#[derive(Debug)] -pub struct OutboardReader(Option); +pub struct OutboardReader(Option>); -impl AsyncSliceReader for OutboardReader { +impl AsyncSliceReader for OutboardReader +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { with_storage( &mut self.0, @@ -476,18 +516,55 @@ enum HandleChange { // later: size verified } -impl BaoFileHandle { +/// struct which stores simple metadata about the [BaoFileHandle] in a way that is +/// accessible in synchronous function calls +#[derive(Debug)] +struct StorageMeta { + complete: bool, + size: Result, +} + +impl StorageMeta { + fn new(storage: &BaoFileStorage) -> Self { + let size = match storage { + BaoFileStorage::Complete(mem) => Ok(mem.data_size()), + BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()), + BaoFileStorage::IncompleteFile(file) => file.current_size(), + }; + StorageMeta { + complete: matches!(storage, BaoFileStorage::Complete(_)), + size, + } + } +} + +impl BaoFileHandle +where + T: bao_tree::io::sync::ReadAt, +{ + /// internal helper function to initialize a new instance of self + fn new_inner(storage: BaoFileStorage, config: Arc, hash: Hash) -> Self { + let (tx, rx) = tokio::sync::watch::channel(StorageMeta::new(&storage)); + Self(Arc::new(BaoFileHandleInner { + storage: CallbackLock::new( + storage, + Box::new(move |storage: &BaoFileStorage| { + let _ = tx.send(StorageMeta::new(storage)); + }), + ), + config, + hash, + rx, + })) + } + /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. /// Since there are very likely to be many of these, we use an arc rwlock pub fn incomplete_mem(config: Arc, hash: Hash) -> Self { let storage = BaoFileStorage::incomplete_mem(); - Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), - config, - hash, - })) + Self::new_inner(storage, config, hash) } /// Create a new bao file handle with a partial file. @@ -498,54 +575,43 @@ impl BaoFileHandle { outboard: create_read_write(&paths.outboard)?, sizes: create_read_write(&paths.sizes)?, }); - Ok(Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), - config, - hash, - }))) + Ok(Self::new_inner(storage, config, hash)) } /// Create a new complete bao file handle. pub fn new_complete( config: Arc, hash: Hash, - data: MemOrFile, - outboard: MemOrFile, + data: MemOrFile>, + outboard: MemOrFile>, ) -> Self { let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard }); - Self(Arc::new(BaoFileHandleInner { - storage: RwLock::new(storage), - config, - hash, - })) + Self::new_inner(storage, config, hash) } /// Transform the storage in place. If the transform fails, the storage will /// be an immutable empty storage. #[cfg(feature = "fs-store")] - pub(crate) fn transform( + pub(crate) async fn transform( &self, - f: impl FnOnce(BaoFileStorage) -> io::Result, + f: impl std::ops::AsyncFnOnce(BaoFileStorage) -> io::Result>, ) -> io::Result<()> { - let mut lock = self.storage.write().unwrap(); + let mut lock = self.storage.write().await; let storage = lock.take(); - *lock = f(storage)?; + *lock = f(storage).await?; Ok(()) } /// True if the file is complete. pub fn is_complete(&self) -> bool { - matches!( - self.storage.read().unwrap().deref(), - BaoFileStorage::Complete(_) - ) + self.rx.borrow().deref().complete } /// An AsyncSliceReader for the data file. /// /// Caution: this is a reader for the unvalidated data file. 
Reading this /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { + pub fn data_reader(&self) -> DataReader { DataReader(Some(self.clone())) } @@ -553,23 +619,30 @@ impl BaoFileHandle { /// /// The outboard file is used to validate the data file. It is not guaranteed /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { + pub fn outboard_reader(&self) -> OutboardReader { OutboardReader(Some(self.clone())) } /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result { - match self.storage.read().unwrap().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.data_size()), - BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()), - BaoFileStorage::IncompleteFile(file) => file.current_size(), - } + pub fn current_size(&self) -> Result { + self.rx + .borrow() + .size + .as_ref() + // NB: we return the io::ErrorKind here + // because io::Error is !Clone + .map_err(|e| e.kind()) + .copied() } /// The outboard for the file. - pub fn outboard(&self) -> io::Result> { + pub fn outboard(&self) -> io::Result>> { let root = self.hash.into(); - let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); + let tree = BaoTree::new( + self.current_size() + .map_err(|kind| io::Error::new(kind, "an io error has occurred"))?, + IROH_BLOCK_SIZE, + ); let outboard = self.outboard_reader(); Ok(PreOrderOutboard { root, @@ -584,13 +657,13 @@ impl BaoFileHandle { } /// Create a new writer from the handle. - pub fn writer(&self) -> BaoFileWriter { + pub fn writer(&self) -> BaoFileWriter { BaoFileWriter(Some(self.clone())) } /// This is the synchronous impl for writing a batch. - fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result { - let mut storage = self.storage.write().unwrap(); + async fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result { + let mut storage = self.storage.write().await; match storage.deref_mut() { BaoFileStorage::IncompleteMem(mem) => { // check if we need to switch to file mode, otherwise write to memory @@ -625,7 +698,7 @@ impl BaoFileHandle { } /// Downgrade to a weak reference. - pub fn downgrade(&self) -> BaoFileHandleWeak { + pub fn downgrade(&self) -> BaoFileHandleWeak { BaoFileHandleWeak(Arc::downgrade(&self.0)) } } @@ -676,19 +749,17 @@ impl MutableMemStorage { /// It is a BaoFileHandle wrapped in an Option, so that we can take it out /// in the future. #[derive(Debug)] -pub struct BaoFileWriter(Option); +pub struct BaoFileWriter(Option>); -impl BaoBatchWriter for BaoFileWriter { +impl BaoBatchWriter for BaoFileWriter +where + T: Send + Sync + bao_tree::io::sync::ReadAt + 'static, +{ async fn write_batch(&mut self, size: u64, batch: Vec) -> std::io::Result<()> { let Some(handle) = self.0.take() else { return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy")); }; - let (handle, change) = tokio::task::spawn_blocking(move || { - let change = handle.write_batch(size, &batch); - (handle, change) - }) - .await - .expect("spawn_blocking failed"); + let change = handle.write_batch(size, &batch).await; match change? 
{
            HandleChange::None => {}
            HandleChange::MemToFile => {
@@ -705,12 +776,7 @@ impl BaoBatchWriter for BaoFileWriter {
        let Some(handle) = self.0.take() else {
            return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
        };
-        let (handle, res) = tokio::task::spawn_blocking(move || {
-            let res = handle.storage.write().unwrap().sync_all();
-            (handle, res)
-        })
-        .await
-        .expect("spawn_blocking failed");
+        let res = handle.storage.write().await.sync_all();
        self.0 = Some(handle);
        res
    }
@@ -828,7 +894,11 @@ pub mod test_support {
        (outboard.root.into(), chunk_ranges, encoded)
    }

-    pub async fn validate(handle: &BaoFileHandle, original: &[u8], ranges: &[Range<u64>]) {
+    pub async fn validate<T: Send + Sync + bao_tree::io::sync::ReadAt + 'static>(
+        handle: &BaoFileHandle<T>,
+        original: &[u8],
+        ranges: &[Range<u64>],
+    ) {
        let mut r = handle.data_reader();
        for range in ranges {
            let start = range.start;
diff --git a/src/store/fs.rs b/src/store/fs.rs
index 0628dc183..eb11d67f6 100644
--- a/src/store/fs.rs
+++ b/src/store/fs.rs
@@ -117,7 +117,7 @@ use crate::{
            BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
            ProgressSender,
        },
-        raw_outboard_size, MemOrFile, TagCounter, TagDrop,
+        raw_outboard_size, FileAndSize, MemOrFile, TagCounter, TagDrop,
    },
    BlobFormat, Hash, HashAndFormat, Tag, TempTag,
};
@@ -441,6 +441,82 @@ pub(crate) enum ImportSource {
    Memory(#[debug(skip)] Bytes),
}

+/// trait which defines the backend persistence layer
+/// for this store, e.g. filesystem, S3, etc.
+pub trait Persistence: Send + Sync + Clone + 'static {
+    /// the type which represents a file which was read from the persistence
+    /// layer
+    type File: IrohFile + std::io::Read + std::fmt::Debug;
+
+    /// return the size of the file in bytes if it can be found/read,
+    /// otherwise return an [io::Error]
+    fn size(&self, path: &Path) -> impl Future<Output = Result<u64, io::Error>> + Send + 'static;
+
+    /// read the contents of the file at the path,
+    /// returning the bytes of the file in the success case
+    /// and [io::Error] in the error case
+    fn read(
+        &self,
+        path: &Path,
+    ) -> impl Future<Output = Result<Vec<u8>, io::Error>> + Send + 'static;
+
+    /// recursively ensure that the input path exists
+    fn create_dir_all(
+        &self,
+        path: &Path,
+    ) -> impl Future<Output = Result<(), io::Error>> + Send + 'static;
+
+    /// read and return the file at the input path
+    fn open<'a>(
+        &'a self,
+        path: &'a Path,
+    ) -> impl Future<Output = Result<Self::File, io::Error>> + Send + 'static;
+
+    /// convert from a [std::fs::File] into this persistence layer's [Self::File] type.
+    /// This is called when converting from a partial file (which exists on disk)
+    /// into a complete file (which exists wherever your implementation wants it to)
+    fn convert_std_file(
+        self: Arc<Self>,
+        file: std::fs::File,
+    ) -> impl Future<Output = Result<Self::File, io::Error>> + Send + 'static;
+}
+
+/// A persistence layer that writes to the local file system
+#[derive(Debug, Clone, Copy)]
+pub struct FileSystemPersistence;
+
+impl Persistence for FileSystemPersistence {
+    type File = std::fs::File;
+
+    fn size(&self, path: &Path) -> impl Future<Output = Result<u64, io::Error>> + 'static {
+        let fut = tokio::fs::metadata(path.to_owned());
+        async move { fut.await.map(|m| m.len()) }
+    }
+
+    fn read(&self, path: &Path) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static {
+        tokio::fs::read(path.to_owned())
+    }
+
+    fn create_dir_all(&self, path: &Path) -> impl Future<Output = Result<(), io::Error>> + 'static {
+        tokio::fs::create_dir_all(path.to_owned())
+    }
+
+    fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, io::Error>> + 'static {
+        let fut = tokio::fs::File::open(path.to_owned());
+        async move {
+            let file = fut.await?;
+            Ok(file.into_std().await)
+        }
+    }
+
+    async fn convert_std_file(
+        self: Arc<Self>,
+        file: std::fs::File,
+    ) -> Result<Self::File, io::Error> {
+        Ok(file)
+    }
+}
+
 impl ImportSource {
    fn content(&self) -> MemOrFile<&[u8], &Path> {
        match self {
@@ -450,19 +526,41 @@ impl ImportSource {
        }
    }

-    fn len(&self) -> io::Result<u64> {
-        match self {
-            Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
-            Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
-            Self::Memory(data) => Ok(data.len() as u64),
+    fn len<'a, T: Persistence>(
+        &'a self,
+        fs: &'a T,
+    ) -> impl Future<Output = io::Result<u64>> + 'static {
+        enum Either<T> {
+            Left(u64),
+            Right(T),
+        }
+
+        let output = match self {
+            Self::TempFile(path) | Self::External(path) => {
+                let fut: std::pin::Pin<
+                    Box<dyn Future<Output = io::Result<u64>> + Send + 'static>,
+                > = Box::pin(fs.size(path));
+                Either::Right(fut)
+            }
+            Self::Memory(data) => Either::Left(data.len() as u64),
+        };
+        async move {
+            match output {
+                Either::Left(size) => Ok(size),
+                Either::Right(fut) => fut.await,
+            }
        }
    }
}

 /// Use BaoFileHandle as the entry type for the map.
-pub type Entry = BaoFileHandle;
+pub type Entry<T> = BaoFileHandle<T>;

-impl super::MapEntry for Entry {
+/// a trait which defines the interface that any [Persistence::File] type must adhere to
+pub trait IrohFile: bao_tree::io::sync::ReadAt + Send + Sync + 'static {}
+impl<T> IrohFile for T where T: bao_tree::io::sync::ReadAt + Send + Sync + 'static {}
+
+impl<T: IrohFile> super::MapEntry for Entry<T> {
    fn hash(&self) -> Hash {
        self.hash()
    }
@@ -478,7 +576,7 @@ impl super::MapEntry for Entry {
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
-        self.outboard()
+        BaoFileHandle::outboard(self)
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        self.data_reader()
    }
}

-impl super::MapEntryMut for Entry {
+impl<T: IrohFile> super::MapEntryMut for Entry<T> {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
@@ -520,12 +618,12 @@ pub(crate) struct Export {
}

 #[derive(derive_more::Debug)]
-pub(crate) enum ActorMessage {
+pub(crate) enum ActorMessage<T> {
    // Query method: get a file handle for a hash, if it exists.
    // This will produce a file handle even for entries that are not yet in redb at all.
    Get {
        hash: Hash,
-        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
+        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle<T>>>>,
    },
    /// Query method: get the rough entry status for a hash. Just complete, partial or not found.
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    /// Query method: get the entry state for a hash, both in memory and in redb.
    /// This is everything we got about the entry, including the actual inline outboard and data.
EntryState { hash: Hash, - tx: oneshot::Sender>, + tx: oneshot::Sender>>, }, /// Query method: get the full entry state for a hash. GetFullEntryState { @@ -557,7 +655,7 @@ pub(crate) enum ActorMessage { /// will be created, but not yet written to redb. GetOrCreate { hash: Hash, - tx: oneshot::Sender>, + tx: oneshot::Sender>>, }, /// Modification method: inline size was exceeded for a partial entry. /// If the entry is complete, this is a no-op. If the entry is partial and in @@ -565,7 +663,7 @@ pub(crate) enum ActorMessage { OnMemSizeExceeded { hash: Hash }, /// Modification method: marks a partial entry as complete. /// Calling this on a complete entry is a no-op. - OnComplete { handle: BaoFileHandle }, + OnComplete { handle: BaoFileHandle }, /// Modification method: import data into a redb store /// /// At this point the size, hash and outboard must already be known. @@ -666,7 +764,7 @@ pub(crate) enum ActorMessage { Shutdown { tx: Option> }, } -impl ActorMessage { +impl ActorMessage { fn category(&self) -> MessageCategory { match self { Self::Get { .. } @@ -711,28 +809,45 @@ pub(crate) type FilterPredicate = /// Storage that is using a redb database for small files and files for /// large files. #[derive(Debug, Clone)] -pub struct Store(Arc); +pub struct Store(Arc>); -impl Store { - /// Load or create a new store. - pub async fn load(root: impl AsRef) -> io::Result { +impl Store { + /// load a new instance of a file system backed store at the given path + pub fn load(root: impl AsRef) -> impl Future> { let path = root.as_ref(); let db_path = path.join("blobs.db"); + + Store::load_with_backend(root, db_path, FileSystemPersistence) + } +} + +impl Store +where + T: Persistence, +{ + /// Load or create a new store. + pub async fn load_with_backend( + root: impl AsRef, + db_path: PathBuf, + backend: T, + ) -> io::Result { let options = Options { - path: PathOptions::new(path), + path: PathOptions::new(root.as_ref()), inline: Default::default(), batch: Default::default(), }; - Self::new(db_path, options).await + Self::new(db_path, options, backend).await } /// Create a new store with custom options. 
- pub async fn new(path: PathBuf, options: Options) -> io::Result { + pub async fn new(path: PathBuf, options: Options, backend: T) -> io::Result { // spawn_blocking because StoreInner::new creates directories let rt = tokio::runtime::Handle::try_current() .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?; - let inner = - tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??; + let inner = tokio::task::spawn_blocking(move || { + StoreInner::new_sync_with_backend(path, options, rt, backend) + }) + .await??; Ok(Self(Arc::new(inner))) } @@ -758,11 +873,12 @@ impl Store { } #[derive(Debug)] -struct StoreInner { - tx: async_channel::Sender, +struct StoreInner { + tx: async_channel::Sender>, temp: Arc>, handle: Option>, path_options: Arc, + fs: T, } impl TagDrop for RwLock { @@ -777,25 +893,33 @@ impl TagCounter for RwLock { } } -impl StoreInner { - fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result { +impl StoreInner +where + T: Persistence, +{ + fn new_sync_with_backend( + path: PathBuf, + options: Options, + rt: tokio::runtime::Handle, + fs: T, + ) -> io::Result { tracing::trace!( "creating data directory: {}", options.path.data_path.display() ); - std::fs::create_dir_all(&options.path.data_path)?; + rt.block_on(fs.create_dir_all(&options.path.data_path))?; tracing::trace!( "creating temp directory: {}", options.path.temp_path.display() ); - std::fs::create_dir_all(&options.path.temp_path)?; + rt.block_on(fs.create_dir_all(&options.path.temp_path))?; tracing::trace!( "creating parent directory for db file{}", path.parent().unwrap().display() ); - std::fs::create_dir_all(path.parent().unwrap())?; + rt.block_on(fs.create_dir_all(path.parent().unwrap()))?; let temp: Arc> = Default::default(); - let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?; + let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone(), fs.clone())?; let handle = std::thread::Builder::new() .name("redb-actor".to_string()) .spawn(move || { @@ -811,16 +935,17 @@ impl StoreInner { temp, handle: Some(handle), path_options: Arc::new(options.path), + fs, }) } - pub async fn get(&self, hash: Hash) -> OuterResult> { + pub async fn get(&self, hash: Hash) -> OuterResult>> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::Get { hash, tx }).await?; Ok(rx.await??) } - async fn get_or_create(&self, hash: Hash) -> OuterResult { + async fn get_or_create(&self, hash: Hash) -> OuterResult> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?; Ok(rx.await??) @@ -949,7 +1074,7 @@ impl StoreInner { Ok(rx.recv()??) 
} - async fn complete(&self, entry: Entry) -> OuterResult<()> { + async fn complete(&self, entry: Entry) -> OuterResult<()> { self.tx .send(ActorMessage::OnComplete { handle: entry }) .await?; @@ -977,12 +1102,15 @@ impl StoreInner { .into()); } let parent = target.parent().ok_or_else(|| { - OuterError::from(io::Error::new( - io::ErrorKind::InvalidInput, - "target path has no parent directory", - )) + OuterError::Inner( + io::Error::new( + io::ErrorKind::InvalidInput, + "target path has no parent directory", + ) + .into(), + ) })?; - std::fs::create_dir_all(parent)?; + self.fs.create_dir_all(parent).await?; let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx @@ -1069,11 +1197,11 @@ impl StoreInner { let file = match mode { ImportMode::TryReference => ImportSource::External(path), ImportMode::Copy => { - if std::fs::metadata(&path)?.len() < 16 * 1024 { + if block_for(self.fs.size(&path))? < 16 * 1024 { // we don't know if the data will be inlined since we don't // have the inline options here. But still for such a small file // it does not seem worth it do to the temp file ceremony. - let data = std::fs::read(&path)?; + let data = block_for(self.fs.read(&path))?; ImportSource::Memory(data.into()) } else { let temp_path = self.temp_file_name(); @@ -1108,7 +1236,7 @@ impl StoreInner { id: u64, progress: impl ProgressSender + IdGenerator, ) -> OuterResult<(TempTag, u64)> { - let data_size = file.len()?; + let data_size = block_for(file.len(&self.fs))?; tracing::debug!("finalize_import_sync {:?} {}", file, data_size); progress.blocking_send(ImportProgress::Size { id, @@ -1119,7 +1247,7 @@ impl StoreInner { MemOrFile::File(path) => { let span = trace_span!("outboard.compute", path = %path.display()); let _guard = span.enter(); - let file = std::fs::File::open(path)?; + let file = block_for(self.fs.open(path))?; compute_outboard(file, data_size, move |offset| { Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?) })? @@ -1161,7 +1289,7 @@ impl StoreInner { } } -impl Drop for StoreInner { +impl Drop for StoreInner { fn drop(&mut self) { if let Some(handle) = self.handle.take() { self.tx @@ -1172,23 +1300,24 @@ impl Drop for StoreInner { } } -struct ActorState { - handles: BTreeMap, +struct ActorState { + handles: BTreeMap>, protected: BTreeSet, temp: Arc>, - msgs_rx: async_channel::Receiver, + msgs_rx: async_channel::Receiver>, create_options: Arc, options: Options, rt: tokio::runtime::Handle, + fs: Arc, } /// The actor for the redb store. /// /// It is split into the database and the rest of the state to allow for split /// borrows in the message handlers. -struct Actor { +struct Actor { db: redb::Database, - state: ActorState, + state: ActorState, } /// Error type for message handler functions of the redb actor. @@ -1249,8 +1378,8 @@ pub(crate) enum OuterError { JoinTask(#[from] tokio::task::JoinError), } -impl From> for OuterError { - fn from(_e: async_channel::SendError) -> Self { +impl From>> for OuterError { + fn from(_e: async_channel::SendError>) -> Self { OuterError::Send } } @@ -1275,16 +1404,22 @@ impl From for io::Error { } } -impl super::Map for Store { - type Entry = Entry; +impl super::Map for Store +where + T: Persistence, +{ + type Entry = Entry; async fn get(&self, hash: &Hash) -> io::Result> { Ok(self.0.get(*hash).await?) 
} } -impl super::MapMut for Store { - type EntryMut = Entry; +impl super::MapMut for Store +where + T: Persistence, +{ + type EntryMut = Entry; async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result { Ok(self.0.get_or_create(hash).await?) @@ -1307,7 +1442,10 @@ impl super::MapMut for Store { } } -impl super::ReadableStore for Store { +impl super::ReadableStore for Store +where + T: Persistence, +{ async fn blobs(&self) -> io::Result> { Ok(Box::new(self.0.blobs().await?.into_iter())) } @@ -1348,7 +1486,10 @@ impl super::ReadableStore for Store { } } -impl super::Store for Store { +impl super::Store for Store +where + T: Persistence + std::fmt::Debug, +{ async fn import_file( &self, path: PathBuf, @@ -1512,11 +1653,14 @@ impl super::Store for Store { } } -pub(super) async fn gc_sweep_task( - store: &Store, +pub(super) async fn gc_sweep_task( + store: &Store, live: &BTreeSet, co: &Co, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +where + T: Persistence, +{ let blobs = store.blobs().await?.chain(store.partial_blobs().await?); let mut count = 0; let mut batch = Vec::new(); @@ -1542,13 +1686,17 @@ pub(super) async fn gc_sweep_task( Ok(()) } -impl Actor { - fn new( +impl Actor +where + T: Persistence, +{ + fn new_with_backend( path: &Path, options: Options, temp: Arc>, rt: tokio::runtime::Handle, - ) -> ActorResult<(Self, async_channel::Sender)> { + fs: T, + ) -> ActorResult<(Self, async_channel::Sender>)> { let db = match redb::Database::create(path) { Ok(db) => db, Err(DatabaseError::UpgradeRequired(1)) => { @@ -1591,11 +1739,27 @@ impl Actor { options, create_options: Arc::new(create_options), rt, + fs: Arc::new(fs), }, }, tx, )) } +} + +impl Actor +where + T: Persistence, +{ + fn new( + path: &Path, + options: Options, + temp: Arc>, + rt: tokio::runtime::Handle, + fs: T, + ) -> ActorResult<(Self, async_channel::Sender>)> { + Self::new_with_backend(path, options, temp, rt, fs) + } async fn run_batched(mut self) -> ActorResult<()> { let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone()); @@ -1624,7 +1788,7 @@ impl Actor { tokio::select! { msg = msgs.recv() => { if let Some(msg) = msg { - if let Err(msg) = self.state.handle_readonly(&tables, msg)? { + if let Err(msg) = self.state.handle_readonly(&tables, msg).await? { msgs.push_back(msg).expect("just recv'd"); break; } @@ -1653,7 +1817,7 @@ impl Actor { tokio::select! { msg = msgs.recv() => { if let Some(msg) = msg { - if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? { + if let Err(msg) = self.state.handle_readwrite(&mut tables, msg).await? { msgs.push_back(msg).expect("just recv'd"); break; } @@ -1679,7 +1843,10 @@ impl Actor { } } -impl ActorState { +impl ActorState +where + T: Persistence, +{ fn entry_status( &mut self, tables: &impl ReadableTables, @@ -1695,11 +1862,11 @@ impl ActorState { Ok(status) } - fn get( + async fn get( &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult> { + ) -> ActorResult>> { if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) { return Ok(Some(handle)); } @@ -1715,14 +1882,17 @@ impl ActorState { data_location, outboard_location, } => { - let data = load_data(tables, &self.options.path, data_location, &hash)?; + let data = + load_data(tables, &self.options.path, data_location, &hash, &*self.fs).await?; let outboard = load_outboard( tables, &self.options.path, outboard_location, data.size(), &hash, - )?; + &*self.fs, + ) + .await?; BaoFileHandle::new_complete(config, hash, data, outboard) } EntryState::Partial { .. 
} => BaoFileHandle::incomplete_file(config, hash)?, @@ -1846,7 +2016,11 @@ impl ActorState { Ok(()) } - fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> { + async fn import( + &mut self, + tables: &mut Tables<'_>, + cmd: Import, + ) -> ActorResult<(TempTag, u64)> { let Import { content_id, source: file, @@ -1870,7 +2044,8 @@ impl ActorState { "reading external data to inline it: {}", external_path.display() ); - let data = Bytes::from(std::fs::read(&external_path)?); + let data = + Bytes::from(self.fs.read(&external_path).await.map_err(ActorError::Io)?); DataLocation::Inline(data) } else { DataLocation::External(vec![external_path], data_size) @@ -1940,11 +2115,11 @@ impl ActorState { Ok((tag, data_size)) } - fn get_or_create( + async fn get_or_create( &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult { + ) -> ActorResult> { self.protected.insert(hash); if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) { return Ok(handle); @@ -1958,14 +2133,18 @@ impl ActorState { outboard_location, .. } => { - let data = load_data(tables, &self.options.path, data_location, &hash)?; + let data = + load_data(tables, &self.options.path, data_location, &hash, &*self.fs) + .await?; let outboard = load_outboard( tables, &self.options.path, outboard_location, data.size(), &hash, - )?; + &*self.fs, + ) + .await?; tracing::debug!("creating complete entry for {}", hash.to_hex()); BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard) } @@ -2123,7 +2302,8 @@ impl ActorState { // inline if size <= self.options.inline.max_data_inlined { let path = self.options.path.owned_data_path(&hash); - let data = std::fs::read(&path)?; + let data = + block_for(self.fs.read(&path)).map_err(ActorError::Io)?; tables.delete_after_commit.insert(hash, [BaoFilePart::Data]); tables.inline_data.insert(hash, data.as_slice())?; (DataLocation::Inline(()), size, true) @@ -2158,7 +2338,9 @@ impl ActorState { if outboard_size <= self.options.inline.max_outboard_inlined => { let path = self.options.path.owned_outboard_path(&hash); - let outboard = std::fs::read(&path)?; + let outboard = + block_for(self.fs.read(&path)).map_err(ActorError::Io)?; + tables .delete_after_commit .insert(hash, [BaoFilePart::Outboard]); @@ -2254,36 +2436,45 @@ impl ActorState { Ok(()) } - fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> { + async fn on_complete( + &mut self, + tables: &mut Tables<'_>, + entry: BaoFileHandle, + ) -> ActorResult<()> { let hash = entry.hash(); let mut info = None; tracing::trace!("on_complete({})", hash.to_hex()); - entry.transform(|state| { - tracing::trace!("on_complete transform {:?}", state); - let entry = match complete_storage( - state, - &hash, - &self.options.path, - &self.options.inline, - tables.delete_after_commit, - )? { - Ok(entry) => { - // store the info so we can insert it into the db later - info = Some(( - entry.data_size(), - entry.data.mem().cloned(), - entry.outboard_size(), - entry.outboard.mem().cloned(), - )); - entry - } - Err(entry) => { - // the entry was already complete, nothing to do - entry - } - }; - Ok(BaoFileStorage::Complete(entry)) - })?; + entry + .transform(async |state| { + tracing::trace!("on_complete transform {:?}", state); + let entry = match complete_storage( + state, + &hash, + &self.options.path, + &self.options.inline, + tables.delete_after_commit, + self.fs.clone(), + ) + .await? 
+ { + Ok(entry) => { + // store the info so we can insert it into the db later + info = Some(( + entry.data_size(), + entry.data.mem().cloned(), + entry.outboard_size(), + entry.outboard.mem().cloned(), + )); + entry + } + Err(entry) => { + // the entry was already complete, nothing to do + entry + } + }; + Ok(BaoFileStorage::Complete(entry)) + }) + .await?; if let Some((data_size, data, outboard_size, outboard)) = info { let data_location = if data.is_some() { DataLocation::Inline(()) @@ -2324,7 +2515,11 @@ impl ActorState { Ok(()) } - fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> { + fn handle_toplevel( + &mut self, + db: &redb::Database, + msg: ActorMessage, + ) -> ActorResult<()> { match msg { ActorMessage::UpdateInlineOptions { inline_options, @@ -2355,18 +2550,18 @@ impl ActorState { Ok(()) } - fn handle_readonly( + async fn handle_readonly( &mut self, tables: &impl ReadableTables, - msg: ActorMessage, - ) -> ActorResult> { + msg: ActorMessage, + ) -> ActorResult>> { match msg { ActorMessage::Get { hash, tx } => { - let res = self.get(tables, hash); + let res = self.get(tables, hash).await; tx.send(res).ok(); } ActorMessage::GetOrCreate { hash, tx } => { - let res = self.get_or_create(tables, hash); + let res = self.get_or_create(tables, hash).await; tx.send(res).ok(); } ActorMessage::EntryStatus { hash, tx } => { @@ -2402,14 +2597,14 @@ impl ActorState { Ok(Ok(())) } - fn handle_readwrite( + async fn handle_readwrite( &mut self, - tables: &mut Tables, - msg: ActorMessage, - ) -> ActorResult> { + tables: &mut Tables<'_>, + msg: ActorMessage, + ) -> ActorResult>> { match msg { ActorMessage::Import { cmd, tx } => { - let res = self.import(tables, cmd); + let res = self.import(tables, cmd).await; tx.send(res).ok(); } ActorMessage::SetTag { tag, value, tx } => { @@ -2437,7 +2632,7 @@ impl ActorState { tx.send(res).ok(); } ActorMessage::OnComplete { handle } => { - let res = self.on_complete(tables, handle); + let res = self.on_complete(tables, handle).await; res.ok(); } ActorMessage::Export { cmd, tx } => { @@ -2457,7 +2652,7 @@ impl ActorState { } msg => { // try to handle it as readonly - if let Err(msg) = self.handle_readonly(tables, msg)? { + if let Err(msg) = self.handle_readonly(tables, msg).await? { return Ok(Err(msg)); } } @@ -2510,12 +2705,31 @@ fn dump(tables: &impl ReadableTables) -> ActorResult<()> { Ok(()) } -fn load_data( +fn block_for(fut: F) -> F::Output +where + F: Future + Send + 'static, + F::Output: Send, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + let res = fut.await; + tx.send(res) + .map_err(|_| format!("Error sending {}", std::any::type_name::())) + .expect("rx cannot be dropped yet"); + }); + rx.blocking_recv().expect("The sender cannot be dropped") +} + +async fn load_data( tables: &impl ReadableTables, options: &PathOptions, location: DataLocation<(), u64>, hash: &Hash, -) -> ActorResult> { + fs: &T, +) -> ActorResult>> +where + T: Persistence, +{ Ok(match location { DataLocation::Inline(()) => { let Some(data) = tables.inline_data().get(hash)? 
else { @@ -2528,14 +2742,17 @@ fn load_data( } DataLocation::Owned(data_size) => { let path = options.owned_data_path(hash); - let Ok(file) = std::fs::File::open(&path) else { + let Ok(file) = fs.open(&path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {}", path.display()), ) .into()); }; - MemOrFile::File((file, data_size)) + MemOrFile::File(FileAndSize { + file, + size: data_size, + }) } DataLocation::External(paths, data_size) => { if paths.is_empty() { @@ -2544,25 +2761,29 @@ fn load_data( )); } let path = &paths[0]; - let Ok(file) = std::fs::File::open(path) else { + let Ok(file) = fs.open(path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("external file not found: {}", path.display()), ) .into()); }; - MemOrFile::File((file, data_size)) + MemOrFile::File(FileAndSize { + file, + size: data_size, + }) } }) } -fn load_outboard( +async fn load_outboard( tables: &impl ReadableTables, options: &PathOptions, location: OutboardLocation, size: u64, hash: &Hash, -) -> ActorResult> { + fs: &T, +) -> ActorResult>> { Ok(match location { OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()), OutboardLocation::Inline(_) => { @@ -2577,26 +2798,33 @@ fn load_outboard( OutboardLocation::Owned => { let outboard_size = raw_outboard_size(size); let path = options.owned_outboard_path(hash); - let Ok(file) = std::fs::File::open(&path) else { + let Ok(file) = fs.open(&path).await else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("file not found: {} size={}", path.display(), outboard_size), ) .into()); }; - MemOrFile::File((file, outboard_size)) + MemOrFile::File(FileAndSize { + file, + size: outboard_size, + }) } }) } /// Take a possibly incomplete storage and turn it into complete -fn complete_storage( - storage: BaoFileStorage, +async fn complete_storage( + storage: BaoFileStorage, hash: &Hash, path_options: &PathOptions, inline_options: &InlineOptions, delete_after_commit: &mut DeleteSet, -) -> ActorResult> { + fs: Arc, +) -> ActorResult, CompleteStorage>> +where + T: Persistence, +{ let (data, outboard, _sizes) = match storage { BaoFileStorage::Complete(c) => return Ok(Err(c)), BaoFileStorage::IncompleteMem(storage) => { @@ -2635,13 +2863,33 @@ fn complete_storage( } else { // protect the data from previous deletions delete_after_commit.remove(*hash, [BaoFilePart::Data]); + let fs_2 = fs.clone(); match data { MemOrFile::Mem(data) => { let path = path_options.owned_data_path(hash); let file = overwrite_and_sync(&path, &data)?; - MemOrFile::File((file, data_size)) + MemOrFile::File( + block_for( + FileAndSize { + file, + size: data_size, + } + .map_async(move |f| fs_2.convert_std_file(f)), + ) + .transpose() + .map_err(ActorError::Io)?, + ) } - MemOrFile::File(data) => MemOrFile::File((data, data_size)), + MemOrFile::File(data) => MemOrFile::File( + FileAndSize { + file: data, + size: data_size, + } + .map_async(move |f| fs_2.convert_std_file(f)) + .await + .transpose() + .map_err(ActorError::Io)?, + ), } }; // inline outboard if needed, or write to file if needed @@ -2666,9 +2914,29 @@ fn complete_storage( MemOrFile::Mem(outboard) => { let path = path_options.owned_outboard_path(hash); let file = overwrite_and_sync(&path, &outboard)?; - MemOrFile::File((file, outboard_size)) + MemOrFile::File( + block_for( + FileAndSize { + file, + size: outboard_size, + } + .map_async(move |f| fs.convert_std_file(f)), + ) + .transpose() + .map_err(ActorError::Io)?, + ) } - MemOrFile::File(outboard) => 
MemOrFile::File((outboard, outboard_size)), + MemOrFile::File(outboard) => MemOrFile::File( + block_for( + FileAndSize { + file: outboard, + size: outboard_size, + } + .map_async(|f| fs.convert_std_file(f)), + ) + .transpose() + .map_err(ActorError::Io)?, + ), } }; // mark sizes for deletion after commit in any case - a complete entry diff --git a/src/store/fs/test_support.rs b/src/store/fs/test_support.rs index 9cc62bb86..07d718a52 100644 --- a/src/store/fs/test_support.rs +++ b/src/store/fs/test_support.rs @@ -12,7 +12,7 @@ use redb::ReadableTable; use super::{ tables::{ReadableTables, Tables}, ActorError, ActorMessage, ActorResult, ActorState, DataLocation, EntryState, FilterPredicate, - OutboardLocation, OuterResult, Store, StoreInner, + OutboardLocation, OuterResult, Persistence, Store, StoreInner, }; use crate::{ store::{mutable_mem_storage::SizeInfo, DbIter}, @@ -46,10 +46,13 @@ pub enum EntryData { }, } -impl Store { +impl Store +where + T: Persistence, +{ /// Get the complete state of an entry, both in memory and in redb. #[cfg(test)] - pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result { + pub(crate) async fn entry_state(&self, hash: Hash) -> io::Result> { Ok(self.0.entry_state(hash).await?) } @@ -102,9 +105,12 @@ impl Store { } } -impl StoreInner { +impl StoreInner +where + T: Persistence, +{ #[cfg(test)] - async fn entry_state(&self, hash: Hash) -> OuterResult { + async fn entry_state(&self, hash: Hash) -> OuterResult> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::EntryState { hash, tx }).await?; Ok(rx.await??) @@ -145,12 +151,15 @@ impl StoreInner { #[cfg(test)] #[derive(Debug)] -pub(crate) struct EntryStateResponse { - pub mem: Option, +pub(crate) struct EntryStateResponse { + pub mem: Option>, pub db: Option>>, } -impl ActorState { +impl ActorState +where + T: Persistence, +{ pub(super) fn get_full_entry_state( &mut self, tables: &impl ReadableTables, @@ -297,7 +306,7 @@ impl ActorState { &mut self, tables: &impl ReadableTables, hash: Hash, - ) -> ActorResult { + ) -> ActorResult> { let mem = self.handles.get(&hash).and_then(|weak| weak.upgrade()); let db = match tables.blobs().get(hash)? 
{ Some(entry) => Some({ diff --git a/src/store/fs/tests.rs b/src/store/fs/tests.rs index 85540eb89..a503335a9 100644 --- a/src/store/fs/tests.rs +++ b/src/store/fs/tests.rs @@ -50,7 +50,7 @@ pub fn to_stream( .boxed() } -async fn create_test_db() -> (tempfile::TempDir, Store) { +async fn create_test_db() -> (tempfile::TempDir, Store) { let _ = tracing_subscriber::fmt::try_init(); let testdir = tempfile::tempdir().unwrap(); let db_path = testdir.path().join("db.redb"); @@ -59,7 +59,9 @@ async fn create_test_db() -> (tempfile::TempDir, Store) { batch: Default::default(), inline: Default::default(), }; - let db = Store::new(db_path, options).await.unwrap(); + let db = Store::new(db_path, options, FileSystemPersistence) + .await + .unwrap(); (testdir, db) } @@ -788,7 +790,9 @@ async fn actor_store_smoke() { batch: Default::default(), inline: Default::default(), }; - let db = Store::new(db_path, options).await.unwrap(); + let db = Store::new(db_path, options, FileSystemPersistence) + .await + .unwrap(); db.dump().await.unwrap(); let data = random_test_data(1024 * 1024); #[allow(clippy::single_range_in_vec_init)] diff --git a/src/store/fs/validate.rs b/src/store/fs/validate.rs index ae1870471..253afe377 100644 --- a/src/store/fs/validate.rs +++ b/src/store/fs/validate.rs @@ -5,14 +5,17 @@ use redb::ReadableTable; use super::{ raw_outboard_size, tables::Tables, ActorResult, ActorState, DataLocation, EntryState, Hash, - OutboardLocation, + OutboardLocation, Persistence, }; use crate::{ store::{fs::tables::BaoFilePart, ConsistencyCheckProgress, ReportLevel}, util::progress::BoxedProgressSender, }; -impl ActorState { +impl ActorState +where + T: Persistence, +{ //! This performs a full consistency check. Eventually it will also validate //! file content again, but that part is not yet implemented. //! diff --git a/src/util.rs b/src/util.rs index fcf3115bf..c5c4c94a4 100644 --- a/src/util.rs +++ b/src/util.rs @@ -19,9 +19,10 @@ pub mod fs; pub mod io; mod mem_or_file; pub mod progress; -pub use mem_or_file::MemOrFile; +pub use mem_or_file::{FileAndSize, MemOrFile}; mod sparse_mem_file; pub use sparse_mem_file::SparseMemFile; +pub mod callback_lock; pub mod local_pool; #[cfg(test)] diff --git a/src/util/callback_lock.rs b/src/util/callback_lock.rs new file mode 100644 index 000000000..41fba994a --- /dev/null +++ b/src/util/callback_lock.rs @@ -0,0 +1,77 @@ +//! This module defines a wrapper around a [`tokio::sync::RwLock`] that runs a callback +//! 
after any write operation occurs
+
+use std::future::Future;
+
+/// A wrapper over a [`tokio::sync::RwLock`] that executes a callback function after
+/// the write guard is dropped
+#[derive(derive_more::Debug)]
+pub struct CallbackLock<T, F> {
+    inner: tokio::sync::RwLock<T>,
+    #[debug(skip)]
+    callback: F,
+}
+
+/// the wrapper type over a [tokio::sync::RwLockWriteGuard]
+#[derive(Debug)]
+pub struct CallbackLockWriteGuard<'a, T, F: Fn(&T)> {
+    inner: tokio::sync::RwLockWriteGuard<'a, T>,
+    callback: &'a F,
+}
+
+impl<T, F: Fn(&T)> std::ops::Deref for CallbackLockWriteGuard<'_, T, F> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<T, F: Fn(&T)> std::ops::DerefMut for CallbackLockWriteGuard<'_, T, F> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+impl<T, F: Fn(&T)> Drop for CallbackLockWriteGuard<'_, T, F> {
+    fn drop(&mut self) {
+        (self.callback)(&*self.inner);
+    }
+}
+
+impl<T, F> CallbackLock<T, F>
+where
+    F: Fn(&T),
+{
+    /// create a new instance of the lock from a value
+    /// and the callback to evaluate when a write guard is dropped
+    pub fn new(val: T, callback: F) -> Self {
+        CallbackLock {
+            inner: tokio::sync::RwLock::new(val),
+            callback,
+        }
+    }
+
+    /// return an instance of the write guard
+    pub async fn write(&self) -> CallbackLockWriteGuard<'_, T, F> {
+        let guard = self.inner.write().await;
+
+        CallbackLockWriteGuard {
+            inner: guard,
+            callback: &self.callback,
+        }
+    }
+
+    /// return the [tokio::sync::RwLockReadGuard];
+    /// this will not invoke the callback
+    pub fn read(&self) -> impl Future<Output = tokio::sync::RwLockReadGuard<'_, T>> {
+        self.inner.read()
+    }
+
+    /// try to synchronously acquire a read lock
+    pub fn try_read(
+        &self,
+    ) -> Result<tokio::sync::RwLockReadGuard<'_, T>, tokio::sync::TryLockError> {
+        self.inner.try_read()
+    }
+}
diff --git a/src/util/mem_or_file.rs b/src/util/mem_or_file.rs
index d929a19c9..93aa27697 100644
--- a/src/util/mem_or_file.rs
+++ b/src/util/mem_or_file.rs
@@ -1,4 +1,4 @@
-use std::{fs::File, io};
+use std::{fs::File, future::Future, io};

 use bao_tree::io::sync::{ReadAt, Size};
 use bytes::Bytes;
@@ -14,9 +14,54 @@ pub enum MemOrFile<M, F> {
    File(F),
}

+/// A struct which represents a handle to some file which
+/// is _not_ in memory, and its size
+#[derive(derive_more::Debug)]
+pub struct FileAndSize<T> {
+    /// the generic file type
+    pub file: T,
+    /// the size in bytes of the file
+    pub size: u64,
+}
+
+impl<T> FileAndSize<T> {
+    /// map the type of the file asynchronously.
+    /// This is analogous to [Option::map]
+    pub fn map_async<F, U>(
+        self,
+        f: F,
+    ) -> impl Future<Output = FileAndSize<U::Output>> + use<T, F, U> + 'static
+    where
+        F: FnOnce(T) -> U + Send + 'static,
+        T: 'static,
+        U: Future + Send + 'static,
+        U::Output: Send + 'static,
+    {
+        let FileAndSize { file, size } = self;
+        async move {
+            FileAndSize {
+                file: f(file).await,
+                size,
+            }
+        }
+    }
+}
+
+impl<T, U> FileAndSize<Result<T, U>> {
+    /// factor out the error from inside the [FileAndSize].
+    /// This is analogous to [Option::transpose]
+    pub fn transpose(self) -> Result<FileAndSize<T>, U> {
+        let FileAndSize { file, size } = self;
+        match file {
+            Ok(t) => Ok(FileAndSize { file: t, size }),
+            Err(e) => Err(e),
+        }
+    }
+}
+
 /// Helper methods for a common way to use MemOrFile, where the memory part is something
 /// like a slice, and the file part is a tuple consisiting of path or file and size.
-impl<M> MemOrFile<M, (File, u64)>
+impl<M, T> MemOrFile<M, FileAndSize<T>>
 where
    M: AsRef<[u8]>,
{
@@ -24,7 +69,7 @@ where
    pub fn size(&self) -> u64 {
        match self {
            MemOrFile::Mem(mem) => mem.as_ref().len() as u64,
-            MemOrFile::File((_, size)) => *size,
+            MemOrFile::File(FileAndSize { file: _, size }) => *size,
        }
    }
}
diff --git a/tests/gc.rs b/tests/gc.rs
index dcf76b4ef..d3b1ba017 100644
--- a/tests/gc.rs
+++ b/tests/gc.rs
@@ -22,8 +22,8 @@ use iroh_blobs::{
    net_protocol::Blobs,
    rpc::client::{blobs, tags},
    store::{
-        bao_tree, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, GcConfig, MapEntryMut,
-        MapMut, ReportLevel, Store,
+        bao_tree, fs::FileSystemPersistence, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus,
+        GcConfig, MapEntryMut, MapMut, ReportLevel, Store,
    },
    util::{
        progress::{AsyncChannelProgressSender, ProgressSender as _},
@@ -127,7 +127,7 @@ async fn persistent_node(
    path: PathBuf,
    gc_period: Duration,
) -> (
-    Node<iroh_blobs::store::fs::Store>,
+    Node<iroh_blobs::store::fs::Store<FileSystemPersistence>>,
    async_channel::Receiver<()>,
) {
    let store = iroh_blobs::store::fs::Store::load(path).await.unwrap();
diff --git a/tests/rpc.rs b/tests/rpc.rs
index 7dc12e7b2..a18eb1398 100644
--- a/tests/rpc.rs
+++ b/tests/rpc.rs
@@ -1,7 +1,7 @@
 #![cfg(feature = "test")]
 use std::{net::SocketAddr, path::PathBuf, vec};

-use iroh_blobs::net_protocol::Blobs;
+use iroh_blobs::{net_protocol::Blobs, store::fs::FileSystemPersistence};
 use quic_rpc::client::QuinnConnector;
 use tempfile::TempDir;
 use testresult::TestResult;

 type BlobsClient = iroh_blobs::rpc::client::blobs::Client<QuinnConnector<RpcService>>;

 #[derive(Debug)]
 pub struct Node {
    pub router: iroh::protocol::Router,
-    pub blobs: Blobs<iroh_blobs::store::fs::Store>,
+    pub blobs: Blobs<iroh_blobs::store::fs::Store<FileSystemPersistence>>,
    pub rpc_task: AbortOnDropHandle<()>,
}
diff --git a/tests/tags.rs b/tests/tags.rs
index 8a4af2d54..13ad3d50a 100644
--- a/tests/tags.rs
+++ b/tests/tags.rs
@@ -8,6 +8,7 @@ use iroh_blobs::{
        client::tags::{self, TagInfo},
        proto::RpcService,
    },
+    store::fs::FileSystemPersistence,
    Hash, HashAndFormat,
};
 use testresult::TestResult;
@@ -142,7 +143,7 @@ async fn tags_smoke_mem() -> TestResult<()> {
 async fn tags_smoke_fs() -> TestResult<()> {
    let td = tempfile::tempdir()?;
    let endpoint = Endpoint::builder().bind().await?;
-    let blobs = Blobs::persistent(td.path().join("blobs.db"))
+    let blobs = Blobs::persistent(td.path(), td.path().join("blobs.db"), FileSystemPersistence)
        .await?
        .build(&endpoint);
    let client = blobs.client();
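
The hunks above only wire `FileSystemPersistence` into the store and tests. As an illustration of what the new seam enables, here is a minimal sketch of an alternative backend written against the `Persistence` trait as it appears in `src/store/fs.rs`. Everything named `MemBackend`/`MemFile` (and their internals) is hypothetical and not part of this diff; the only assumptions are the trait methods shown above and the `IrohFile` bound (`bao_tree::io::sync::ReadAt + Send + Sync + 'static`, plus `std::io::Read + Debug` on `Self::File`).

```rust
use std::{
    collections::HashMap,
    future::Future,
    io::{self, Read},
    path::{Path, PathBuf},
    sync::{Arc, Mutex},
};

use iroh_blobs::store::fs::Persistence; // exported by this branch (fs-store feature)

/// Hypothetical file handle: an owned byte buffer plus a read position.
#[derive(Debug, Clone, Default)]
pub struct MemFile {
    data: Vec<u8>,
    pos: usize,
}

// `bao_tree::io::sync::ReadAt` is the core bound behind `IrohFile`.
impl bao_tree::io::sync::ReadAt for MemFile {
    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        let start = (offset as usize).min(self.data.len());
        let n = buf.len().min(self.data.len() - start);
        buf[..n].copy_from_slice(&self.data[start..start + n]);
        Ok(n)
    }
}

impl Read for MemFile {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = bao_tree::io::sync::ReadAt::read_at(self, self.pos as u64, buf)?;
        self.pos += n;
        Ok(n)
    }
}

/// Hypothetical backend: a flat path -> bytes map guarded by a mutex.
#[derive(Debug, Clone, Default)]
pub struct MemBackend {
    files: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>,
}

impl MemBackend {
    fn get(&self, path: &Path) -> io::Result<Vec<u8>> {
        self.files
            .lock()
            .unwrap()
            .get(path)
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no such file"))
    }
}

impl Persistence for MemBackend {
    type File = MemFile;

    fn size(&self, path: &Path) -> impl Future<Output = Result<u64, io::Error>> + Send + 'static {
        // resolve synchronously so the returned future does not borrow `path`
        let res = self.get(path).map(|data| data.len() as u64);
        async move { res }
    }

    fn read(&self, path: &Path) -> impl Future<Output = Result<Vec<u8>, io::Error>> + Send + 'static {
        let res = self.get(path);
        async move { res }
    }

    fn create_dir_all(&self, _path: &Path) -> impl Future<Output = Result<(), io::Error>> + Send + 'static {
        // directories are implicit in a flat map
        async { Ok::<(), io::Error>(()) }
    }

    fn open<'a>(&'a self, path: &'a Path) -> impl Future<Output = Result<Self::File, io::Error>> + Send + 'static {
        let res = self.get(path).map(|data| MemFile { data, pos: 0 });
        async move { res }
    }

    async fn convert_std_file(self: Arc<Self>, mut file: std::fs::File) -> Result<Self::File, io::Error> {
        // Partial files still live on the local disk; per the trait docs, this
        // hook is where a completed partial file moves into the backend.
        let mut data = Vec::new();
        file.read_to_end(&mut data)?;
        Ok(MemFile { data, pos: 0 })
    }
}
```

Such a backend would plug in wherever the tests above pass `FileSystemPersistence`, e.g. `Store::new(db_path, options, MemBackend::default())` or `Store::load_with_backend(root, db_path, MemBackend::default())`. Note that incomplete entries still use on-disk partial files; only completed data flows through `convert_std_file`.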
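`CallbackLock` is the piece that lets `BaoFileHandle` answer `is_complete()`/`current_size()` without taking the storage lock: every dropped write guard re-runs the callback, which the handle uses to push a fresh `StorageMeta` into a `tokio::sync::watch` channel. A standalone sketch of that pattern, assuming only the `CallbackLock` API added in `src/util/callback_lock.rs` (the counter and channel here are illustrative, not from the diff):

```rust
use iroh_blobs::util::callback_lock::CallbackLock;

#[tokio::main]
async fn main() {
    // Mirror the lock's value into a watch channel, the same way
    // BaoFileHandle mirrors StorageMeta.
    let (tx, rx) = tokio::sync::watch::channel(0u64);
    let lock = CallbackLock::new(0u64, move |v: &u64| {
        let _ = tx.send(*v);
    });

    {
        let mut guard = lock.write().await;
        *guard += 1;
    } // guard dropped here -> callback runs -> watchers observe 1

    // watchers see the update without ever touching the lock
    assert_eq!(*rx.borrow(), 1);
    // reads never invoke the callback
    assert_eq!(*lock.read().await, 1);
}
```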