Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;
Expand Down Expand Up @@ -69,15 +70,30 @@ impl CacheManager {
}

/// Gets cached [ParquetMetaData] for the SST file `file_id` of region `region_id`.
///
/// The SST meta cache (if configured) is consulted first, using
/// `SstMetaKey(region_id, file_id)` as the key. On a miss, the file cache of
/// the write cache (if configured) is asked to load the metadata of the
/// parquet file, and the result is wrapped in a new [Arc].
///
/// Returns `None` when neither lookup yields metadata.
///
/// NOTE(review): metadata obtained from the write cache is not inserted into
/// the SST meta cache by this method.
pub fn get_parquet_meta_data(
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
// Try to get metadata from the SST meta cache; `update_hit_miss` records
// the lookup under `SST_META_TYPE`.
let metadata = self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
})
});

if metadata.is_some() {
return metadata;
}

// Try to get metadata from the write cache, addressing the file as a
// parquet file of this region.
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
if let Some(write_cache) = &self.write_cache {
if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
return Some(Arc::new(metadata));
}
Comment thread
QuenKar marked this conversation as resolved.
};

None
}

/// Puts [ParquetMetaData] into the cache.
Expand Down Expand Up @@ -315,8 +331,8 @@ mod tests {
use super::*;
use crate::cache::test_util::parquet_meta;

#[test]
fn test_disable_cache() {
#[tokio::test]
async fn test_disable_cache() {
let cache = CacheManager::default();
assert!(cache.sst_meta_cache.is_none());
assert!(cache.vector_cache.is_none());
Expand All @@ -326,7 +342,10 @@ mod tests {
let file_id = FileId::random();
let metadata = parquet_meta();
cache.put_parquet_meta_data(region_id, file_id, metadata);
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
assert!(cache
.get_parquet_meta_data(region_id, file_id)
.await
.is_none());

let value = Value::Int64(10);
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
Expand All @@ -346,17 +365,26 @@ mod tests {
assert!(cache.write_cache().is_none());
}

// Exercises the put / get / remove round trip for parquet metadata on a
// cache manager built with `sst_meta_cache_size(2000)`: the lookup misses
// before the put, hits after it, and misses again after the removal.
#[test]
fn test_parquet_meta_cache() {
#[tokio::test]
async fn test_parquet_meta_cache() {
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
assert!(cache
.get_parquet_meta_data(region_id, file_id)
.await
.is_none());
let metadata = parquet_meta();
cache.put_parquet_meta_data(region_id, file_id, metadata);
assert!(cache.get_parquet_meta_data(region_id, file_id).is_some());
assert!(cache
.get_parquet_meta_data(region_id, file_id)
.await
.is_some());
cache.remove_parquet_meta_data(region_id, file_id);
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
assert!(cache
.get_parquet_meta_data(region_id, file_id)
.await
.is_none());
}

#[test]
Expand Down
41 changes: 39 additions & 2 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use moka::future::Cache;
use moka::notification::RemovalCause;
use object_store::util::join_path;
use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::RegionId;

Expand All @@ -34,6 +35,7 @@ use crate::error::{OpenDalSnafu, Result};
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;

/// Subdirectory of cached files.
const FILE_DIR: &str = "files/";
Expand Down Expand Up @@ -120,7 +122,7 @@ impl FileCache {
}
Err(e) => {
if e.kind() != ErrorKind::NotFound {
warn!("Failed to get file for key {:?}, err: {}", key, e);
warn!(e; "Failed to get file for key {:?}", key);
}
}
Ok(None) => {}
Expand Down Expand Up @@ -154,7 +156,7 @@ impl FileCache {
}
Err(e) => {
if e.kind() != ErrorKind::NotFound {
warn!("Failed to get file for key {:?}, err: {}", key, e);
warn!(e; "Failed to get file for key {:?}", key);
}

// We remove the file from the index.
Expand Down Expand Up @@ -226,6 +228,41 @@ impl FileCache {
self.local_store.clone()
}

/// Loads the parquet metadata of the cached file identified by `key`.
///
/// Returns `None` when `key` is absent from the memory index, or when the
/// metadata cannot be loaded from the local store. In the latter case the
/// key is also removed from the memory index. Every call increments either
/// `CACHE_HIT` or `CACHE_MISS` under the `FILE_TYPE` label.
pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
    // A key that is missing from the index is a plain miss: nothing to load.
    let index_value = match self.memory_index.get(&key).await {
        Some(value) => value,
        None => {
            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
            return None;
        }
    };

    // The index knows the file; read its metadata from the local store.
    let store = self.local_store();
    let path = self.cache_file_path(key);
    let size = index_value.file_size as u64;
    let loader = MetadataLoader::new(store, &path, size);

    let err = match loader.load().await {
        Ok(metadata) => {
            CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
            return Some(metadata);
        }
        Err(err) => err,
    };

    // A missing object is not worth a warning; any other failure is logged.
    if !err.is_object_not_found() {
        warn!(err; "Failed to get parquet metadata for key {:?}", key);
    }
    // Remove the entry from the index and report the lookup as a miss.
    self.memory_index.remove(&key).await;
    CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
    None
}

async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
Expand Down
96 changes: 79 additions & 17 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,43 +228,36 @@ pub struct SstUploadRequest {
#[cfg(test)]
mod tests {

use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use object_store::util::join_dir;

use super::*;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
sst_region_metadata,
};
use crate::test_util::TestEnv;

#[tokio::test]
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
.await
.unwrap();

// Create WriteCache
let local_dir = create_temp_dir("");
let local_store = new_fs_store(local_dir.path().to_str().unwrap());
let object_store_manager = env.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(
local_store.clone(),
object_store_manager,
ReadableSize::mb(10),
intm_mgr,
)
.await
.unwrap();

let write_cache = env
.create_write_cache(local_store.clone(), ReadableSize::mb(10))
.await;

// Create Source
let metadata = Arc::new(sst_region_metadata());
Expand Down Expand Up @@ -327,4 +320,73 @@ mod tests {
.unwrap();
assert_eq!(remote_index_data, cache_index_data);
}

#[tokio::test]
async fn test_read_metadata_from_write_cache() {
    let mut env = TestEnv::new();
    let data_home = env.data_home().display().to_string();
    let remote_store = env.init_object_store_manager();

    // Back the write cache with a file system store rooted in a temp dir.
    let cache_dir = create_temp_dir("");
    let cache_store = new_fs_store(cache_dir.path().to_str().unwrap());
    let write_cache = env
        .create_write_cache(cache_store.clone(), ReadableSize::mb(10))
        .await;

    // The cache manager is built with the write cache as its only option.
    let cache_manager = Arc::new(
        CacheManager::builder()
            .write_cache(Some(write_cache.clone()))
            .build(),
    );

    // Region metadata, file handle and the batches that make up the SST.
    let region_metadata = Arc::new(sst_region_metadata());
    let handle = sst_file_handle(0, 1000);
    let file_id = handle.file_id();
    let batches = new_source(&[
        new_batch_by_range(&["a", "d"], 0, 60),
        new_batch_by_range(&["b", "f"], 0, 40),
        new_batch_by_range(&["b", "h"], 100, 200),
    ]);

    // Describe the write into the write cache and the upload to the mock store.
    let write_request = SstWriteRequest {
        file_id,
        metadata: region_metadata,
        source: batches,
        storage: None,
        create_inverted_index: false,
        mem_threshold_index_create: None,
        index_write_buffer_size: None,
        cache_manager: cache_manager.clone(),
    };
    let write_opts = WriteOptions {
        row_group_size: 512,
        ..Default::default()
    };
    let upload_request = SstUploadRequest {
        upload_path: sst_file_path(&data_home, file_id),
        index_upload_path: index_file_path(&data_home, file_id),
        remote_store: remote_store.clone(),
    };

    // Keep the parquet metadata reported by the writer for the comparison.
    let written_metadata = write_cache
        .write_and_upload_sst(write_request, upload_request, &write_opts)
        .await
        .unwrap()
        .unwrap()
        .file_metadata
        .unwrap();

    // Build a reader that is given the cache manager created above.
    let reader = ParquetReaderBuilder::new(data_home, handle.clone(), remote_store.clone())
        .cache(Some(cache_manager.clone()))
        .build()
        .await
        .unwrap();

    // The reader must see the same parquet metadata the writer produced.
    assert_parquet_metadata_eq(written_metadata, reader.parquet_metadata());
}
}
34 changes: 4 additions & 30 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

mod format;
pub(crate) mod helper;
mod metadata;
pub(crate) mod metadata;
mod page_reader;
pub mod reader;
pub mod row_group;
Expand Down Expand Up @@ -88,7 +88,8 @@ mod tests {
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};

Expand Down Expand Up @@ -258,34 +259,7 @@ mod tests {
let reader = builder.build().await.unwrap();
let reader_metadata = reader.parquet_metadata();

// Because ParquetMetaData doesn't implement PartialEq,
// check all fields manually
macro_rules! assert_metadata {
( $writer:expr, $reader:expr, $($method:ident,)+ ) => {
$(
assert_eq!($writer.$method(), $reader.$method());
)+
}
}

assert_metadata!(
writer_metadata.file_metadata(),
reader_metadata.file_metadata(),
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
);

assert_metadata!(
writer_metadata,
reader_metadata,
row_groups,
column_index,
offset_index,
);
assert_parquet_metadata_eq(writer_metadata, reader_metadata)
}

#[tokio::test]
Expand Down
12 changes: 6 additions & 6 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,15 @@ impl ParquetReaderBuilder {
file_path: &str,
file_size: u64,
) -> Result<Arc<ParquetMetaData>> {
let region_id = self.file_handle.region_id();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(metadata) = self.cache_manager.as_ref().and_then(|cache| {
cache.get_parquet_meta_data(self.file_handle.region_id(), self.file_handle.file_id())
}) {
return Ok(metadata);
if let Some(manager) = &self.cache_manager {
if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
return Ok(metadata);
}
}

// TODO(QuenKar): should also check write cache to get parquet metadata.

// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
Expand Down
Loading