Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::cache::ObjectStoreCachePolicy;
use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::memory::Builder as MemoryBuilder;
use object_store::services::oss::Builder as OSSBuilder;
use object_store::services::s3::Builder as S3Builder;
use object_store::{util, ObjectStore};
Expand Down Expand Up @@ -207,12 +209,17 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
ObjectStoreConfig::Oss { .. } => new_oss_object_store(store_config).await,
};

let mem_accessor = MemoryBuilder::default().build().unwrap();
object_store.map(|object_store| {
object_store
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
.layer(MetricsLayer)
.layer(LoggingLayer::default())
.layer(TracingLayer)
.layer(
CacheLayer::new(ObjectStore::new(mem_accessor))
.with_policy(ObjectStoreCachePolicy::default()),
)
})
}

Expand Down
1 change: 1 addition & 0 deletions src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait = "0.1"
futures = { version = "0.3" }
opendal = { version = "0.25.1", features = [
"layers-tracing",
Expand Down
62 changes: 62 additions & 0 deletions src/object-store/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::future::BoxFuture;
use opendal::layers::CachePolicy;
use opendal::raw::output::Reader;
use opendal::raw::{Accessor, RpRead};
use opendal::{ErrorKind, OpRead, OpWrite, Result};

#[derive(Debug, Default)]
pub struct ObjectStoreCachePolicy {}

impl ObjectStoreCachePolicy {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!("{}.cache-{}", path, args.range().to_header())
}
}

#[async_trait]
impl CachePolicy for ObjectStoreCachePolicy {
fn on_read(
&self,
inner: Arc<dyn Accessor>,
cache: Arc<dyn Accessor>,
path: &str,
args: OpRead,
) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
Box::pin(async move {
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => Ok(v),
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let (rp, reader) = inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let _ = cache
.write(&cache_path, OpWrite::new(size), Box::new(reader))
.await?;
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => Ok(v),
Err(_) => return inner.read(&path, args).await,
}
}
Err(_) => return inner.read(&path, args).await,
}
})
}
}
1 change: 1 addition & 0 deletions src/object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub use opendal::{
Operator as ObjectStore, Result,
};
pub mod backend;
pub mod cache;
pub mod test_util;
pub mod util;
48 changes: 48 additions & 0 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::env;

use anyhow::Result;
use common_telemetry::logging;
use futures::TryStreamExt;
use object_store::backend::{fs, s3};
use object_store::cache::ObjectStoreCachePolicy;
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
use opendal::layers::CacheLayer;
use opendal::services::oss;
use tempdir::TempDir;

Expand Down Expand Up @@ -160,3 +163,48 @@ async fn test_oss_backend() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
let root_dir = TempDir::new("test_fs_backend")?;
let store = ObjectStore::new(
fs::Builder::default()
.root(&root_dir.path().to_string_lossy())
.build()?,
);

let cache_dir = TempDir::new("test_fs_cache")?;
let cache_op = ObjectStore::new(
fs::Builder::default()
.root(&cache_dir.path().to_string_lossy())
.build()?,
);
let test_cache_op = ObjectStore::from(cache_op.inner());
let store =
store.layer(CacheLayer::new(cache_op).with_policy(ObjectStoreCachePolicy::default()));

// Create object handler.
let object = store.object("test_file");

// Write data info object;
assert!(object.write("Hello, World!").await.is_ok());

// Read data from object;
let bs = object.read().await?;
assert_eq!("Hello, World!", String::from_utf8(bs)?);

// Read data from object cache;
let mut lister = test_cache_op.batch().walk_top_down("/")?;
while let Some(obj) = lister.try_next().await? {
match obj.mode().await? {
ObjectMode::FILE => {
assert!(obj.name().starts_with("test_file"));
let bs = obj.read().await?;
assert_eq!("Hello, World!", String::from_utf8(bs)?);
}
_ => continue,
}
}

Ok(())
}