Skip to content

Commit e215851

Browse files
authored
refactor: unify flush and compaction to always use FlatSource (#7799)
* feat: support write flat as primary key format Signed-off-by: evenyag <realevenyag@gmail.com> * feat: migrate flush to always use FlatSource Add FormatType propagation in SstWriteRequest and use it to choose Flat vs PrimaryKey write paths (write_all_flat vs write_all_flat_as_primary_key) in AccessLayer and WriteCache. Make compactor and flush derive the sst_write_format from region options or engine config. Simplify flush logic and remove the old memtable_source helper. Update tests to set default sst_write_format. Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: compaction use flat source Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: read parquet sequentially as flat batches Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: remove new_batch_with_binary in favor of new_record_batch_with_binary Replace PrimaryKeyWriteFormat with FlatWriteFormat in test_read_large_binary test and use new_record_batch_with_binary directly, removing the now-unused new_batch_with_binary function and its BinaryArray import. Signed-off-by: evenyag <realevenyag@gmail.com> * test: add tests for PrimaryKeyWriteFormat::convert_flat_batch Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: remove Either from SstWriteRequest Signed-off-by: evenyag <realevenyag@gmail.com> * fix: handle index build mode Signed-off-by: evenyag <realevenyag@gmail.com> * fix: consider sparse encoding and last non null in flush Signed-off-by: evenyag <realevenyag@gmail.com> * test: add unit tests for field_column_start edge cases Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
1 parent 74ff5c3 commit e215851

File tree

17 files changed

+656
-892
lines changed

17 files changed

+656
-892
lines changed

src/cmd/src/datanode/objbench.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ use clap::Parser;
2020
use colored::Colorize;
2121
use datanode::config::RegionEngineConfig;
2222
use datanode::store;
23-
use either::Either;
23+
use futures::stream;
2424
use mito2::access_layer::{
2525
AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
2626
};
2727
use mito2::cache::{CacheManager, CacheManagerRef};
2828
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29-
use mito2::read::Source;
29+
use mito2::read::FlatSource;
30+
use mito2::sst::FormatType;
3031
use mito2::sst::file::{FileHandle, FileMeta};
3132
use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
3233
use mito2::sst::index::intermediate::IntermediateManager;
@@ -210,6 +211,7 @@ impl ObjbenchCommand {
210211
object_store.clone(),
211212
)
212213
.expected_metadata(Some(region_meta.clone()))
214+
.flat_format(true)
213215
.build()
214216
.await
215217
.map_err(|e| {
@@ -231,6 +233,10 @@ impl ObjbenchCommand {
231233
let reader_build_elapsed = reader_build_start.elapsed();
232234
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
233235
println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);
236+
let reader_stream = Box::pin(stream::try_unfold(reader, |mut reader| async move {
237+
let batch = reader.next_record_batch().await?;
238+
Ok(batch.map(|batch| (batch, reader)))
239+
}));
234240

235241
// Build write request
236242
let fulltext_index_config = FulltextIndexConfig {
@@ -241,10 +247,11 @@ impl ObjbenchCommand {
241247
let write_req = SstWriteRequest {
242248
op_type: OperationType::Flush,
243249
metadata: region_meta,
244-
source: Either::Left(Source::Reader(Box::new(reader))),
250+
source: FlatSource::Stream(reader_stream),
245251
cache_manager,
246252
storage: None,
247253
max_sequence: None,
254+
sst_write_format: FormatType::PrimaryKey,
248255
index_options: Default::default(),
249256
index_config: mito_engine_config.index.clone(),
250257
inverted_index_config: MitoConfig::default().inverted_index,

src/mito2/src/access_layer.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::time::{Duration, Instant};
1717

1818
use async_stream::try_stream;
1919
use common_time::Timestamp;
20-
use either::Either;
2120
use futures::{Stream, TryStreamExt};
2221
use object_store::services::Fs;
2322
use object_store::util::{join_dir, with_instrument_layers};
@@ -37,7 +36,7 @@ use crate::error::{
3736
CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result,
3837
};
3938
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
40-
use crate::read::{FlatSource, Source};
39+
use crate::read::FlatSource;
4140
use crate::region::options::IndexOptions;
4241
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
4342
use crate::sst::index::IndexerBuilderImpl;
@@ -47,7 +46,7 @@ use crate::sst::location::{self, region_dir_from_table_dir};
4746
use crate::sst::parquet::reader::ParquetReaderBuilder;
4847
use crate::sst::parquet::writer::ParquetWriter;
4948
use crate::sst::parquet::{SstInfo, WriteOptions};
50-
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
49+
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FormatType};
5150

5251
pub type AccessLayerRef = Arc<AccessLayer>;
5352
/// SST write results.
@@ -391,15 +390,19 @@ impl AccessLayer {
391390
)
392391
.await
393392
.with_file_cleaner(cleaner);
394-
match request.source {
395-
Either::Left(source) => {
393+
match request.sst_write_format {
394+
FormatType::PrimaryKey => {
396395
writer
397-
.write_all(source, request.max_sequence, write_opts)
396+
.write_all_flat_as_primary_key(
397+
request.source,
398+
request.max_sequence,
399+
write_opts,
400+
)
398401
.await?
399402
}
400-
Either::Right(flat_source) => {
403+
FormatType::Flat => {
401404
writer
402-
.write_all_flat(flat_source, request.max_sequence, write_opts)
405+
.write_all_flat(request.source, request.max_sequence, write_opts)
403406
.await?
404407
}
405408
}
@@ -520,11 +523,12 @@ pub enum OperationType {
520523
pub struct SstWriteRequest {
521524
pub op_type: OperationType,
522525
pub metadata: RegionMetadataRef,
523-
pub source: Either<Source, FlatSource>,
526+
pub source: FlatSource,
524527
pub cache_manager: CacheManagerRef,
525528
#[allow(dead_code)]
526529
pub storage: Option<String>,
527530
pub max_sequence: Option<SequenceNumber>,
531+
pub sst_write_format: FormatType,
528532

529533
/// Configs for index
530534
pub index_options: IndexOptions,

src/mito2/src/cache/write_cache.rs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -244,15 +244,19 @@ impl WriteCache {
244244
.await
245245
.with_file_cleaner(cleaner);
246246

247-
let sst_info = match write_request.source {
248-
either::Left(source) => {
247+
let sst_info = match write_request.sst_write_format {
248+
crate::sst::FormatType::PrimaryKey => {
249249
writer
250-
.write_all(source, write_request.max_sequence, write_opts)
250+
.write_all_flat_as_primary_key(
251+
write_request.source,
252+
write_request.max_sequence,
253+
write_opts,
254+
)
251255
.await?
252256
}
253-
either::Right(flat_source) => {
257+
crate::sst::FormatType::Flat => {
254258
writer
255-
.write_all_flat(flat_source, write_request.max_sequence, write_opts)
259+
.write_all_flat(write_request.source, write_request.max_sequence, write_opts)
256260
.await?
257261
}
258262
};
@@ -509,12 +513,13 @@ mod tests {
509513
use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
510514
use crate::cache::{CacheManager, CacheStrategy};
511515
use crate::error::InvalidBatchSnafu;
512-
use crate::read::Source;
516+
use crate::read::FlatSource;
513517
use crate::region::options::IndexOptions;
514518
use crate::sst::parquet::reader::ParquetReaderBuilder;
515519
use crate::test_util::TestEnv;
516520
use crate::test_util::sst_util::{
517-
new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata,
521+
new_flat_source_from_record_batches, new_record_batch_by_range,
522+
sst_file_handle_with_file_id, sst_region_metadata,
518523
};
519524

520525
#[tokio::test]
@@ -532,21 +537,22 @@ mod tests {
532537
.create_write_cache(local_store.clone(), ReadableSize::mb(10))
533538
.await;
534539

535-
// Create Source
540+
// Create source.
536541
let metadata = Arc::new(sst_region_metadata());
537542
let region_id = metadata.region_id;
538-
let source = new_source(&[
539-
new_batch_by_range(&["a", "d"], 0, 60),
540-
new_batch_by_range(&["b", "f"], 0, 40),
541-
new_batch_by_range(&["b", "h"], 100, 200),
543+
let source = new_flat_source_from_record_batches(vec![
544+
new_record_batch_by_range(&["a", "d"], 0, 60),
545+
new_record_batch_by_range(&["b", "f"], 0, 40),
546+
new_record_batch_by_range(&["b", "h"], 100, 200),
542547
]);
543548

544549
let write_request = SstWriteRequest {
545550
op_type: OperationType::Flush,
546551
metadata,
547-
source: either::Left(source),
552+
source,
548553
storage: None,
549554
max_sequence: None,
555+
sst_write_format: Default::default(),
550556
cache_manager: Default::default(),
551557
index_options: IndexOptions::default(),
552558
index_config: Default::default(),
@@ -636,19 +642,20 @@ mod tests {
636642
// Create source
637643
let metadata = Arc::new(sst_region_metadata());
638644

639-
let source = new_source(&[
640-
new_batch_by_range(&["a", "d"], 0, 60),
641-
new_batch_by_range(&["b", "f"], 0, 40),
642-
new_batch_by_range(&["b", "h"], 100, 200),
645+
let source = new_flat_source_from_record_batches(vec![
646+
new_record_batch_by_range(&["a", "d"], 0, 60),
647+
new_record_batch_by_range(&["b", "f"], 0, 40),
648+
new_record_batch_by_range(&["b", "h"], 100, 200),
643649
]);
644650

645651
// Write to local cache and upload sst to mock remote store
646652
let write_request = SstWriteRequest {
647653
op_type: OperationType::Flush,
648654
metadata,
649-
source: either::Left(source),
655+
source,
650656
storage: None,
651657
max_sequence: None,
658+
sst_write_format: Default::default(),
652659
cache_manager: cache_manager.clone(),
653660
index_options: IndexOptions::default(),
654661
index_config: Default::default(),
@@ -715,9 +722,9 @@ mod tests {
715722
let metadata = Arc::new(sst_region_metadata());
716723

717724
// Creates a source that can return an error to abort the writer.
718-
let source = Source::Iter(Box::new(
725+
let source = FlatSource::Iter(Box::new(
719726
[
720-
Ok(new_batch_by_range(&["a", "d"], 0, 60)),
727+
Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
721728
InvalidBatchSnafu {
722729
reason: "Abort the writer",
723730
}
@@ -730,9 +737,10 @@ mod tests {
730737
let write_request = SstWriteRequest {
731738
op_type: OperationType::Flush,
732739
metadata,
733-
source: either::Left(source),
740+
source,
734741
storage: None,
735742
max_sequence: None,
743+
sst_write_format: Default::default(),
736744
cache_manager: cache_manager.clone(),
737745
index_options: IndexOptions::default(),
738746
index_config: Default::default(),

src/mito2/src/compaction.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ use crate::error::{
5858
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
5959
};
6060
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
61+
use crate::read::BoxedRecordBatchStream;
6162
use crate::read::projection::ProjectionMapper;
6263
use crate::read::scan_region::{PredicateGroup, ScanInput};
6364
use crate::read::seq_scan::SeqScan;
64-
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
6565
use crate::region::options::{MergeMode, RegionOptions};
6666
use crate::region::version::VersionControlRef;
6767
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
@@ -828,7 +828,7 @@ pub struct SerializedCompactionOutput {
828828
output_time_range: Option<TimestampRange>,
829829
}
830830

831-
/// Builders to create [BoxedBatchReader] for compaction.
831+
/// Builders to create [BoxedRecordBatchStream] for compaction.
832832
struct CompactionSstReaderBuilder<'a> {
833833
metadata: RegionMetadataRef,
834834
sst_layer: AccessLayerRef,
@@ -841,24 +841,17 @@ struct CompactionSstReaderBuilder<'a> {
841841
}
842842

843843
impl CompactionSstReaderBuilder<'_> {
844-
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
845-
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
846-
let scan_input = self.build_scan_input(false)?.with_compaction(true);
847-
848-
SeqScan::new(scan_input).build_reader_for_compaction().await
849-
}
850-
851844
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
852845
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
853-
let scan_input = self.build_scan_input(true)?.with_compaction(true);
846+
let scan_input = self.build_scan_input()?.with_compaction(true);
854847

855848
SeqScan::new(scan_input)
856849
.build_flat_reader_for_compaction()
857850
.await
858851
}
859852

860-
fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
861-
let mapper = ProjectionMapper::all(&self.metadata, flat_format)?;
853+
fn build_scan_input(self) -> Result<ScanInput> {
854+
let mapper = ProjectionMapper::all(&self.metadata, true)?;
862855
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
863856
.with_files(self.inputs.to_vec())
864857
.with_append_mode(self.append_mode)
@@ -868,7 +861,7 @@ impl CompactionSstReaderBuilder<'_> {
868861
// We ignore file not found error during compaction.
869862
.with_ignore_file_not_found(true)
870863
.with_merge_mode(self.merge_mode)
871-
.with_flat_format(flat_format);
864+
.with_flat_format(true);
872865

873866
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
874867
// by converting time ranges into predicate.

src/mito2/src/compaction/compactor.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::error::{
4343
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
4444
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
4545
use crate::metrics;
46-
use crate::read::{FlatSource, Source};
46+
use crate::read::FlatSource;
4747
use crate::region::options::RegionOptions;
4848
use crate::region::version::VersionRef;
4949
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
@@ -356,13 +356,8 @@ impl DefaultCompactor {
356356
time_range: output.output_time_range,
357357
merge_mode,
358358
};
359-
let source = if flat_format {
360-
let reader = builder.build_flat_sst_reader().await?;
361-
Either::Right(FlatSource::Stream(reader))
362-
} else {
363-
let reader = builder.build_sst_reader().await?;
364-
Either::Left(Source::Reader(reader))
365-
};
359+
let reader = builder.build_flat_sst_reader().await?;
360+
let source = FlatSource::Stream(reader);
366361
let mut metrics = Metrics::new(WriteType::Compaction);
367362
let region_metadata = compaction_region.region_metadata.clone();
368363
let sst_infos = compaction_region
@@ -375,6 +370,11 @@ impl DefaultCompactor {
375370
cache_manager: compaction_region.cache_manager.clone(),
376371
storage,
377372
max_sequence: max_sequence.map(NonZero::get),
373+
sst_write_format: if flat_format {
374+
FormatType::Flat
375+
} else {
376+
FormatType::PrimaryKey
377+
},
378378
index_options,
379379
index_config,
380380
inverted_index_config,

0 commit comments

Comments
 (0)