Skip to content

Commit 782724b

Browse files
committed
feat(mito): support write cache for index file
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
1 parent fd8fb64 commit 782724b

17 files changed

Lines changed: 302 additions & 92 deletions

File tree

src/mito2/src/access_layer.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ impl AccessLayer {
102102
request: SstWriteRequest,
103103
write_opts: &WriteOptions,
104104
) -> Result<Option<SstInfo>> {
105-
let path = location::sst_file_path(&self.region_dir, request.file_id);
105+
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
106+
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
106107
let region_id = request.metadata.region_id;
107108

108109
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
@@ -114,15 +115,21 @@ impl AccessLayer {
114115
metadata: request.metadata,
115116
source: request.source,
116117
storage: request.storage,
117-
upload_path: path,
118+
upload_path: file_path,
119+
index_upload_path: index_file_path,
118120
remote_store: self.object_store.clone(),
119121
},
120122
write_opts,
121123
)
122124
.await?
123125
} else {
124126
// Write cache is disabled.
125-
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
127+
let mut writer = ParquetWriter::new(
128+
file_path,
129+
index_file_path,
130+
request.metadata,
131+
self.object_store.clone(),
132+
);
126133
writer.write_all(request.source, write_opts).await?
127134
};
128135

src/mito2/src/cache/file_cache.rs

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl FileCache {
6868
// The cache is replaced by another file. This is unexpected, we don't remove the same
6969
// file but updates the metrics as the file is already replaced by users.
7070
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
71-
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0);
71+
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
7272
return;
7373
}
7474

@@ -77,7 +77,7 @@ impl FileCache {
7777
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
7878
}
7979
Err(e) => {
80-
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0);
80+
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
8181
}
8282
}
8383
}
@@ -205,7 +205,51 @@ impl FileCache {
205205
}
206206

207207
/// Key of file cache index.
208-
pub(crate) type IndexKey = (RegionId, FileId);
208+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
209+
pub(crate) struct IndexKey {
210+
pub region_id: RegionId,
211+
pub file_id: FileId,
212+
pub file_type: FileType,
213+
}
214+
215+
impl IndexKey {
216+
/// Creates a new index key.
217+
pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
218+
IndexKey {
219+
region_id,
220+
file_id,
221+
file_type,
222+
}
223+
}
224+
}
225+
226+
/// Type of the file.
227+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
228+
pub(crate) enum FileType {
229+
/// Parquet file.
230+
Parquet,
231+
/// Puffin file.
232+
Puffin,
233+
}
234+
235+
impl FileType {
236+
/// Parses the file type from string.
237+
fn parse(s: &str) -> Option<FileType> {
238+
match s {
239+
"parquet" => Some(FileType::Parquet),
240+
"puffin" => Some(FileType::Puffin),
241+
_ => None,
242+
}
243+
}
244+
245+
/// Converts the file type to string.
246+
fn as_str(&self) -> &'static str {
247+
match self {
248+
FileType::Parquet => "parquet",
249+
FileType::Puffin => "puffin",
250+
}
251+
}
252+
}
209253

210254
/// An entity that describes the file in the file cache.
211255
///
@@ -220,19 +264,28 @@ pub(crate) struct IndexValue {
220264
///
221265
/// The file name format is `{region_id}.{file_id}`
222266
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
223-
join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1))
267+
join_path(
268+
cache_file_dir,
269+
&format!(
270+
"{}.{}.{}",
271+
key.region_id.as_u64(),
272+
key.file_id,
273+
key.file_type.as_str()
274+
),
275+
)
224276
}
225277

