diff --git a/indy-vdr-proxy/src/app.rs b/indy-vdr-proxy/src/app.rs
index 5788362b..42806567 100644
--- a/indy-vdr-proxy/src/app.rs
+++ b/indy-vdr-proxy/src/app.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub is_multiple: bool,
    pub tls_cert_path: Option<String>,
    pub tls_key_path: Option<String>,
+    pub cache: bool,
+    pub cache_size: usize,
+    pub cache_path: Option<String>,
 }
 
 pub fn load_config() -> Result<Config, String> {
@@ -81,6 +84,21 @@ pub fn load_config() -> Result<Config, String> {
             .long("tls-key")
             .value_name("KEY")
             .help("Path to the TLS private key file")
+    ).arg(
+        Arg::new("use-cache")
+            .long("use-cache").action(ArgAction::SetTrue)
+            .value_name("CACHE")
+            .help("Enable caching of ledger read requests")
+    ).arg(
+        Arg::new("cache-size")
+            .long("cache-size")
+            .value_name("CACHE_SIZE")
+            .help("Maximum number of entries held in the cache (default: 1000)")
+    ).arg(
+        Arg::new("cache-path")
+            .long("cache-path")
+            .value_name("CACHE_PATH")
+            .help("Path to a persistent cache store (defaults to in-memory)")
     );
 
     #[cfg(unix)]
@@ -139,6 +157,13 @@ pub fn load_config() -> Result<Config, String> {
     let tls_cert_path = matches.get_one::<String>("tls-cert").cloned();
     let tls_key_path = matches.get_one::<String>("tls-key").cloned();
+    let cache = matches.get_flag("use-cache");
+    let cache_size = matches
+        .get_one::<String>("cache-size")
+        .map(|ival| ival.parse::<usize>().map_err(|_| "Invalid cache size"))
+        .transpose()?
+        .unwrap_or(1000);
+    let cache_path = matches.get_one::<String>("cache-path").cloned();
 
     Ok(Config {
         genesis,
@@ -152,5 +177,8 @@ pub fn load_config() -> Result<Config, String> {
         is_multiple,
         tls_cert_path,
         tls_key_path,
+        cache,
+        cache_size,
+        cache_path,
     })
 }
diff --git a/indy-vdr-proxy/src/handlers.rs b/indy-vdr-proxy/src/handlers.rs
index f8dabb6c..dc2f33ec 100644
--- a/indy-vdr-proxy/src/handlers.rs
+++ b/indy-vdr-proxy/src/handlers.rs
@@ -6,6 +6,7 @@ use std::rc::Rc;
 use std::time::UNIX_EPOCH;
 
 use hyper::{Body, Method, Request, Response, StatusCode};
+use indy_vdr::pool::cache::Cache;
 use percent_encoding::percent_decode_str;
 use regex::Regex;
 
@@ -300,6 +301,7 @@ async fn get_attrib<T: Pool>(
     raw: &str,
     seq_no: Option<i32>,
     timestamp: Option<u64>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<ResponseType> {
     let dest = DidValue::from_str(dest)?;
     let request = pool.get_request_builder().build_get_attrib_request(
@@ -311,7 +313,7 @@ async fn get_attrib<T: Pool>(
         seq_no,
         timestamp,
     )?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
@@ -320,59 +322,80 @@ async fn get_nym<T: Pool>(
     nym: &str,
     seq_no: Option<i32>,
     timestamp: Option<u64>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<ResponseType> {
     let nym = DidValue::from_str(nym)?;
     let request = pool
         .get_request_builder()
         .build_get_nym_request(None, &nym, seq_no, timestamp)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_schema<T: Pool>(pool: &T, schema_id: &str) -> VdrResult<ResponseType> {
+async fn get_schema<T: Pool>(
+    pool: &T,
+    schema_id: &str,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let schema_id = SchemaId::from_str(schema_id)?;
     let request = pool
         .get_request_builder()
         .build_get_schema_request(None, &schema_id)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_cred_def<T: Pool>(pool: &T, cred_def_id: &str) -> VdrResult<ResponseType> {
+async fn get_cred_def<T: Pool>(
+    pool: &T,
+    cred_def_id: &str,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let cred_def_id = CredentialDefinitionId::from_str(cred_def_id)?;
     let request = pool
         .get_request_builder()
         .build_get_cred_def_request(None, &cred_def_id)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_revoc_reg_def<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
+async fn get_revoc_reg_def<T: Pool>(
+    pool: &T,
+    revoc_reg_def_id: &str,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
     let request = pool
         .get_request_builder()
         .build_get_revoc_reg_def_request(None, &revoc_reg_def_id)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_revoc_reg<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
+async fn get_revoc_reg<T: Pool>(
+    pool: &T,
+    revoc_reg_def_id: &str,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
     let request = pool.get_request_builder().build_get_revoc_reg_request(
         None,
         &revoc_reg_def_id,
         timestamp_now(),
     )?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_revoc_reg_delta<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
+async fn get_revoc_reg_delta<T: Pool>(
+    pool: &T,
+    revoc_reg_def_id: &str,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
     let request = pool
         .get_request_builder()
         .build_get_revoc_reg_delta_request(None, &revoc_reg_def_id, None, timestamp_now())?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
@@ -383,19 +406,25 @@ async fn test_get_validator_info<T: Pool>(pool: &T, pretty: bool) -> VdrResult<
 }
 
-async fn get_taa<T: Pool>(pool: &T) -> VdrResult<ResponseType> {
+async fn get_taa<T: Pool>(
+    pool: &T,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let request = pool
         .get_request_builder()
         .build_get_txn_author_agreement_request(None, None)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_aml<T: Pool>(pool: &T) -> VdrResult<ResponseType> {
+async fn get_aml<T: Pool>(
+    pool: &T,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
     let request = pool
         .get_request_builder()
         .build_get_acceptance_mechanisms_request(None, None, None)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
@@ -404,6 +433,7 @@ async fn get_auth_rule<T: Pool>(
     auth_type: Option<String>,
     auth_action: Option<String>,
     field: Option<String>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<ResponseType> {
     let request = pool.get_request_builder().build_get_auth_rule_request(
         None,
@@ -413,24 +443,30 @@ async fn get_auth_rule<T: Pool>(
         None,
         None,
     )?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, cache).await?;
     Ok(result.into())
 }
 
-async fn get_txn<T: Pool>(pool: &T, ledger: LedgerType, seq_no: i32) -> VdrResult<ResponseType> {
-    let result = perform_get_txn(pool, ledger.to_id(), seq_no).await?;
+async fn get_txn<T: Pool>(
+    pool: &T,
+    ledger: LedgerType,
+    seq_no: i32,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<ResponseType> {
+    let result = perform_get_txn(pool, ledger.to_id(), seq_no, cache).await?;
     Ok(result.into())
 }
 
 async fn submit_request<T: Pool>(pool: &T, message: Vec<u8>) -> VdrResult<ResponseType> {
     let request = PreparedRequest::from_request_json(message)?;
-    let result = perform_ledger_request(pool, &request).await?;
+    let result = perform_ledger_request(pool, &request, None).await?;
     Ok(result.into())
 }
 
 pub async fn handle_request<T: Pool>(
     req: Request<Body>,
     state: Rc<RefCell<AppState<T>>>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> Result<Response<Body>, hyper::Error> {
     let mut parts = req
         .uri()
@@ -532,12 +568,12 @@ pub async fn handle_request<T: Pool>(
             let resolver = Resolver::new(pool);
             // is DID Url
             if did.find('/').is_some() {
-                match resolver.dereference(did).await {
+                match resolver.dereference(did, cache.clone()).await {
                     Ok(result) => Ok(ResponseType::Resolver(result)),
                     Err(err) => http_status_msg(StatusCode::BAD_REQUEST, err.to_string()),
                 }
             } else {
-                match resolver.resolve(did).await {
+                match resolver.resolve(did, cache).await {
                     Ok(result) => Ok(ResponseType::Resolver(result)),
                     Err(err) => http_status_msg(StatusCode::BAD_REQUEST, err.to_string()),
                 }
@@ -558,8 +594,8 @@ pub async fn handle_request<T: Pool>(
             }
         }
         (&Method::GET, "genesis") => get_pool_genesis(&pool).await,
-        (&Method::GET, "taa") => get_taa(&pool).await,
-        (&Method::GET, "aml") => get_aml(&pool).await,
+        (&Method::GET, "taa") => get_taa(&pool, cache.clone()).await,
+        (&Method::GET, "aml") => get_aml(&pool, cache.clone()).await,
         (&Method::GET, "attrib") => {
             if let (Some(dest), Some(attrib)) = (parts.next(), parts.next()) {
                 // NOTE: 'endpoint' is currently the only supported attribute
@@ -569,7 +605,7 @@ pub async fn handle_request<T: Pool>(
                 let timestamp: Option<u64> = query_params
                     .get("timestamp")
                     .and_then(|ts| ts.as_str().parse().ok());
-                get_attrib(&pool, &dest, &attrib, seq_no, timestamp).await
+                get_attrib(&pool, &dest, &attrib, seq_no, timestamp, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
@@ -582,18 +618,19 @@ pub async fn handle_request<T: Pool>(
                         Some(auth_type.to_owned()),
                         Some(auth_action.to_owned()),
                         Some("*".to_owned()),
+                        cache.clone(),
                     )
                     .await
                 } else {
                     http_status(StatusCode::NOT_FOUND)
                 }
             } else {
-                get_auth_rule(&pool, None, None, None).await // get all
+                get_auth_rule(&pool, None, None, None, cache.clone()).await // get all
             }
         }
         (&Method::GET, "cred_def") => {
             if let Some(cred_def_id) = parts.next() {
-                get_cred_def(&pool, &cred_def_id).await
+                get_cred_def(&pool, &cred_def_id, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
@@ -606,35 +643,35 @@ pub async fn handle_request<T: Pool>(
             let timestamp: Option<u64> = query_params
                 .get("timestamp")
                 .and_then(|ts| ts.as_str().parse().ok());
-            get_nym(&pool, &nym, seq_no, timestamp).await
+            get_nym(&pool, &nym, seq_no, timestamp, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
         }
         (&Method::GET, "rev_reg_def") => {
             if let Some(rev_reg_def_id) = parts.next() {
-                get_revoc_reg_def(&pool, &rev_reg_def_id).await
+                get_revoc_reg_def(&pool, &rev_reg_def_id, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
         }
         (&Method::GET, "rev_reg") => {
             if let Some(rev_reg_def_id) = parts.next() {
-                get_revoc_reg(&pool, &rev_reg_def_id).await
+                get_revoc_reg(&pool, &rev_reg_def_id, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
         }
         (&Method::GET, "rev_reg_delta") => {
             if let Some(rev_reg_def_id) = parts.next() {
-                get_revoc_reg_delta(&pool, &rev_reg_def_id).await
+                get_revoc_reg_delta(&pool, &rev_reg_def_id, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
         }
         (&Method::GET, "schema") => {
             if let Some(schema_id) = parts.next() {
-                get_schema(&pool, &schema_id).await
+                get_schema(&pool, &schema_id, cache.clone()).await
             } else {
                 http_status(StatusCode::NOT_FOUND)
             }
@@ -644,7 +681,7 @@ pub async fn handle_request<T: Pool>(
                 if let (Ok(ledger), Ok(txn)) =
                     (LedgerType::try_from(ledger.as_str()), txn.parse::<i32>())
                 {
-                    get_txn(&pool, ledger, txn).await
+                    get_txn(&pool, ledger, txn, cache.clone()).await
                 } else {
                     http_status(StatusCode::NOT_FOUND)
                 }
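The handler changes above are mechanical: every read endpoint gains an optional cache argument and forwards it to `perform_ledger_request`, while `submit_request` (a write) always passes `None`. A minimal sketch of the pattern, assuming the cache type parameters used throughout this PR (String keys, raw reply plus result metadata as values); the `LedgerCache` alias and `send` helper are hypothetical, introduced here only for illustration:

```rust
use indy_vdr::common::error::VdrResult;
use indy_vdr::pool::helpers::perform_ledger_request;
use indy_vdr::pool::{cache::Cache, Pool, PreparedRequest, RequestResult, RequestResultMeta};

// Hypothetical alias for the cache type threaded through every handler.
type LedgerCache = Cache<String, (String, RequestResultMeta)>;

async fn send<T: Pool>(
    pool: &T,
    request: &PreparedRequest,
    cache: Option<LedgerCache>,
) -> VdrResult<String> {
    // Read requests may be served from the cache; writes always hit the ledger.
    let (result, _meta) = perform_ledger_request(pool, request, cache).await?;
    match result {
        RequestResult::Reply(body) => Ok(body),
        RequestResult::Failed(err) => Err(err),
    }
}
```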
diff --git a/indy-vdr-proxy/src/main.rs b/indy-vdr-proxy/src/main.rs
index ea16501b..39df80e3 100644
--- a/indy-vdr-proxy/src/main.rs
+++ b/indy-vdr-proxy/src/main.rs
@@ -7,6 +7,7 @@ mod app;
 mod handlers;
 mod utils;
 
+use indy_vdr::pool::cache::storage::{new_fs_ordered_store, OrderedHashMap};
 use std::cell::RefCell;
 use std::collections::HashMap;
 #[cfg(unix)]
@@ -36,6 +37,7 @@
 use hyper_tls::HttpsConnector;
 #[cfg(unix)]
 use hyper_unix_connector::UnixConnector;
+use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache};
 #[cfg(feature = "tls")]
 use rustls_pemfile::{certs, pkcs8_private_keys};
 #[cfg(feature = "tls")]
@@ -426,13 +428,37 @@
 where
     I::Conn: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin,
     I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
 {
+    let cache = if config.cache {
+        let storage_type = match config.cache_path {
+            Some(path) => {
+                let storage = OrderedHashMap::new(
+                    new_fs_ordered_store(path.clone())
+                        .unwrap_or_else(|err| panic!("Error creating cache at {}: {}", path, err)),
+                );
+                Some(storage)
+            }
+            None => None,
+        };
+        let strategy = CacheStrategyTTL::new(
+            config.cache_size,
+            Duration::from_secs(86400).as_millis(),
+            storage_type,
+            None,
+        );
+        let cache = Cache::new(strategy, None);
+        Some(cache)
+    } else {
+        None
+    };
+
     let until_done = run_pools(state.clone(), config.init_refresh, config.interval_refresh);
     let svc = make_service_fn(move |_| {
         let state = state.clone();
+        let cache = cache.clone();
         async move {
             let state = state.clone();
             Ok::<_, hyper::Error>(service_fn(move |req| {
-                handlers::handle_request(req, state.to_owned())
+                handlers::handle_request(req, state.to_owned(), cache.clone())
             }))
         }
     });
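For reference, the proxy's cache wiring above in isolation: a sled-backed store when `--cache-path` is configured, otherwise in-memory, with the same 24-hour TTL. A minimal standalone sketch, assuming the module paths introduced by this PR (`sled::Error` comes from the sled crate this PR adds as a dependency):

```rust
use std::time::Duration;

use indy_vdr::pool::cache::{
    storage::{new_fs_ordered_store, OrderedHashMap},
    strategy::CacheStrategyTTL,
    Cache,
};
use indy_vdr::pool::RequestResultMeta;

fn build_cache(
    capacity: usize,
    path: Option<String>,
) -> Result<Cache<String, (String, RequestResultMeta)>, sled::Error> {
    // A persistent store survives proxy restarts; the in-memory default does not.
    let storage = match path {
        Some(p) => Some(OrderedHashMap::new(new_fs_ordered_store(p)?)),
        None => None,
    };
    let strategy = CacheStrategyTTL::new(
        capacity,
        Duration::from_secs(86_400).as_millis(), // 24h expiry, as in main.rs
        storage,
        None, // default epoch (UNIX_EPOCH) for timestamp generation
    );
    Ok(Cache::new(strategy, None))
}
```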
diff --git a/libindy_vdr/Cargo.toml b/libindy_vdr/Cargo.toml
index 734bc87a..71d42402 100644
--- a/libindy_vdr/Cargo.toml
+++ b/libindy_vdr/Cargo.toml
@@ -60,6 +60,9 @@ thiserror = "1.0"
 time = { version = "=0.3.20", features = ["parsing"] }
 url = "2.2.2"
 zmq = "0.9"
+async-trait = "0.1.77"
+async-lock = "3.3.0"
+sled = "0.34.7"
 
 [dev-dependencies]
 rstest = "0.18"
diff --git a/libindy_vdr/include/libindy_vdr.h b/libindy_vdr/include/libindy_vdr.h
index 7f61bb9f..6ce64ab2 100644
--- a/libindy_vdr/include/libindy_vdr.h
+++ b/libindy_vdr/include/libindy_vdr.h
@@ -481,6 +481,8 @@ ErrorCode indy_vdr_resolve(PoolHandle pool_handle,
 
 ErrorCode indy_vdr_set_cache_directory(FfiStr path);
 
+ErrorCode indy_vdr_set_ledger_txn_cache(int32_t capacity, int64_t expiry_offset_ms, FfiStr path);
+
 ErrorCode indy_vdr_set_config(FfiStr config);
 
 ErrorCode indy_vdr_set_default_logger(void);
diff --git a/libindy_vdr/src/common/error.rs b/libindy_vdr/src/common/error.rs
index 98e3e6cf..b69769a9 100644
--- a/libindy_vdr/src/common/error.rs
+++ b/libindy_vdr/src/common/error.rs
@@ -130,6 +130,12 @@
     }
 }
 
+impl From<sled::Error> for VdrError {
+    fn from(err: sled::Error) -> VdrError {
+        VdrError::new(VdrErrorKind::FileSystem, None, Some(Box::new(err)))
+    }
+}
+
 impl<M> From<(VdrErrorKind, M)> for VdrError
 where
     M: fmt::Display + Send + Sync + 'static,
diff --git a/libindy_vdr/src/ffi/mod.rs b/libindy_vdr/src/ffi/mod.rs
index 6ab2cc33..ef465b54 100644
--- a/libindy_vdr/src/ffi/mod.rs
+++ b/libindy_vdr/src/ffi/mod.rs
@@ -15,11 +15,16 @@ mod resolver;
 
 use crate::common::error::prelude::*;
 use crate::config::{PoolConfig, LIB_VERSION};
+use crate::pool::cache::storage::new_mem_ordered_store;
+use crate::pool::cache::{
+    storage::{new_fs_ordered_store, OrderedHashMap},
+    strategy::CacheStrategyTTL,
+};
 use crate::pool::{FilesystemCache, PoolTransactionsCache, ProtocolVersion};
 use crate::utils::Validatable;
 
 use self::error::{set_last_error, ErrorCode};
-use self::pool::{POOL_CACHE, POOL_CONFIG};
+use self::pool::{LEDGER_CACHE_STRATEGY, POOL_CACHE, POOL_CONFIG};
 
 pub type CallbackId = i64;
 
@@ -72,6 +77,24 @@ pub extern "C" fn indy_vdr_set_cache_directory(path: FfiStr) -> ErrorCode {
     }
 }
 
+#[no_mangle]
+pub extern "C" fn indy_vdr_set_ledger_txn_cache(
+    capacity: i32,
+    expire_offset: i64,
+    path_opt: FfiStr,
+) -> ErrorCode {
+    catch_err! {
+        debug!("Setting pool ledger transactions cache: capacity={}, expire_offset={}", capacity, expire_offset);
+        let store = match path_opt.as_opt_str().unwrap_or_default() {
+            "" => OrderedHashMap::new(new_mem_ordered_store()),
+            path => OrderedHashMap::new(new_fs_ordered_store(path.into())?),
+        };
+
+        *write_lock!(LEDGER_CACHE_STRATEGY)? = Some(Arc::new(CacheStrategyTTL::new(capacity.try_into().ok().unwrap_or_default(), expire_offset.try_into().ok().unwrap_or_default(), Some(store), None)));
+        Ok(ErrorCode::Success)
+    }
+}
+
 #[no_mangle]
 pub extern "C" fn indy_vdr_set_socks_proxy(socks_proxy: FfiStr) -> ErrorCode {
     catch_err! {
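In pool.rs below, each pool keys its cache by the genesis root hash, so two pools configured against different networks never share entries even though they share the one registered strategy. A sketch of that sharing, assuming the blanket `CacheStrategy` impl for `Arc<dyn CacheStrategy<_, _>>` that this PR adds in strategy.rs:

```rust
use std::sync::Arc;

use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache, CacheStrategy};
use indy_vdr::pool::RequestResultMeta;

type Val = (String, RequestResultMeta);

fn per_pool_caches(root_a: String, root_b: String) -> (Cache<String, Val>, Cache<String, Val>) {
    // One shared TTL strategy (capacity 1000, 24h expiry)...
    let strategy: Arc<dyn CacheStrategy<String, Val>> =
        Arc::new(CacheStrategyTTL::new(1000, 86_400_000, None, None));
    // ...but distinct key prefixes, so entries stay per-network.
    (
        Cache::new(strategy.clone(), Some(root_a)),
        Cache::new(strategy, Some(root_b)),
    )
}
```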
diff --git a/libindy_vdr/src/ffi/pool.rs b/libindy_vdr/src/ffi/pool.rs
index a10bb4fc..6c998a28 100644
--- a/libindy_vdr/src/ffi/pool.rs
+++ b/libindy_vdr/src/ffi/pool.rs
@@ -9,6 +9,7 @@ use once_cell::sync::Lazy;
 use crate::common::error::prelude::*;
 use crate::common::handle::ResourceHandle;
 use crate::config::PoolConfig;
+use crate::pool::cache::{Cache, CacheStrategy};
 use crate::pool::{
     InMemoryCache, PoolBuilder, PoolRunner, PoolTransactions, PoolTransactionsCache,
     RequestMethod, RequestResult, RequestResultMeta,
@@ -40,6 +41,10 @@
 pub static POOLS: Lazy<RwLock<HashMap<PoolHandle, PoolInstance>>> =
     Lazy::new(|| RwLock::new(HashMap::new()));
 
 pub static POOL_CACHE: Lazy<RwLock<Option<Arc<dyn PoolTransactionsCache>>>> =
     Lazy::new(|| RwLock::new(Some(Arc::new(InMemoryCache::new()))));
 
+pub static LEDGER_CACHE_STRATEGY: Lazy<
+    RwLock<Option<Arc<dyn CacheStrategy<String, (String, RequestResultMeta)>>>>,
+> = Lazy::new(|| RwLock::new(None));
+
 #[derive(Serialize, Deserialize, Debug, Clone)]
 struct PoolCreateParams {
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -68,6 +73,9 @@ pub extern "C" fn indy_vdr_pool_create(params: FfiStr, handle_p: *mut PoolHandle
             "Invalid pool create parameters: must provide transactions or transactions_path"
         ));
     };
+    // key this pool's cache by its genesis root hash so that entries are not shared across networks
+    let txn_cache = read_lock!(LEDGER_CACHE_STRATEGY)?.as_ref().map(|s| Cache::new(s.clone(), txns.root_hash_base58().ok()));
+
     let mut cached = false;
     if let Some(cache) = read_lock!(POOL_CACHE)?.as_ref() {
         if let Some(newer_txns) = cache.resolve_latest(&txns)? {
@@ -76,7 +84,7 @@ pub extern "C" fn indy_vdr_pool_create(params: FfiStr, handle_p: *mut PoolHandle
         }
     }
     let config = read_lock!(POOL_CONFIG)?.clone();
-    let runner = PoolBuilder::new(config, txns.clone()).node_weights(params.node_weights.clone()).refreshed(cached).into_runner()?;
+    let runner = PoolBuilder::new(config, txns.clone()).node_weights(params.node_weights.clone()).refreshed(cached).into_runner(txn_cache)?;
     let handle = PoolHandle::next();
     let mut pools = write_lock!(POOLS)?;
     pools.insert(handle, PoolInstance { runner, init_txns: txns, node_weights: params.node_weights });
@@ -102,7 +110,9 @@ fn handle_pool_refresh(
         cache.update(&init_txns, latest_txns)?;
     }
     if let Some(new_txns) = new_txns {
-        let runner = PoolBuilder::new(config, new_txns).node_weights(node_weights).refreshed(true).into_runner()?;
+        // key the refreshed pool's cache by its new root hash as well
+        let txn_cache = read_lock!(LEDGER_CACHE_STRATEGY)?.as_ref().map(|s| Cache::new(s.clone(), new_txns.root_hash_base58().ok()));
+        let runner = PoolBuilder::new(config, new_txns).node_weights(node_weights).refreshed(true).into_runner(txn_cache)?;
         let mut pools = write_lock!(POOLS)?;
         if let Entry::Occupied(mut entry) = pools.entry(pool_handle) {
             entry.get_mut().runner = runner;
diff --git a/libindy_vdr/src/lib.rs b/libindy_vdr/src/lib.rs
index c872312e..16c5c0c1 100644
--- a/libindy_vdr/src/lib.rs
+++ b/libindy_vdr/src/lib.rs
@@ -33,7 +33,7 @@
 //! // Create a new GET_TXN request and dispatch it
 //! let ledger_type = 1;  // 1 identifies the Domain ledger, see pool::LedgerType
 //! let seq_no = 1;  // Transaction sequence number
-//! let (result, _meta) = block_on(perform_get_txn(&pool, ledger_type, seq_no)).unwrap();
+//! let (result, _meta) = block_on(perform_get_txn(&pool, ledger_type, seq_no, None)).unwrap();
 
 #![cfg_attr(feature = "fatal_warnings", deny(warnings))]
 #![recursion_limit = "1024"] // for select! macro usage
diff --git a/libindy_vdr/src/pool/builder.rs b/libindy_vdr/src/pool/builder.rs
index d85fdf22..c6a6a2f7 100644
--- a/libindy_vdr/src/pool/builder.rs
+++ b/libindy_vdr/src/pool/builder.rs
@@ -3,10 +3,12 @@ use std::collections::HashMap;
 use crate::common::error::prelude::*;
 use crate::config::PoolConfig;
 
+use super::cache::Cache;
 use super::genesis::PoolTransactions;
 use super::manager::{LocalPool, SharedPool};
 use super::networker::{MakeLocal, MakeShared, ZMQNetworkerFactory};
 use super::runner::PoolRunner;
+use super::RequestResultMeta;
 
 /// A utility class for building a new pool instance or runner.
 #[derive(Clone)]
@@ -67,7 +69,10 @@ impl PoolBuilder {
     /// Create a `PoolRunner` instance from the builder, to handle pool interaction
     /// in a dedicated thread.
-    pub fn into_runner(self) -> VdrResult<PoolRunner> {
+    pub fn into_runner(
+        self,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
+    ) -> VdrResult<PoolRunner> {
         let merkle_tree = self.transactions.merkle_tree()?;
         Ok(PoolRunner::new(
             self.config,
@@ -75,6 +80,7 @@ impl PoolBuilder {
             MakeLocal(ZMQNetworkerFactory {}),
             self.node_weights,
             self.refreshed,
+            cache,
         ))
     }
 }
diff --git a/libindy_vdr/src/pool/cache/mod.rs b/libindy_vdr/src/pool/cache/mod.rs
new file mode 100644
index 00000000..7950eb20
--- /dev/null
+++ b/libindy_vdr/src/pool/cache/mod.rs
@@ -0,0 +1,65 @@
+use async_lock::RwLock;
+use async_trait::async_trait;
+use std::{fmt::Display, marker::PhantomData, sync::Arc};
+
+pub mod storage;
+pub mod strategy;
+
+#[async_trait]
+pub trait CacheStrategy<K: Send + Sync + 'static, V: Send + Sync + 'static>:
+    Send + Sync + 'static
+{
+    async fn get(&self, key: &K) -> Option<V>;
+
+    async fn remove(&mut self, key: &K) -> Option<V>;
+
+    async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V>;
+}
+
+pub struct Cache<K: Display + Send + Sync + 'static, V: Send + Sync + 'static> {
+    storage: Arc<RwLock<dyn CacheStrategy<String, V>>>,
+    key_prefix: Option<String>,
+    _key: PhantomData<K>,
+}
+
+impl<K: Display + Send + Sync + 'static, V: Send + Sync + 'static> Cache<K, V> {
+    fn full_key(&self, key: &K) -> String {
+        match &self.key_prefix {
+            Some(prefix) => format!("{}{}", prefix, key),
+            None => key.to_string(),
+        }
+    }
+
+    pub fn new(storage: impl CacheStrategy<String, V>, key_prefix: Option<String>) -> Self {
+        Self {
+            storage: Arc::new(RwLock::new(storage)),
+            key_prefix,
+            _key: PhantomData,
+        }
+    }
+
+    pub async fn get(&self, key: &K) -> Option<V> {
+        let full_key = self.full_key(key);
+        self.storage.read().await.get(&full_key).await
+    }
+
+    pub async fn remove(&self, key: &K) -> Option<V> {
+        let full_key = self.full_key(key);
+        self.storage.write().await.remove(&full_key).await
+    }
+
+    pub async fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
+        let full_key = self.full_key(&key);
+        self.storage
+            .write()
+            .await
+            .insert(full_key, value, custom_exp_offset)
+            .await
+    }
+}
+
+// Clone is implemented manually: a derived impl would require `K: Clone` and
+// `V: Clone`, while cloning a `Cache` only needs to share the inner `Arc`.
+impl<K: Display + Send + Sync + 'static, V: Send + Sync + 'static> Clone for Cache<K, V> {
+    fn clone(&self) -> Self {
+        Self {
+            storage: self.storage.clone(),
+            key_prefix: self.key_prefix.clone(),
+            _key: PhantomData,
+        }
+    }
+}
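The `Cache` wrapper above is the whole public surface: async `get`/`insert`/`remove` over whatever strategy it wraps, with an optional key prefix prepended to every lookup. A minimal usage sketch against the TTL strategy defined later in this PR:

```rust
use futures_executor::block_on;
use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache};

fn main() {
    // capacity 10, 60s TTL, in-memory store, default epoch
    let cache: Cache<String, String> =
        Cache::new(CacheStrategyTTL::new(10, 60_000, None, None), None);
    block_on(async {
        cache.insert("did:indy:1".to_string(), "doc".to_string(), None).await;
        assert_eq!(cache.get(&"did:indy:1".to_string()).await, Some("doc".to_string()));
        // a per-entry TTL (in ms) can override the default at insert time
        cache.insert("ephemeral".to_string(), "x".to_string(), Some(1)).await;
    });
}
```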
diff --git a/libindy_vdr/src/pool/cache/storage.rs b/libindy_vdr/src/pool/cache/storage.rs
new file mode 100644
index 00000000..aea7d9ad
--- /dev/null
+++ b/libindy_vdr/src/pool/cache/storage.rs
@@ -0,0 +1,231 @@
+use serde::{de::DeserializeOwned, Serialize};
+use sled::{self, Tree};
+use std::{
+    collections::{BTreeMap, HashMap},
+    hash::Hash,
+};
+
+pub fn new_fs_ordered_store(path: String) -> Result<Tree, sled::Error> {
+    sled::open(path.clone()).and_then(|db| db.open_tree(path))
+}
+
+pub fn new_mem_ordered_store<O: Ord + Copy + Send + Sync, V: Clone + Send + Sync>(
+) -> impl OrderedStore<O, V> {
+    BTreeMap::new()
+}
+
+pub trait OrderedStore<O, V>: Send + Sync {
+    fn len(&self) -> usize;
+    fn first_key_value(&self) -> Option<(O, V)>;
+    fn last_key_value(&self) -> Option<(O, V)>;
+    fn get(&self, key: &O) -> Option<V>;
+    fn insert(&mut self, key: O, value: V) -> Option<V>;
+    fn remove(&mut self, key: &O) -> Option<V>;
+    fn entries(&self) -> Box<dyn Iterator<Item = (O, V)> + '_>;
+}
+
+impl<V: Serialize + DeserializeOwned + Send + Sync> OrderedStore<u128, V> for Tree {
+    fn len(&self) -> usize {
+        Tree::len(self)
+    }
+    fn first_key_value(&self) -> Option<(u128, V)> {
+        match self.first() {
+            Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| {
+                (
+                    u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])),
+                    v,
+                )
+            }),
+            _ => None,
+        }
+    }
+    fn last_key_value(&self) -> Option<(u128, V)> {
+        match self.last() {
+            Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| {
+                (
+                    u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])),
+                    v,
+                )
+            }),
+            _ => None,
+        }
+    }
+    fn get(&self, key: &u128) -> Option<V> {
+        match Tree::get(self, key.to_be_bytes()) {
+            Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
+            _ => None,
+        }
+    }
+    fn insert(&mut self, key: u128, value: V) -> Option<V> {
+        match Tree::insert(self, key.to_be_bytes(), serde_json::to_vec(&value).unwrap()) {
+            Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
+            _ => None,
+        }
+    }
+    fn remove(&mut self, key: &u128) -> Option<V> {
+        match Tree::remove(self, key.to_be_bytes()) {
+            Ok(Some(v)) => serde_json::from_slice(&v).ok(),
+            _ => None,
+        }
+    }
+    fn entries(&self) -> Box<dyn Iterator<Item = (u128, V)>> {
+        Box::new(self.iter().filter_map(|r| {
+            r.ok().and_then(|(k, v)| {
+                serde_json::from_slice(v.as_ref())
+                    .ok()
+                    .map(|v| (u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])), v))
+            })
+        }))
+    }
+}
+
+impl<O: Ord + Copy + Send + Sync, V: Clone + Send + Sync> OrderedStore<O, V> for BTreeMap<O, V> {
+    fn len(&self) -> usize {
+        BTreeMap::len(self)
+    }
+    fn first_key_value(&self) -> Option<(O, V)> {
+        BTreeMap::first_key_value(self).map(|(o, v)| (*o, v.clone()))
+    }
+    fn last_key_value(&self) -> Option<(O, V)> {
+        BTreeMap::last_key_value(self).map(|(o, v)| (*o, v.clone()))
+    }
+    fn get(&self, key: &O) -> Option<V> {
+        BTreeMap::get(self, key).cloned()
+    }
+    fn insert(&mut self, key: O, value: V) -> Option<V> {
+        BTreeMap::insert(self, key, value)
+    }
+    fn remove(&mut self, key: &O) -> Option<V> {
+        BTreeMap::remove(self, key)
+    }
+    fn entries(&self) -> Box<dyn Iterator<Item = (O, V)> + '_> {
+        Box::new(self.iter().map(|(o, v)| (*o, v.clone())))
+    }
+}
+
+/// A hashmap that also maintains an ordered store of its keys, ordered by a given value.
+/// This is useful for structures that need fast O(1) lookups but must also evict the
+/// oldest or least recently used entries.
+/// The ordered store holds both keys and values, so a persistent store can be used to
+/// rebuild the lookup table on startup.
+pub struct OrderedHashMap<K, O, V> {
+    lookup: HashMap<K, (O, V)>,
+    ordered_lookup: Box<dyn OrderedStore<O, Vec<(K, V)>> + Send + Sync>,
+}
+
+impl<K: Hash + Eq + Clone, O: Ord + Clone, V: Clone> OrderedHashMap<K, O, V> {
+    pub fn new(order: impl OrderedStore<O, Vec<(K, V)>> + 'static) -> Self {
+        let ordered_data = Box::new(order);
+        let mut keyed_data = HashMap::new();
+        // the ordered data may have been loaded from the filesystem, so rebuild the lookup table from it
+        ordered_data.entries().for_each(|(order, keys)| {
+            keys.iter().for_each(|(k, v)| {
+                keyed_data.insert(k.clone(), (order.clone(), v.clone()));
+            })
+        });
+        Self {
+            lookup: keyed_data,
+            ordered_lookup: ordered_data,
+        }
+    }
+}
+
+impl<K: Hash + Eq + Clone, O: Ord + Clone, V: Clone> OrderedHashMap<K, O, V> {
+    pub fn len(&self) -> usize {
+        self.lookup.len()
+    }
+    pub fn get(&self, key: &K) -> Option<&(O, V)> {
+        self.lookup.get(key)
+    }
+    fn get_key_value(
+        &self,
+        selector: impl FnOnce(
+            &Box<dyn OrderedStore<O, Vec<(K, V)>> + Send + Sync>,
+        ) -> Option<(O, Vec<K>)>,
+    ) -> Option<(K, O, V)> {
+        selector(&self.ordered_lookup).and_then(|(_, keys)| {
+            keys.first().and_then(|key| {
+                self.lookup
+                    .get(key)
+                    .map(|(o, v)| (key.clone(), o.clone(), v.clone()))
+            })
+        })
+    }
+    /// gets the entry with the lowest order value
+    pub fn get_first_key_value(&self) -> Option<(K, O, V)> {
+        self.get_key_value(|ordered_lookup| {
+            ordered_lookup
+                .first_key_value()
+                .map(|(order, keys)| (order, keys.into_iter().map(|(k, _)| k).collect()))
+        })
+    }
+    /// gets the entry with the highest order value
+    pub fn get_last_key_value(&self) -> Option<(K, O, V)> {
+        self.get_key_value(|ordered_lookup| {
+            ordered_lookup
+                .last_key_value()
+                .map(|(order, keys)| (order, keys.into_iter().map(|(k, _)| k).collect()))
+        })
+    }
+    /// re-inserts the entry under a new order value (used for LRU bumping)
+    pub fn re_order(&mut self, key: &K, new_order: O) {
+        if let Some((_, value)) = self.remove(key) {
+            self.insert(key.clone(), value, new_order);
+        }
+    }
+    /// inserts a new entry with the given key, value and order
+    pub fn insert(&mut self, key: K, value: V, order: O) -> Option<V> {
+        let lookup = &mut self.lookup;
+        let ordered_lookup = &mut self.ordered_lookup;
+
+        if let Some((old_order, _)) = lookup.get(&key) {
+            // if the entry already exists, remove it from its old position in the ordered store
+            if let Some(mut keys) = ordered_lookup.remove(old_order) {
+                keys.retain(|k| k.0 != key);
+                // insert the remaining keys back into the ordered store
+                if !keys.is_empty() {
+                    ordered_lookup.insert(old_order.clone(), keys);
+                }
+            }
+        }
+        let keys = match ordered_lookup.remove(&order) {
+            Some(mut ks) => {
+                ks.push((key.clone(), value.clone()));
+                ks
+            }
+            None => vec![(key.clone(), value.clone())],
+        };
+        ordered_lookup.insert(order.clone(), keys);
+        lookup.insert(key, (order, value)).map(|(_, v)| v)
+    }
+    /// removes the entry with the given key
+    pub fn remove(&mut self, key: &K) -> Option<(O, V)> {
+        let lookup = &mut self.lookup;
+        let ordered_lookup = &mut self.ordered_lookup;
+        lookup.remove(key).map(|(order, v)| {
+            if let Some(mut keys) = ordered_lookup.remove(&order) {
+                keys.retain(|k| k.0 != *key);
+                // insert the remaining keys back into the ordered store
+                if !keys.is_empty() {
+                    ordered_lookup.insert(order.clone(), keys);
+                }
+            }
+            (order, v)
+        })
+    }
+    /// removes the entry with the lowest order value
+    pub fn remove_first(&mut self) -> Option<(K, O, V)> {
+        let first_key = self.get_first_key_value().map(|(k, _, _)| k);
+        if let Some(first_key) = first_key {
+            self.remove(&first_key)
+                .map(|(order, v)| (first_key, order, v))
+        } else {
+            None
+        }
+    }
+}
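`OrderedHashMap` pairs an O(1) lookup table with an ordered store of the same entries, which is what makes oldest-first eviction cheap. A small sketch of those semantics using the in-memory `BTreeMap` backend:

```rust
use std::collections::BTreeMap;

use indy_vdr::pool::cache::storage::OrderedHashMap;

fn main() {
    let mut map: OrderedHashMap<String, u128, String> = OrderedHashMap::new(BTreeMap::new());
    map.insert("a".to_string(), "1".to_string(), 10);
    map.insert("b".to_string(), "2".to_string(), 5);
    // "b" carries the lowest order value, so it is evicted first...
    assert_eq!(map.remove_first().map(|(k, _, _)| k), Some("b".to_string()));
    // ...unless it is re-ordered first, which is how the TTL strategy implements LRU.
    map.insert("c".to_string(), "3".to_string(), 7);
    map.re_order(&"a".to_string(), 99);
    assert_eq!(map.remove_first().map(|(k, _, _)| k), Some("c".to_string()));
}
```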
diff --git a/libindy_vdr/src/pool/cache/strategy.rs b/libindy_vdr/src/pool/cache/strategy.rs
new file mode 100644
index 00000000..1f6a2fca
--- /dev/null
+++ b/libindy_vdr/src/pool/cache/strategy.rs
@@ -0,0 +1,212 @@
+use super::storage::OrderedHashMap;
+use super::CacheStrategy;
+use async_lock::Mutex;
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime};
+
+/// A simple struct to hold a value together with its expiry offset,
+/// needed because items can be inserted with custom TTL values
+/// that must be preserved when the entry is updated or re-ordered
+#[derive(Clone, Serialize, Deserialize)]
+pub struct TTLCacheItem<V> {
+    value: V,
+    expire_offset: u128,
+}
+
+/// A simple cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted.
+/// Also uses LRU semantics to evict entries that have not been accessed recently.
+/// Uses a hashmap for lookups and a BTreeMap for ordering by age
+pub struct CacheStrategyTTL<K, V> {
+    store: Arc<Mutex<OrderedHashMap<K, u128, TTLCacheItem<V>>>>,
+    capacity: usize,
+    create_time: SystemTime,
+    expire_after: u128,
+}
+
+impl<K: Hash + Eq + Clone + Send + Sync + 'static, V: Clone + Send + Sync + 'static>
+    CacheStrategyTTL<K, V>
+{
+    /// Create a new cache with the given capacity and expiration time in milliseconds.
+    /// If store_type is None, the cache uses an in-memory hashmap and BTreeMap.
+    /// create_time is used as the epoch for generating timestamps; if it is None,
+    /// the cache uses UNIX_EPOCH as its starting point.
+    pub fn new(
+        capacity: usize,
+        expire_after: u128,
+        store_type: Option<OrderedHashMap<K, u128, TTLCacheItem<V>>>,
+        create_time: Option<SystemTime>,
+    ) -> Self {
+        Self {
+            store: Arc::new(Mutex::new(match store_type {
+                Some(store) => store,
+                None => OrderedHashMap::new(BTreeMap::new()),
+            })),
+            capacity,
+            create_time: match create_time {
+                Some(time) => time,
+                None => SystemTime::UNIX_EPOCH,
+            },
+            expire_after,
+        }
+    }
+}
+
+#[async_trait]
+impl<K: Send + Sync + 'static, V: Send + Sync + 'static> CacheStrategy<K, V>
+    for Arc<dyn CacheStrategy<K, V>>
+{
+    async fn get(&self, key: &K) -> Option<V> {
+        self.get(key).await
+    }
+    async fn remove(&mut self, key: &K) -> Option<V> {
+        self.remove(key).await
+    }
+    async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
+        self.insert(key, value, custom_exp_offset).await
+    }
+}
+
+#[async_trait]
+impl<K: Hash + Eq + Clone + Send + Sync + 'static, V: Clone + Send + Sync + 'static>
+    CacheStrategy<K, V> for CacheStrategyTTL<K, V>
+{
+    async fn get(&self, key: &K) -> Option<V> {
+        let mut store_lock = self.store.lock().await;
+        let current_time = SystemTime::now()
+            .duration_since(self.create_time)
+            .unwrap()
+            .as_millis();
+        let get_res = match store_lock.get(key) {
+            Some((ts, v)) => {
+                if current_time < *ts {
+                    Some((*ts, v.clone()))
+                } else {
+                    store_lock.remove(key);
+                    None
+                }
+            }
+            None => None,
+        };
+        // bump a still-valid entry to the back of the eviction order (LRU)
+        if let Some((_, ref v)) = get_res {
+            store_lock.re_order(key, current_time + v.expire_offset);
+        }
+        get_res.map(|(_, v)| v.value)
+    }
+    async fn remove(&mut self, key: &K) -> Option<V> {
+        self.store.lock().await.remove(key).map(|(_, v)| v.value)
+    }
+
+    async fn insert(&mut self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
+        let mut store_lock = self.store.lock().await;
+        let current_ts = SystemTime::now()
+            .duration_since(self.create_time)
+            .unwrap()
+            .as_millis();
+
+        // remove expired entries
+        while store_lock.len() > 0
+            && store_lock
+                .get_first_key_value()
+                .map(|(_, ts, _)| ts < current_ts)
+                .unwrap_or(false)
+        {
+            store_lock.remove_first();
+        }
+
+        // if the cache is still full, evict the oldest entry
+        if store_lock.len() >= self.capacity && store_lock.get(&key).is_none() {
+            let removal_key = store_lock.get_first_key_value().map(|(k, _, _)| k);
+            if let Some(removal_key) = removal_key {
+                store_lock.remove(&removal_key);
+            }
+        };
+
+        let exp_offset = custom_exp_offset.unwrap_or(self.expire_after);
+        store_lock
+            .insert(
+                key,
+                TTLCacheItem {
+                    value,
+                    expire_offset: exp_offset,
+                },
+                current_ts + exp_offset,
+            )
+            .map(|v| v.value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::thread;
+
+    use super::*;
+    use crate::pool::cache::{storage::OrderedHashMap, Cache};
+    use futures_executor::block_on;
+    use rstest::rstest;
+
+    #[rstest]
+    fn test_cache_ttl() {
+        let cache = Cache::new(CacheStrategyTTL::new(2, 5, None, None), None);
+        let cache_location = "test_fs_cache_ttl";
+        let tree = sled::open(cache_location)
+            .unwrap()
+            .open_tree(cache_location)
+            .unwrap();
+        let storage: OrderedHashMap<String, u128, TTLCacheItem<String>> = OrderedHashMap::new(tree);
+        let fs_cache = Cache::new(CacheStrategyTTL::new(2, 5, Some(storage), None), None);
+        let caches = vec![cache, fs_cache];
+        block_on(async {
+            for cache in caches {
+                cache
+                    .insert("key".to_string(), "value".to_string(), None)
+                    .await;
+                assert_eq!(
+                    cache.get(&"key".to_string()).await,
+                    Some("value".to_string())
+                );
+                cache
+                    .insert("key1".to_string(), "value1".to_string(), None)
+                    .await;
+                cache
+                    .insert("key2".to_string(), "value2".to_string(), None)
+                    .await;
+                assert_eq!(cache.get(&"key".to_string()).await, None);
+                cache
+                    .insert("key3".to_string(), "value3".to_string(), None)
+                    .await;
+                cache.get(&"key2".to_string()).await;
+                cache
+                    .insert("key4".to_string(), "value4".to_string(), None)
+                    .await;
+                // key2 was not evicted because the LRU bump moved it to the back
+                assert_eq!(
+                    cache.remove(&"key2".to_string()).await,
+                    Some("value2".to_string())
+                );
+                // key3 became the oldest entry once key2 was accessed, so it was evicted
+                assert_eq!(cache.get(&"key3".to_string()).await, None);
+                cache
+                    .insert("key5".to_string(), "value5".to_string(), None)
+                    .await;
+                thread::sleep(std::time::Duration::from_millis(6));
+                assert_eq!(cache.get(&"key5".to_string()).await, None);
+                // a custom TTL on insert overrides the default expiry
+                cache
+                    .insert("key6".to_string(), "value6".to_string(), Some(1))
+                    .await;
+                cache
+                    .insert("key7".to_string(), "value7".to_string(), None)
+                    .await;
+                // wait until value6 expires
+                thread::sleep(std::time::Duration::from_millis(1));
+                assert_eq!(cache.get(&"key6".to_string()).await, None);
+                assert_eq!(
+                    cache.get(&"key7".to_string()).await,
+                    Some("value7".to_string())
+                );
+            }
+            std::fs::remove_dir_all(cache_location).unwrap();
+        });
+    }
+}
diff --git a/libindy_vdr/src/pool/helpers.rs b/libindy_vdr/src/pool/helpers.rs
index f426eadd..e5f1aea0 100644
--- a/libindy_vdr/src/pool/helpers.rs
+++ b/libindy_vdr/src/pool/helpers.rs
@@ -3,6 +3,7 @@ use std::string::ToString;
 
 use serde_json;
 
+use super::cache::Cache;
 use super::genesis::PoolTransactions;
 use super::handlers::{
     build_pool_catchup_request, build_pool_status_request, handle_catchup_request,
@@ -24,7 +25,7 @@ pub async fn perform_pool_status_request<T: Pool>(
 
     if pool.get_refreshed() {
         trace!("Performing fast status check");
-        match perform_get_txn(pool, LedgerType::POOL.to_id(), 1).await {
+        match perform_get_txn(pool, LedgerType::POOL.to_id(), 1, None).await {
             Ok((RequestResult::Reply(reply), res_meta)) => {
                 if let Ok(body) = serde_json::from_str::<SJsonValue>(&reply) {
                     if let (Some(status_root_hash), Some(status_txn_count)) = (
@@ -166,10 +167,11 @@ pub async fn perform_get_txn<T: Pool>(
     pool: &T,
     ledger_type: i32,
     seq_no: i32,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<(RequestResult<String>, RequestResultMeta)> {
     let builder = pool.get_request_builder();
     let prepared = builder.build_get_txn_request(None, ledger_type, seq_no)?;
-    perform_ledger_request(pool, &prepared).await
+    perform_ledger_request(pool, &prepared, cache).await
 }
 
 /// Dispatch a request to a specific set of nodes and collect the results
@@ -184,10 +186,13 @@ pub async fn perform_ledger_action<T: Pool>(
     handle_full_request(&mut request, node_aliases, timeout).await
 }
 
+// caching is applied below, once the request is known to be a read
+
 /// Dispatch a prepared ledger request to the appropriate handler
 pub async fn perform_ledger_request<T: Pool>(
     pool: &T,
     prepared: &PreparedRequest,
+    cache_opt: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<(RequestResult<String>, RequestResultMeta)> {
     let mut request = pool
         .create_request(prepared.req_id.clone(), prepared.req_json.to_string())
         .await?;
@@ -214,7 +219,34 @@ pub async fn perform_ledger_request<T: Pool>(
         RequestMethod::Consensus => (None, (None, None), false, None),
     };
 
-    handle_consensus_request(&mut request, sp_key, sp_timestamps, is_read_req, sp_parser).await
+    let cache_key = prepared.get_cache_key()?;
+
+    if is_read_req {
+        if let Some(cache) = cache_opt.clone() {
+            if let Some((response, meta)) = cache.get(&cache_key).await {
+                return Ok((RequestResult::Reply(response), meta));
+            }
+        }
+    }
+
+    let result =
+        handle_consensus_request(&mut request, sp_key, sp_timestamps, is_read_req, sp_parser).await;
+
+    if is_read_req && result.is_ok() {
+        if let (RequestResult::Reply(response), meta) = result.as_ref().unwrap() {
+            // only cache replies that contain data: skip responses with a null data or seqNo field
+            let serialized = serde_json::from_str::<SJsonValue>(response);
+            if let Ok(data) = serialized {
+                if data["result"]["data"].is_null() || data["result"]["seqNo"].is_null() {
+                    return result;
+                }
+            }
+            if let Some(cache) = cache_opt {
+                cache
+                    .insert(cache_key, (response.to_string(), meta.clone()), None)
+                    .await;
+            }
+        }
+    }
+    result
 }
 
 /// Format a collection of node replies in the expected response format
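With the plumbing above, any ledger read can be served from the cache on a repeat call. A sketch against the updated helper signature, assuming the `helpers` module is public as used in the crate docs (error handling elided):

```rust
use futures_executor::block_on;

use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache};
use indy_vdr::pool::helpers::perform_get_txn;
use indy_vdr::pool::Pool;

fn fetch_twice<T: Pool>(pool: &T) {
    let cache = Cache::new(CacheStrategyTTL::new(1000, 86_400_000, None, None), None);
    // The first call hits the ledger and fills the cache...
    let _ = block_on(perform_get_txn(pool, 1, 1, Some(cache.clone())));
    // ...the second call with the same parameters is answered from the cache.
    let _ = block_on(perform_get_txn(pool, 1, 1, Some(cache)));
}
```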
diff --git a/libindy_vdr/src/pool/mod.rs b/libindy_vdr/src/pool/mod.rs
index 942f1a1d..f9debfea 100644
--- a/libindy_vdr/src/pool/mod.rs
+++ b/libindy_vdr/src/pool/mod.rs
@@ -1,4 +1,6 @@
 mod builder;
+/// Caching support for ledger transaction requests
+pub mod cache;
 mod genesis;
 /// Transaction request handlers
 pub(crate) mod handlers;
diff --git a/libindy_vdr/src/pool/requests/prepared_request.rs b/libindy_vdr/src/pool/requests/prepared_request.rs
index b0a51bc4..d1e3e036 100644
--- a/libindy_vdr/src/pool/requests/prepared_request.rs
+++ b/libindy_vdr/src/pool/requests/prepared_request.rs
@@ -1,4 +1,5 @@
 use serde_json::{self, Value as SJsonValue};
+use sha2::{Digest, Sha256};
 
 use super::new_request_id;
 use crate::common::error::prelude::*;
@@ -80,6 +81,18 @@ impl PreparedRequest {
         }
     }
 
+    /// Generate a stable cache key by hashing the request JSON,
+    /// excluding the volatile `reqId` and `signature` fields
+    pub fn get_cache_key(&self) -> VdrResult<String> {
+        let mut req_json = self.req_json.clone();
+        let req_map = req_json
+            .as_object_mut()
+            .ok_or_else(|| input_err("Invalid request JSON"))?;
+        req_map.remove("reqId");
+        req_map.remove("signature");
+        let raw_key = serde_json::to_string(&req_json).with_input_err("Invalid request JSON")?;
+        let hash = Sha256::digest(raw_key.as_bytes());
+        Ok(hex::encode(hash))
+    }
+
     /// Generate the normalized representation of a transaction for signing the request
     pub fn get_signature_input(&self) -> VdrResult<String> {
         Ok(serialize_signature(&self.req_json)?)
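`get_cache_key` above is what makes caching safe across retries: two requests that differ only in the volatile `reqId` (or a client signature) hash to the same key. A short sketch, assuming a GET_TXN builder as used in the crate docs and public `PreparedRequest` fields:

```rust
use indy_vdr::ledger::RequestBuilder;
use indy_vdr::pool::ProtocolVersion;

fn main() {
    let builder = RequestBuilder::new(ProtocolVersion::Node1_4);
    // The same GET_TXN parameters, built twice: reqId differs, the cache key does not.
    let req_a = builder.build_get_txn_request(None, 1, 1).unwrap();
    let req_b = builder.build_get_txn_request(None, 1, 1).unwrap();
    assert_ne!(req_a.req_id, req_b.req_id);
    assert_eq!(req_a.get_cache_key().unwrap(), req_b.get_cache_key().unwrap());
}
```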
diff --git a/libindy_vdr/src/pool/runner.rs b/libindy_vdr/src/pool/runner.rs
index 1f6dcf0b..2d4f892a 100644
--- a/libindy_vdr/src/pool/runner.rs
+++ b/libindy_vdr/src/pool/runner.rs
@@ -7,6 +7,7 @@ use futures_executor::block_on;
 use futures_util::stream::{FuturesUnordered, StreamExt};
 use futures_util::{select, FutureExt};
 
+use super::cache::Cache;
 use super::helpers::{perform_ledger_request, perform_refresh};
 use super::networker::{Networker, NetworkerFactory};
 use super::requests::PreparedRequest;
@@ -34,6 +35,7 @@ impl PoolRunner {
         networker_factory: F,
         node_weights: Option<HashMap<String, f32>>,
         refreshed: bool,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
     ) -> Self
     where
         F: NetworkerFactory<Output = Rc<dyn Networker>> + Send + 'static,
@@ -49,7 +51,7 @@ impl PoolRunner {
                 refreshed,
             )
             .unwrap();
-            let mut thread = PoolThread::new(pool, receiver);
+            let mut thread = PoolThread::new(pool, receiver, cache);
             thread.run();
             debug!("Pool thread ended")
         });
@@ -159,11 +161,20 @@ impl PoolRunnerStatus {
 struct PoolThread {
     pool: LocalPool,
     receiver: UnboundedReceiver<PoolEvent>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 }
 
 impl PoolThread {
-    fn new(pool: LocalPool, receiver: UnboundedReceiver<PoolEvent>) -> Self {
-        Self { pool, receiver }
+    fn new(
+        pool: LocalPool,
+        receiver: UnboundedReceiver<PoolEvent>,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
+    ) -> Self {
+        Self {
+            pool,
+            receiver,
+            cache,
+        }
     }
 
     fn run(&mut self) {
@@ -174,6 +185,7 @@ impl PoolThread {
         let mut futures = FuturesUnordered::new();
         let receiver = &mut self.receiver;
         loop {
+            let cache_ledger_request = self.cache.clone();
             select! {
                 recv_evt = receiver.next() => {
                     match recv_evt {
@@ -199,7 +211,7 @@ impl PoolThread {
                             futures.push(fut.boxed_local());
                         }
                         Some(PoolEvent::SendRequest(request, callback)) => {
-                            let fut = _perform_ledger_request(&self.pool, request, callback);
+                            let fut = _perform_ledger_request(&self.pool, request, callback, cache_ledger_request);
                             futures.push(fut.boxed_local());
                         }
                         None => { trace!("Pool runner sender dropped") }
@@ -226,7 +238,8 @@ async fn _perform_ledger_request(
     pool: &LocalPool,
     request: PreparedRequest,
     callback: Callback<VdrResult<(RequestResult<String>, RequestResultMeta)>>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) {
-    let result = perform_ledger_request(pool, &request).await;
+    let result = perform_ledger_request(pool, &request, cache).await;
     callback(result);
 }
diff --git a/libindy_vdr/src/pool/types.rs b/libindy_vdr/src/pool/types.rs
index 99d94de3..b7963c3d 100644
--- a/libindy_vdr/src/pool/types.rs
+++ b/libindy_vdr/src/pool/types.rs
@@ -691,7 +691,7 @@ pub struct StateProofAssertions {
     pub txn_root_hash: String,
 }
 
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum StateProofResult {
     Missing,
     Invalid(String, Option<StateProofAssertions>),
@@ -719,7 +719,7 @@ impl std::fmt::Display for StateProofResult {
 /// Type representing timing information collected for ledger transaction request
 pub type TimingResult = HashMap<String, f32>;
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct RequestResultMeta {
     pub state_proof: HashMap<String, StateProofResult>,
     pub timing: Option<TimingResult>,
diff --git a/libindy_vdr/src/resolver/pool.rs b/libindy_vdr/src/resolver/pool.rs
index bea59542..e3e9cca0 100644
--- a/libindy_vdr/src/resolver/pool.rs
+++ b/libindy_vdr/src/resolver/pool.rs
@@ -2,6 +2,7 @@ use super::did::DidUrl;
 
 use crate::common::error::prelude::*;
 use crate::ledger::RequestBuilder;
+use crate::pool::cache::Cache;
 use crate::pool::{Pool, PoolRunner, RequestResult, RequestResultMeta};
 
 use super::types::*;
@@ -22,10 +23,14 @@ impl<T: Pool> PoolResolver<T> {
     }
 
     /// Dereference a DID Url and return a serialized `DereferencingResult`
-    pub async fn dereference(&self, did_url: &str) -> VdrResult<String> {
+    pub async fn dereference(
+        &self,
+        did_url: &str,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
+    ) -> VdrResult<String> {
         debug!("PoolResolver: Dereference DID Url {}", did_url);
         let did_url = DidUrl::parse(did_url)?;
-        let (data, metadata) = self._resolve(&did_url).await?;
+        let (data, metadata) = self._resolve(&did_url, cache).await?;
 
         let content = match data {
             Result::Content(c) => Some(c),
@@ -48,10 +53,14 @@ impl<T: Pool> PoolResolver<T> {
     }
 
     /// Resolve a DID and return a serialized `ResolutionResult`
-    pub async fn resolve(&self, did: &str) -> VdrResult<String> {
+    pub async fn resolve(
+        &self,
+        did: &str,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
+    ) -> VdrResult<String> {
         debug!("PoolResolver: Resolve DID {}", did);
         let did = DidUrl::parse(did)?;
-        let (data, metadata) = self._resolve(&did).await?;
+        let (data, metadata) = self._resolve(&did, cache.clone()).await?;
 
         let md = if let Metadata::DidDocumentMetadata(md) = metadata {
             Some(md)
@@ -85,9 +94,10 @@ impl<T: Pool> PoolResolver<T> {
                     } else {
                         (None, None)
                     };
-                    doc.endpoint = fetch_legacy_endpoint(&self.pool, &did.id, seq_no, timestamp)
-                        .await
-                        .ok();
+                    doc.endpoint =
+                        fetch_legacy_endpoint(&self.pool, &did.id, seq_no, timestamp, cache)
+                            .await
+                            .ok();
                 }
                 Some(doc.to_value()?)
             }
@@ -104,14 +114,18 @@ impl<T: Pool> PoolResolver<T> {
     }
 
     // Internal method to resolve and dereference
-    async fn _resolve(&self, did_url: &DidUrl) -> VdrResult<(Result, Metadata)> {
+    async fn _resolve(
+        &self,
+        did_url: &DidUrl,
+        cache: Option<Cache<String, (String, RequestResultMeta)>>,
+    ) -> VdrResult<(Result, Metadata)> {
         let builder = self.pool.get_request_builder();
         let request = build_request(did_url, &builder)?;
         debug!(
             "PoolResolver: Prepared Request for DID {}: {:#?}",
             did_url.id, request
         );
-        let ledger_data = handle_request(&self.pool, &request).await?;
+        let ledger_data = handle_request(&self.pool, &request, cache).await?;
         let namespace = did_url.namespace.clone();
         let result = handle_internal_resolution_result(namespace.as_str(), &ledger_data)?;
diff --git a/libindy_vdr/src/resolver/utils.rs b/libindy_vdr/src/resolver/utils.rs
index 4438824d..a99d7deb 100644
--- a/libindy_vdr/src/resolver/utils.rs
+++ b/libindy_vdr/src/resolver/utils.rs
@@ -11,6 +11,7 @@ use crate::ledger::constants;
 use crate::ledger::identifiers::{CredentialDefinitionId, RevocationRegistryId, SchemaId};
 use crate::ledger::responses::{Endpoint, GetNymResultV1};
 use crate::ledger::RequestBuilder;
+use crate::pool::cache::Cache;
 use crate::pool::helpers::perform_ledger_request;
 use crate::pool::{Pool, PreparedRequest, RequestResult, RequestResultMeta};
 use crate::utils::did::DidValue;
@@ -251,8 +252,12 @@
     }
 }
 
-pub async fn handle_request<T: Pool>(pool: &T, request: &PreparedRequest) -> VdrResult<String> {
-    let (result, _meta) = request_transaction(pool, request).await?;
+pub async fn handle_request<T: Pool>(
+    pool: &T,
+    request: &PreparedRequest,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
+) -> VdrResult<String> {
+    let (result, _meta) = request_transaction(pool, request, cache).await?;
     match result {
         RequestResult::Reply(data) => Ok(data),
         RequestResult::Failed(error) => Err(error),
@@ -262,8 +267,9 @@
 pub async fn request_transaction<T: Pool>(
     pool: &T,
     request: &PreparedRequest,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<(RequestResult<String>, RequestResultMeta)> {
-    perform_ledger_request(pool, request).await
+    perform_ledger_request(pool, request, cache).await
 }
 
 /// Fetch legacy service endpoint using ATTRIB tx
@@ -272,6 +278,7 @@ pub async fn fetch_legacy_endpoint<T: Pool>(
     did: &DidValue,
     seq_no: Option<i32>,
     timestamp: Option<u64>,
+    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<Endpoint> {
     let builder = pool.get_request_builder();
     let request = builder.build_get_attrib_request(
@@ -287,7 +294,7 @@ pub async fn fetch_legacy_endpoint<T: Pool>(
         "Fetching legacy endpoint for {} with request {:#?}",
         did, request
     );
-    let ledger_data = handle_request(pool, &request).await?;
+    let ledger_data = handle_request(pool, &request, cache).await?;
     let (_, _, endpoint_data) = parse_ledger_data(&ledger_data)?;
     let endpoint_data: Endpoint = serde_json::from_str(endpoint_data.as_str().unwrap())
         .map_err(|_| err_msg(VdrErrorKind::Resolver, "Could not parse endpoint data"))?;
diff --git a/libindy_vdr/src/state_proof/mod.rs b/libindy_vdr/src/state_proof/mod.rs
index b632ef67..a3ac8144 100644
--- a/libindy_vdr/src/state_proof/mod.rs
+++ b/libindy_vdr/src/state_proof/mod.rs
@@ -363,7 +363,9 @@ pub(crate) fn verify_parsed_sp(
     }
 
     if let Some(multi_sig) = multi_sig.as_ref() {
-        let Some((signature, participants, value)) = _parse_reply_for_proof_signature_checking(multi_sig) else {
+        let Some((signature, participants, value)) =
+            _parse_reply_for_proof_signature_checking(multi_sig)
+        else {
             return Err("State proof parsing of reply failed".into());
         };
         let verify_err = match _verify_proof_signature(
diff --git a/libindy_vdr/tests/resolver.rs b/libindy_vdr/tests/resolver.rs
index aa4a7d46..7d06fe04 100644
--- a/libindy_vdr/tests/resolver.rs
+++ b/libindy_vdr/tests/resolver.rs
@@ -56,7 +56,7 @@ mod send_resolver {
         // Resolve DID
         let resolver = Resolver::new(pool.pool);
         let qualified_did = format!("did:indy:test:{}", &identity.did);
-        let result = block_on(resolver.resolve(&qualified_did)).unwrap();
+        let result = block_on(resolver.resolve(&qualified_did, None)).unwrap();
 
         let v: serde_json::Value = serde_json::from_str(result.as_str()).unwrap();
 
@@ -107,7 +107,7 @@ mod send_resolver {
         let resolver = Resolver::new(pool.pool);
         let qualified_did = format!("did:indy:test:{}", &identity.did);
         let did_url = format!("{}?versionId={}", qualified_did, seq_no);
-        let result = block_on(resolver.resolve(&did_url)).unwrap();
+        let result = block_on(resolver.resolve(&did_url, None)).unwrap();
 
         let v: serde_json::Value = serde_json::from_str(result.as_str()).unwrap();
diff --git a/libindy_vdr/tests/utils/pool.rs b/libindy_vdr/tests/utils/pool.rs
index 7ced5294..283bf874 100644
--- a/libindy_vdr/tests/utils/pool.rs
+++ b/libindy_vdr/tests/utils/pool.rs
@@ -59,10 +59,10 @@ impl TestPool {
 
     pub fn send_request(&self, prepared_request: &PreparedRequest) -> Result<String, String> {
         block_on(async {
-            let (request_result, _meta) = perform_ledger_request(&self.pool, prepared_request)
-                .await
-                .unwrap();
-
+            let (request_result, _meta) =
+                perform_ledger_request(&self.pool, prepared_request, None)
+                    .await
+                    .unwrap();
             match request_result {
                 RequestResult::Reply(message) => Ok(message),
                 RequestResult::Failed(err) => Err(err.extra().unwrap_or_default()),
diff --git a/wrappers/javascript/indy-vdr-nodejs/src/NodeJSIndyVdr.ts b/wrappers/javascript/indy-vdr-nodejs/src/NodeJSIndyVdr.ts
index c82e84f4..9a9129f7 100644
--- a/wrappers/javascript/indy-vdr-nodejs/src/NodeJSIndyVdr.ts
+++ b/wrappers/javascript/indy-vdr-nodejs/src/NodeJSIndyVdr.ts
@@ -141,6 +141,11 @@ export class NodeJSIndyVdr implements IndyVdr {
     this.handleError(this.nativeIndyVdr.indy_vdr_set_cache_directory(path))
   }
 
+  public setLedgerTxnCache(options: { capacity: number; expiry_offset_ms: number; path?: string }): void {
+    const { capacity, expiry_offset_ms, path } = serializeArguments(options)
+    this.handleError(this.nativeIndyVdr.indy_vdr_set_ledger_txn_cache(capacity, expiry_offset_ms, path))
+  }
+
   public setDefaultLogger(): void {
     this.handleError(this.nativeIndyVdr.indy_vdr_set_default_logger())
   }
diff --git a/wrappers/javascript/indy-vdr-nodejs/src/library/NativeBindings.ts b/wrappers/javascript/indy-vdr-nodejs/src/library/NativeBindings.ts
index 264602c8..272c2975 100644
--- a/wrappers/javascript/indy-vdr-nodejs/src/library/NativeBindings.ts
+++ b/wrappers/javascript/indy-vdr-nodejs/src/library/NativeBindings.ts
@@ -3,6 +3,7 @@ import type { ByteBuffer } from '../ffi'
 export interface NativeMethods {
   indy_vdr_set_config: (arg0: string) => number
   indy_vdr_set_cache_directory: (arg0: string) => number
+  indy_vdr_set_ledger_txn_cache: (arg0: number, arg1: number, arg2?: string) => number
   indy_vdr_set_default_logger: () => number
   indy_vdr_set_protocol_version: (arg0: number) => number
   indy_vdr_set_socks_proxy: (arg0: string) => number
diff --git a/wrappers/javascript/indy-vdr-react-native/cpp/HostObject.cpp b/wrappers/javascript/indy-vdr-react-native/cpp/HostObject.cpp
index dca84796..c5c5e0a8 100644
--- a/wrappers/javascript/indy-vdr-react-native/cpp/HostObject.cpp
+++ b/wrappers/javascript/indy-vdr-react-native/cpp/HostObject.cpp
@@ -13,6 +13,7 @@ FunctionMap IndyVdrTurboModuleHostObject::functionMapping(jsi::Runtime &rt) {
   fMap.insert(std::make_tuple("getCurrentError", &indyVdr::getCurrentError));
   fMap.insert(std::make_tuple("setConfig", &indyVdr::setConfig));
   fMap.insert(std::make_tuple("setCacheDirectory", &indyVdr::setCacheDirectory));
+  fMap.insert(std::make_tuple("setLedgerTxnCache", &indyVdr::setLedgerTxnCache));
   fMap.insert(std::make_tuple("setDefaultLogger", &indyVdr::setDefaultLogger));
   fMap.insert(
       std::make_tuple("setProtocolVersion", &indyVdr::setProtocolVersion));
diff --git a/wrappers/javascript/indy-vdr-react-native/cpp/include/libindy_vdr.h b/wrappers/javascript/indy-vdr-react-native/cpp/include/libindy_vdr.h
index 7f61bb9f..6ce64ab2 100644
--- a/wrappers/javascript/indy-vdr-react-native/cpp/include/libindy_vdr.h
+++ b/wrappers/javascript/indy-vdr-react-native/cpp/include/libindy_vdr.h
@@ -481,6 +481,8 @@ ErrorCode indy_vdr_resolve(PoolHandle pool_handle,
 
 ErrorCode indy_vdr_set_cache_directory(FfiStr path);
 
+ErrorCode indy_vdr_set_ledger_txn_cache(int32_t capacity, int64_t expiry_offset_ms, FfiStr path);
+
 ErrorCode indy_vdr_set_config(FfiStr config);
 
 ErrorCode indy_vdr_set_default_logger(void);
diff --git a/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.cpp b/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.cpp
index 06c1ce7d..0b59d156 100644
--- a/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.cpp
+++ b/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.cpp
@@ -32,6 +32,16 @@ jsi::Value setCacheDirectory(jsi::Runtime &rt, jsi::Object options) {
   return createReturnValue(rt, code, nullptr);
 };
 
+jsi::Value setLedgerTxnCache(jsi::Runtime &rt, jsi::Object options) {
+  auto capacity = jsiToValue<int32_t>(rt, options, "capacity");
+  auto expiry_offset_ms = jsiToValue<int64_t>(rt, options, "expiry_offset_ms");
+  auto path = jsiToValue<std::string>(rt, options, "path", true);
+
+  ErrorCode code = indy_vdr_set_ledger_txn_cache(capacity, expiry_offset_ms, path.length() > 0 ? path.c_str() : nullptr);
+
+  return createReturnValue(rt, code, nullptr);
+};
+
 jsi::Value setDefaultLogger(jsi::Runtime &rt, jsi::Object options) {
   ErrorCode code = indy_vdr_set_default_logger();
 
diff --git a/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.h b/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.h
index fa48a925..c708f3d4 100644
--- a/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.h
+++ b/wrappers/javascript/indy-vdr-react-native/cpp/indyVdr.h
@@ -13,6 +13,7 @@ jsi::Value version(jsi::Runtime &rt, jsi::Object options);
 jsi::Value getCurrentError(jsi::Runtime &rt, jsi::Object options);
 jsi::Value setConfig(jsi::Runtime &rt, jsi::Object options);
 jsi::Value setCacheDirectory(jsi::Runtime &rt, jsi::Object options);
+jsi::Value setLedgerTxnCache(jsi::Runtime &rt, jsi::Object options);
 jsi::Value setDefaultLogger(jsi::Runtime &rt, jsi::Object options);
 jsi::Value setProtocolVersion(jsi::Runtime &rt, jsi::Object options);
 jsi::Value setSocksProxy(jsi::Runtime &rt, jsi::Object options);
diff --git a/wrappers/javascript/indy-vdr-react-native/src/NativeBindings.ts b/wrappers/javascript/indy-vdr-react-native/src/NativeBindings.ts
index 7a7f095e..5d602819 100644
--- a/wrappers/javascript/indy-vdr-react-native/src/NativeBindings.ts
+++ b/wrappers/javascript/indy-vdr-react-native/src/NativeBindings.ts
@@ -13,6 +13,8 @@ export interface NativeBindings {
 
   setCacheDirectory(options: { path: string }): ReturnObject
 
+  setLedgerTxnCache(options: { capacity: number; expiry_offset_ms: number; path?: string }): ReturnObject
+
   setDefaultLogger(options: Record): ReturnObject
 
   setProtocolVersion(options: { version: number }): ReturnObject
diff --git a/wrappers/javascript/indy-vdr-react-native/src/ReactNativeIndyVdr.ts b/wrappers/javascript/indy-vdr-react-native/src/ReactNativeIndyVdr.ts
index e89f3f3f..734f8fe6 100644
--- a/wrappers/javascript/indy-vdr-react-native/src/ReactNativeIndyVdr.ts
+++ b/wrappers/javascript/indy-vdr-react-native/src/ReactNativeIndyVdr.ts
@@ -110,6 +110,10 @@ export class ReactNativeIndyVdr implements IndyVdr {
     const serializedOptions = serializeArguments(options)
     this.indyVdr.setCacheDirectory(serializedOptions)
   }
+  public setLedgerTxnCache(options: { capacity: number; expiry_offset_ms: number; path?: string }): void {
+    const serializedOptions = serializeArguments(options)
+    this.indyVdr.setLedgerTxnCache(serializedOptions)
+  }
 
   public setDefaultLogger(): void {
     this.handleError(this.indyVdr.setDefaultLogger({}))
diff --git a/wrappers/javascript/indy-vdr-shared/src/types/IndyVdr.ts b/wrappers/javascript/indy-vdr-shared/src/types/IndyVdr.ts
index bee59f8a..c94167d7 100644
--- a/wrappers/javascript/indy-vdr-shared/src/types/IndyVdr.ts
+++ b/wrappers/javascript/indy-vdr-shared/src/types/IndyVdr.ts
@@ -49,6 +49,8 @@ export interface IndyVdr {
 
   setCacheDirectory(options: { path: string }): void
 
+  setLedgerTxnCache(options: { capacity: number; expiry_offset_ms: number; path?: string }): void
+
   setDefaultLogger(): void
 
   setProtocolVersion(options: { version: number }): void
diff --git a/wrappers/python/indy_vdr/__init__.py b/wrappers/python/indy_vdr/__init__.py
index 8e41f778..1902b12f 100644
--- a/wrappers/python/indy_vdr/__init__.py
+++ b/wrappers/python/indy_vdr/__init__.py
@@ -1,6 +1,6 @@
 """indy-vdr Python wrapper library"""
 
-from .bindings import set_cache_directory, set_config, set_protocol_version, version
+from .bindings import set_cache_directory, set_config, set_ledger_txn_cache, set_protocol_version, version
 from .error import VdrError, VdrErrorCode
 from .ledger import LedgerType
 from .pool import Pool, open_pool
@@ -10,6 +10,7 @@ __all__ = [
     "open_pool",
     "set_cache_directory",
     "set_config",
+    "set_ledger_txn_cache",
     "set_protocol_version",
     "set_socks_proxy",
diff --git a/wrappers/python/indy_vdr/bindings.py b/wrappers/python/indy_vdr/bindings.py
index 488a8ea9..63cb5133 100644
--- a/wrappers/python/indy_vdr/bindings.py
+++ b/wrappers/python/indy_vdr/bindings.py
@@ -425,6 +425,11 @@ def set_cache_directory(path: str):
     """Set the library configuration."""
     do_call("indy_vdr_set_cache_directory", encode_str(path))
 
+
+def set_ledger_txn_cache(capacity: int, expiry_offset_ms: int, path: str):
+    """Enable caching of ledger read transactions."""
+    do_call("indy_vdr_set_ledger_txn_cache", c_size_t(capacity), c_ulong(expiry_offset_ms), encode_str(path))
+
 
 def set_config(config: dict):
     """Set the library configuration."""