
Commit c04f2d4

chore: rebase develop
1 parent b4e7dcb · commit c04f2d4

4 files changed (+9, −153 lines)

config/datanode.example.toml

Lines changed: 1 addition & 1 deletion
@@ -27,4 +27,4 @@ tcp_nodelay = false
 
 [compaction]
 compaction_after_flush = true
-max_inflight_task = 32
+max_inflight_task = 32

src/storage/src/compaction/picker.rs

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ impl<S: LogStore> Picker<CompactionRequestImpl<S>, CompactionTaskImpl<S>> for Si
         ctx: &PickerContext,
         req: &CompactionRequestImpl<S>,
     ) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
-        let levels = &req.levels;
+        let levels = &req.levels();
 
         for level_num in 0..levels.level_num() {
            let level = levels.level(level_num as u8);
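
The only functional change here swaps a direct field read for an accessor call. A minimal sketch of the accessor this implies on the request type (generic parameters elided; the placeholder LevelMetas/LevelMetasRef definitions below are assumptions for illustration, not code from this commit):

use std::sync::Arc;

// Placeholder level metadata; stands in for the storage engine's real type.
pub struct LevelMetas;
pub type LevelMetasRef = Arc<LevelMetas>;

// Hypothetical, simplified shape of the compaction request.
pub struct CompactionRequestImpl {
    levels: LevelMetasRef,
}

impl CompactionRequestImpl {
    /// Accessor used by the picker, so callers no longer depend on field layout.
    #[inline]
    pub fn levels(&self) -> &LevelMetasRef {
        &self.levels
    }
}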

src/storage/src/sst.rs

Lines changed: 1 addition & 100 deletions
@@ -31,7 +31,7 @@ use crate::error::Result;
 use crate::memtable::BoxedBatchIterator;
 use crate::read::{Batch, BoxedBatchReader};
 use crate::schema::ProjectedSchemaRef;
-use crate::sst::parquet::{ParquetReader, ParquetWriter, Source};
+use crate::sst::parquet::{ParquetReader, ParquetWriter};
 
 /// Maximum level of SSTs.
 pub const MAX_LEVEL: u8 = 2;
@@ -444,102 +444,3 @@ mod tests {
         );
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashSet;
-
-    use super::*;
-
-    fn create_handle(name: &str, level: Level) -> FileHandle {
-        FileHandle::new(FileMeta {
-            file_name: name.to_string(),
-            time_range: None,
-            level,
-        })
-    }
-
-    #[test]
-    fn test_level_metas_add_and_remove() {
-        let metas = LevelMetas::new();
-        let merged = metas.merge(
-            vec![create_handle("a", 0), create_handle("b", 0)].into_iter(),
-            vec![].into_iter(),
-        );
-
-        assert_eq!(
-            HashSet::from(["a".to_string(), "b".to_string()]),
-            merged
-                .level(0)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        let merged1 = merged.merge(
-            vec![create_handle("c", 1), create_handle("d", 1)].into_iter(),
-            vec![].into_iter(),
-        );
-        assert_eq!(
-            HashSet::from(["a".to_string(), "b".to_string()]),
-            merged1
-                .level(0)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        assert_eq!(
-            HashSet::from(["c".to_string(), "d".to_string()]),
-            merged1
-                .level(1)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        let removed1 = merged1.merge(
-            vec![].into_iter(),
-            vec![create_handle("a", 0), create_handle("c", 0)].into_iter(),
-        );
-        assert_eq!(
-            HashSet::from(["b".to_string()]),
-            removed1
-                .level(0)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        assert_eq!(
-            HashSet::from(["c".to_string(), "d".to_string()]),
-            removed1
-                .level(1)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        let removed2 = removed1.merge(
-            vec![].into_iter(),
-            vec![create_handle("c", 1), create_handle("d", 1)].into_iter(),
-        );
-        assert_eq!(
-            HashSet::from(["b".to_string()]),
-            removed2
-                .level(0)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-
-        assert_eq!(
-            HashSet::new(),
-            removed2
-                .level(1)
-                .files()
-                .map(|f| f.file_name().to_string())
-                .collect()
-        );
-    }
-}
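
The deleted block is a verbatim duplicate of the mod tests module that already ends at old line 446, presumably left behind by the rebase. Rust rejects two sibling modules with the same name, so the duplicate could not even compile; a minimal reproduction:

#[cfg(test)]
mod tests {}

#[cfg(test)]
mod tests {} // error[E0428]: the name `tests` is defined multiple times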

src/storage/src/sst/parquet.rs

Lines changed: 6 additions & 51 deletions
@@ -45,11 +45,9 @@ use parquet::file::properties::WriterProperties
 use parquet::format::FileMetaData;
 use parquet::schema::types::SchemaDescriptor;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::ChunkReader;
 use table::predicate::Predicate;
 use tokio::io::BufReader;
 
-use crate::chunk::ChunkReaderImpl;
 use crate::error::{
     self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
     Result, WriteObjectSnafu, WriteParquetSnafu,
@@ -320,7 +318,6 @@ impl<'a> ParquetReader<'a> {
         // checks if converting time range unit into ts col unit will result into rounding error.
         if time_unit_lossy(&self.time_range, ts_col_unit) {
             let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new(
-                ts_col_idx,
                 self.time_range,
                 projection,
             ))]);
@@ -342,15 +339,9 @@ impl<'a> ParquetReader<'a> {
                 .and_then(|s| s.convert_to(ts_col_unit))
                 .map(|t| t.value()),
         ) {
-            Box::new(FastTimestampRowFilter::new(
-                ts_col_idx, projection, lower, upper,
-            )) as _
+            Box::new(FastTimestampRowFilter::new(projection, lower, upper)) as _
         } else {
-            Box::new(PlainTimestampRowFilter::new(
-                ts_col_idx,
-                self.time_range,
-                projection,
-            )) as _
+            Box::new(PlainTimestampRowFilter::new(self.time_range, projection)) as _
         };
         let filter = RowFilter::new(vec![row_filter]);
         Some(filter)
@@ -371,21 +362,14 @@ fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool {
 /// `FastTimestampRowFilter` is used to filter rows within given timestamp range when reading
 /// row groups from parquet files, while avoids fetching all columns from SSTs file.
 struct FastTimestampRowFilter {
-    timestamp_index: usize,
     lower_bound: i64,
     upper_bound: i64,
     projection: ProjectionMask,
 }
 
 impl FastTimestampRowFilter {
-    fn new(
-        ts_col_idx: usize,
-        projection: ProjectionMask,
-        lower_bound: i64,
-        upper_bound: i64,
-    ) -> Self {
+    fn new(projection: ProjectionMask, lower_bound: i64, upper_bound: i64) -> Self {
         Self {
-            timestamp_index: ts_col_idx,
             lower_bound,
             upper_bound,
             projection,
@@ -400,7 +384,7 @@ impl ArrowPredicate for FastTimestampRowFilter {
 
     /// Selects the rows matching given time range.
     fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result<BooleanArray, ArrowError> {
-        let ts_col = batch.column(self.timestamp_index);
+        let ts_col = batch.column(0);
 
         macro_rules! downcast_and_compute {
             ($typ: ty) => {
@@ -442,15 +426,13 @@ impl ArrowPredicate for FastTimestampRowFilter {
 /// [PlainTimestampRowFilter] iterates each element in timestamp column, build a [Timestamp] struct
 /// and checks if given time range contains the timestamp.
 struct PlainTimestampRowFilter {
-    timestamp_index: usize,
     time_range: TimestampRange,
     projection: ProjectionMask,
 }
 
 impl PlainTimestampRowFilter {
-    fn new(timestamp_index: usize, time_range: TimestampRange, projection: ProjectionMask) -> Self {
+    fn new(time_range: TimestampRange, projection: ProjectionMask) -> Self {
         Self {
-            timestamp_index,
             time_range,
             projection,
         }
@@ -463,7 +445,7 @@ impl ArrowPredicate for PlainTimestampRowFilter {
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result<BooleanArray, ArrowError> {
-        let ts_col = batch.column(self.timestamp_index);
+        let ts_col = batch.column(0);
 
         macro_rules! downcast_and_compute {
             ($array_ty: ty, $unit: ident) => {{
@@ -531,33 +513,6 @@ impl BatchReader for ChunkStream {
     }
 }
 
-/// Parquet writer data source.
-pub enum Source {
-    /// Writes rows from memtable to parquet
-    Iter(BoxedBatchIterator),
-    /// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet.
-    Reader(ChunkReaderImpl),
-}
-
-impl Source {
-    async fn next_batch(&mut self) -> Result<Option<Batch>> {
-        match self {
-            Source::Iter(iter) => iter.next().transpose(),
-            Source::Reader(reader) => reader
-                .next_chunk()
-                .await
-                .map(|p| p.map(|chunk| Batch::new(chunk.columns))),
-        }
-    }
-
-    fn projected_schema(&self) -> ProjectedSchemaRef {
-        match self {
-            Source::Iter(iter) => iter.schema(),
-            Source::Reader(reader) => reader.projected_schema().clone(),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
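
The timestamp_index removals in this file hang together: an ArrowPredicate only receives the columns selected by its own ProjectionMask, so when the mask projects just the timestamp column, that column always sits at index 0 of the RecordBatch passed to evaluate, and storing the index is redundant. A minimal sketch of the pattern against the parquet crate's row-filter API (the predicate type, half-open bounds, and millisecond unit below are assumptions for illustration, not this commit's exact code):

use arrow::array::{BooleanArray, TimestampMillisecondArray};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ArrowPredicate;
use parquet::arrow::ProjectionMask;

/// Keeps rows whose timestamp lies in [lower, upper); assumes the mask
/// selects exactly one column: the millisecond timestamp column.
struct TsRangePredicate {
    projection: ProjectionMask,
    lower: i64,
    upper: i64,
}

impl ArrowPredicate for TsRangePredicate {
    fn projection(&self) -> &ProjectionMask {
        &self.projection
    }

    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
        // `batch` contains only the projected columns, so the timestamp
        // column is column 0 regardless of its index in the file schema.
        let ts = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| ArrowError::CastError("expected ms timestamps".into()))?;
        Ok(ts
            .iter()
            .map(|v| v.map(|t| t >= self.lower && t < self.upper))
            .collect())
    }
}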