226278
/// Parse index key from the file name.
227279
fn parse_index_key(name: &str) -> Option<IndexKey> {
228-
let mut splited = name.splitn(2, '.');
229-
let region_id = splited.next().and_then(|s| {
280+
let mut split = name.splitn(3, '.');
281+
let region_id = split.next().and_then(|s| {
230282
let id = s.parse::<u64>().ok()?;
231283
Some(RegionId::from_u64(id))
232284
})?;
233-
let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?;
285+
let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
286+
let file_type = split.next().and_then(FileType::parse)?;
234287

235-
Some((region_id, file_id))
288+
Some(IndexKey::new(region_id, file_id, file_type))
236289
}
237290

238291
#[cfg(test)]
@@ -257,7 +310,7 @@ mod tests {
257310
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
258311
let region_id = RegionId::new(2000, 0);
259312
let file_id = FileId::random();
260-
let key = (region_id, file_id);
313+
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
261314
let file_path = cache.cache_file_path(key);
262315

263316
// Get an empty file.
@@ -270,7 +323,10 @@ mod tests {
270323
.unwrap();
271324
// Add to the cache.
272325
cache
273-
.put((region_id, file_id), IndexValue { file_size: 5 })
326+
.put(
327+
IndexKey::new(region_id, file_id, FileType::Parquet),
328+
IndexValue { file_size: 5 },
329+
)
274330
.await;
275331

276332
// Read file content.
@@ -303,7 +359,7 @@ mod tests {
303359
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
304360
let region_id = RegionId::new(2000, 0);
305361
let file_id = FileId::random();
306-
let key = (region_id, file_id);
362+
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
307363
let file_path = cache.cache_file_path(key);
308364

309365
// Write a file.
@@ -313,7 +369,10 @@ mod tests {
313369
.unwrap();
314370
// Add to the cache.
315371
cache
316-
.put((region_id, file_id), IndexValue { file_size: 5 })
372+
.put(
373+
IndexKey::new(region_id, file_id, FileType::Parquet),
374+
IndexValue { file_size: 5 },
375+
)
317376
.await;
318377

319378
// Remove the file but keep the index.
@@ -332,19 +391,20 @@ mod tests {
332391
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
333392

334393
let region_id = RegionId::new(2000, 0);
394+
let file_type = FileType::Parquet;
335395
// Write N files.
336396
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
337397
let mut total_size = 0;
338398
for (i, file_id) in file_ids.iter().enumerate() {
339-
let key = (region_id, *file_id);
399+
let key = IndexKey::new(region_id, *file_id, file_type);
340400
let file_path = cache.cache_file_path(key);
341401
let bytes = i.to_string().into_bytes();
342402
local_store.write(&file_path, bytes.clone()).await.unwrap();
343403

344404
// Add to the cache.
345405
cache
346406
.put(
347-
(region_id, *file_id),
407+
IndexKey::new(region_id, *file_id, file_type),
348408
IndexValue {
349409
file_size: bytes.len() as u32,
350410
},
@@ -356,15 +416,18 @@ mod tests {
356416
// Recover the cache.
357417
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
358418
// No entry before recovery.
359-
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
419+
assert!(cache
420+
.reader(IndexKey::new(region_id, file_ids[0], file_type))
421+
.await
422+
.is_none());
360423
cache.recover().await.unwrap();
361424

362425
// Check size.
363426
cache.memory_index.run_pending_tasks().await;
364427
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);
365428

366429
for (i, file_id) in file_ids.iter().enumerate() {
367-
let key = (region_id, *file_id);
430+
let key = IndexKey::new(region_id, *file_id, file_type);
368431
let mut reader = cache.reader(key).await.unwrap();
369432
let mut buf = String::new();
370433
reader.read_to_string(&mut buf).await.unwrap();
@@ -376,12 +439,18 @@ mod tests {
376439
fn test_cache_file_path() {
377440
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
378441
assert_eq!(
379-
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
380-
cache_file_path("test_dir", (RegionId::new(1234, 5), file_id))
442+
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
443+
cache_file_path(
444+
"test_dir",
445+
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
446+
)
381447
);
382448
assert_eq!(
383-
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
384-
cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id))
449+
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
450+
cache_file_path(
451+
"test_dir/",
452+
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
453+
)
385454
);
386455
}
387456

@@ -390,8 +459,8 @@ mod tests {
390459
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
391460
let region_id = RegionId::new(1234, 5);
392461
assert_eq!(
393-
(region_id, file_id),
394-
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap()
462+
IndexKey::new(region_id, file_id, FileType::Parquet),
463+
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
395464
);
396465
assert!(parse_index_key("").is_none());
397466
assert!(parse_index_key(".").is_none());
@@ -400,8 +469,6 @@ mod tests {
400469
assert!(parse_index_key(".5299989643269").is_none());
401470
assert!(parse_index_key("5299989643269.").is_none());
402471
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
403-
assert!(
404-
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none()
405-
);
472+
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
406473
}
407474
}

0 commit comments

Comments
 (0)