Skip to content

Commit 06f7eee

Browse files
committed
feat: track disk usage of regions
Signed-off-by: Zheming Li <nkdudu@126.com>
1 parent bd065ea commit 06f7eee

File tree

15 files changed

+177
-32
lines changed

15 files changed

+177
-32
lines changed

src/mito/src/table/test_util/mock_engine.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ impl Region for MockRegion {
196196
async fn close(&self) -> Result<()> {
197197
Ok(())
198198
}
199+
200+
fn disk_usage_bytes(&self) -> u64 {
201+
0
202+
}
199203
}
200204

201205
impl MockRegionInner {

src/storage/src/compaction/strategy.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ mod tests {
239239
Timestamp::new_millisecond(end_ts_millis),
240240
)),
241241
level: 0,
242+
file_size: 0,
242243
},
243244
layer,
244245
file_purger,

src/storage/src/compaction/task.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ impl CompactionOutput {
173173
let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated());
174174
let opts = WriteOptions {};
175175

176-
let SstInfo { time_range } = sst_layer
176+
let SstInfo {
177+
time_range,
178+
file_size,
179+
} = sst_layer
177180
.write_sst(&output_file_name, Source::Reader(reader), &opts)
178181
.await?;
179182

@@ -182,6 +185,7 @@ impl CompactionOutput {
182185
file_name: output_file_name,
183186
time_range,
184187
level: self.output_level,
188+
file_size,
185189
})
186190
}
187191
}

src/storage/src/compaction/writer.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,10 @@ mod tests {
221221
let iter = memtable.iter(&IterContext::default()).unwrap();
222222
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
223223

224-
let SstInfo { time_range } = writer
224+
let SstInfo {
225+
time_range,
226+
file_size,
227+
} = writer
225228
.write_sst(&sst::WriteOptions::default())
226229
.await
227230
.unwrap();
@@ -231,6 +234,7 @@ mod tests {
231234
file_name: sst_file_name.to_string(),
232235
time_range,
233236
level: 0,
237+
file_size,
234238
},
235239
Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}),
236240
new_noop_file_purger(),
@@ -409,13 +413,11 @@ mod tests {
409413
.await
410414
.unwrap();
411415
assert_eq!(
412-
SstInfo {
413-
time_range: Some((
414-
Timestamp::new_millisecond(2000),
415-
Timestamp::new_millisecond(2000)
416-
)),
417-
},
418-
s1
416+
Some((
417+
Timestamp::new_millisecond(2000),
418+
Timestamp::new_millisecond(2000)
419+
)),
420+
s1.time_range,
419421
);
420422

421423
let s2 = ParquetWriter::new(
@@ -427,13 +429,11 @@ mod tests {
427429
.await
428430
.unwrap();
429431
assert_eq!(
430-
SstInfo {
431-
time_range: Some((
432-
Timestamp::new_millisecond(3000),
433-
Timestamp::new_millisecond(5002)
434-
)),
435-
},
436-
s2
432+
Some((
433+
Timestamp::new_millisecond(3000),
434+
Timestamp::new_millisecond(5002)
435+
)),
436+
s2.time_range,
437437
);
438438

439439
let s3 = ParquetWriter::new(
@@ -446,13 +446,11 @@ mod tests {
446446
.unwrap();
447447

448448
assert_eq!(
449-
SstInfo {
450-
time_range: Some((
451-
Timestamp::new_millisecond(6000),
452-
Timestamp::new_millisecond(8000)
453-
)),
454-
},
455-
s3
449+
Some((
450+
Timestamp::new_millisecond(6000),
451+
Timestamp::new_millisecond(8000)
452+
)),
453+
s3.time_range
456454
);
457455

458456
let output_files = ["o1.parquet", "o2.parquet", "o3.parquet"]
@@ -464,6 +462,7 @@ mod tests {
464462
file_name: f.to_string(),
465463
level: 1,
466464
time_range: None,
465+
file_size: 0,
467466
},
468467
Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}),
469468
new_noop_file_purger(),

src/storage/src/file_purger.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ mod tests {
143143
let iter = memtable.iter(&IterContext::default()).unwrap();
144144
let sst_path = "table1";
145145
let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
146-
let _sst_info = layer
146+
let sst_info = layer
147147
.write_sst(sst_file_name, Source::Iter(iter), &WriteOptions {})
148148
.await
149149
.unwrap();
@@ -155,6 +155,7 @@ mod tests {
155155
file_name: sst_file_name.to_string(),
156156
time_range: None,
157157
level: 0,
158+
file_size: sst_info.file_size,
158159
},
159160
layer.clone(),
160161
file_purger,

src/storage/src/flush.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ impl<S: LogStore> FlushJob<S> {
203203
let sst_layer = self.sst_layer.clone();
204204

205205
futures.push(async move {
206-
let SstInfo { time_range } = sst_layer
206+
let SstInfo {
207+
time_range,
208+
file_size,
209+
} = sst_layer
207210
.write_sst(&file_name, Source::Iter(iter), &WriteOptions::default())
208211
.await?;
209212

@@ -212,6 +215,7 @@ impl<S: LogStore> FlushJob<S> {
212215
file_name,
213216
time_range,
214217
level: 0,
218+
file_size,
215219
})
216220
});
217221
}

src/storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ pub mod write_batch;
4141

4242
pub use engine::EngineImpl;
4343
mod file_purger;
44+
mod statistics_collector;

src/storage/src/manifest/test_utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use crate::metadata::RegionMetadata;
2020
use crate::sst::FileMeta;
2121
use crate::test_util::descriptor_util::RegionDescBuilder;
2222

23+
pub const DEFAULT_TEST_FILE_SIZE: u64 = 1024;
24+
2325
pub fn build_region_meta() -> RegionMetadata {
2426
let region_name = "region-0";
2527
let desc = RegionDescBuilder::new(region_name)
@@ -45,6 +47,7 @@ pub fn build_region_edit(
4547
file_name: f.to_string(),
4648
time_range: None,
4749
level: 0,
50+
file_size: DEFAULT_TEST_FILE_SIZE,
4851
})
4952
.collect(),
5053
files_to_remove: files_to_remove
@@ -54,6 +57,7 @@ pub fn build_region_edit(
5457
file_name: f.to_string(),
5558
time_range: None,
5659
level: 0,
60+
file_size: DEFAULT_TEST_FILE_SIZE,
5761
})
5862
.collect(),
5963
}

src/storage/src/region.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, Wri
4646
use crate::schema::compat::CompatWrite;
4747
use crate::snapshot::SnapshotImpl;
4848
use crate::sst::AccessLayerRef;
49+
use crate::statistics_collector::StatisticsCollectorRef;
4950
use crate::version::{
5051
Version, VersionControl, VersionControlRef, VersionEdit, INIT_COMMITTED_SEQUENCE,
5152
};
@@ -125,6 +126,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
125126
async fn close(&self) -> Result<()> {
126127
self.inner.close().await
127128
}
129+
130+
fn disk_usage_bytes(&self) -> u64 {
131+
self.inner.statistics_collector.disk_usage_bytes()
132+
}
128133
}
129134

130135
/// Storage related config for region.
@@ -190,6 +195,7 @@ impl<S: LogStore> RegionImpl<S> {
190195
let name = metadata.name().to_string();
191196
let version_control = VersionControl::with_version(version);
192197
let wal = Wal::new(id, store_config.log_store);
198+
let statistics_collector = StatisticsCollectorRef::default();
193199

194200
let inner = Arc::new(RegionInner {
195201
shared: Arc::new(SharedData {
@@ -201,13 +207,15 @@ impl<S: LogStore> RegionImpl<S> {
201207
store_config.memtable_builder,
202208
store_config.engine_config.clone(),
203209
store_config.ttl,
210+
statistics_collector.clone(),
204211
)),
205212
wal,
206213
flush_strategy: store_config.flush_strategy,
207214
flush_scheduler: store_config.flush_scheduler,
208215
compaction_scheduler: store_config.compaction_scheduler,
209216
sst_layer: store_config.sst_layer,
210217
manifest: store_config.manifest,
218+
statistics_collector,
211219
});
212220

213221
RegionImpl { inner }
@@ -221,12 +229,14 @@ impl<S: LogStore> RegionImpl<S> {
221229
store_config: StoreConfig<S>,
222230
_opts: &OpenOptions,
223231
) -> Result<Option<RegionImpl<S>>> {
232+
let statistics_collector = StatisticsCollectorRef::default();
224233
// Load version meta data from manifest.
225234
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
226235
&store_config.manifest,
227236
&store_config.memtable_builder,
228237
&store_config.sst_layer,
229238
&store_config.file_purger,
239+
&statistics_collector,
230240
)
231241
.await?
232242
{
@@ -281,6 +291,7 @@ impl<S: LogStore> RegionImpl<S> {
281291
store_config.memtable_builder,
282292
store_config.engine_config.clone(),
283293
store_config.ttl,
294+
statistics_collector.clone(),
284295
));
285296
let writer_ctx = WriterContext {
286297
shared: &shared,
@@ -306,6 +317,7 @@ impl<S: LogStore> RegionImpl<S> {
306317
compaction_scheduler: store_config.compaction_scheduler,
307318
sst_layer: store_config.sst_layer,
308319
manifest: store_config.manifest,
320+
statistics_collector,
309321
});
310322

311323
Ok(Some(RegionImpl { inner }))
@@ -321,6 +333,7 @@ impl<S: LogStore> RegionImpl<S> {
321333
memtable_builder: &MemtableBuilderRef,
322334
sst_layer: &AccessLayerRef,
323335
file_purger: &FilePurgerRef,
336+
statistics_collector: &StatisticsCollectorRef,
324337
) -> Result<(Option<Version>, RecoveredMetadataMap)> {
325338
let (start, end) = Self::manifest_scan_range();
326339
let mut iter = manifest.scan(start, end).await?;
@@ -352,7 +365,12 @@ impl<S: LogStore> RegionImpl<S> {
352365
file_purger.clone(),
353366
));
354367
for (manifest_version, action) in actions.drain(..) {
355-
version = Self::replay_edit(manifest_version, action, version);
368+
version = Self::replay_edit(
369+
manifest_version,
370+
action,
371+
version,
372+
statistics_collector,
373+
);
356374
}
357375
}
358376
(RegionMetaAction::Change(c), Some(v)) => {
@@ -365,7 +383,12 @@ impl<S: LogStore> RegionImpl<S> {
365383
version = None;
366384
}
367385
(action, Some(v)) => {
368-
version = Self::replay_edit(manifest_version, action, Some(v));
386+
version = Self::replay_edit(
387+
manifest_version,
388+
action,
389+
Some(v),
390+
statistics_collector,
391+
);
369392
}
370393
}
371394
}
@@ -391,6 +414,7 @@ impl<S: LogStore> RegionImpl<S> {
391414
manifest_version: ManifestVersion,
392415
action: RegionMetaAction,
393416
version: Option<Version>,
417+
statistics_collector: &StatisticsCollectorRef,
394418
) -> Option<Version> {
395419
if let RegionMetaAction::Edit(e) = action {
396420
let edit = VersionEdit {
@@ -400,6 +424,10 @@ impl<S: LogStore> RegionImpl<S> {
400424
manifest_version,
401425
max_memtable_id: None,
402426
};
427+
statistics_collector
428+
.increase_disk_usage_bytes(edit.files_to_add.iter().map(|f| f.file_size).sum());
429+
statistics_collector
430+
.descrease_disk_usage_bytes(edit.files_to_remove.iter().map(|f| f.file_size).sum());
403431
version.map(|mut v| {
404432
v.apply_edit(edit);
405433
v
@@ -483,6 +511,7 @@ struct RegionInner<S: LogStore> {
483511
compaction_scheduler: CompactionSchedulerRef<S>,
484512
sst_layer: AccessLayerRef,
485513
manifest: RegionManifest,
514+
statistics_collector: StatisticsCollectorRef,
486515
}
487516

488517
impl<S: LogStore> RegionInner<S> {

src/storage/src/region/tests.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,17 +297,20 @@ async fn test_recover_region_manifets() {
297297
SchedulerConfig::default(),
298298
NoopFilePurgeHandler,
299299
));
300+
let statistics_collector = StatisticsCollectorRef::default();
300301
// Recover from empty
301302
assert!(RegionImpl::<NoopLogStore>::recover_from_manifest(
302303
&manifest,
303304
&memtable_builder,
304305
&sst_layer,
305-
&file_purger
306+
&file_purger,
307+
&statistics_collector,
306308
)
307309
.await
308310
.unwrap()
309311
.0
310312
.is_none());
313+
assert_eq!(statistics_collector.disk_usage_bytes(), 0);
311314

312315
{
313316
// save some actions into region_meta
@@ -340,12 +343,14 @@ async fn test_recover_region_manifets() {
340343
.unwrap();
341344
}
342345

346+
let statistics_collector = StatisticsCollectorRef::default();
343347
// try to recover
344348
let (version, recovered_metadata) = RegionImpl::<NoopLogStore>::recover_from_manifest(
345349
&manifest,
346350
&memtable_builder,
347351
&sst_layer,
348352
&file_purger,
353+
&statistics_collector,
349354
)
350355
.await
351356
.unwrap();
@@ -368,4 +373,7 @@ async fn test_recover_region_manifets() {
368373

369374
// check manifest state
370375
assert_eq!(3, manifest.last_version());
376+
377+
// 3 files in total
378+
assert_eq!(statistics_collector.disk_usage_bytes(), 3 * DEFAULT_TEST_FILE_SIZE);
371379
}

0 commit comments

Comments
 (0)