Skip to content

refactor: rename components of the replace into impl #17355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use rand::prelude::SliceRandom;

use crate::io::BlockBuilder;
use crate::operations::mutation::SegmentIndex;
use crate::operations::replace_into::MergeIntoOperationAggregator;
use crate::operations::replace_into::ReplaceIntoOperationAggregator;
use crate::FuseTable;

impl FuseTable {
Expand Down Expand Up @@ -102,7 +102,7 @@ impl FuseTable {
let read_settings = ReadSettings::from_ctx(&ctx)?;
let mut items = Vec::with_capacity(num_partition);
for chunk_of_segment_locations in chunks {
let item = MergeIntoOperationAggregator::try_create(
let item = ReplaceIntoOperationAggregator::try_create(
ctx.clone(),
on_conflicts.clone(),
bloom_filter_column_indexes.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod merge_into_operation_meta;
mod replace_into_operation_meta;

pub use merge_into_operation_meta::*;
pub use replace_into_operation_meta::*;
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@ use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;

// This mod need to be refactored, since it not longer aiming to be
// used in the implementation of `MERGE INTO` statement in the future.
//
// unfortunately, distributed `replace-into` is being implemented in parallel,
// to avoid the potential heavy merge conflicts, the refactoring is postponed.

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum MergeIntoOperation {
pub enum ReplaceIntoOperation {
Delete(Vec<DeletionByColumn>),
None,
}
Expand All @@ -43,8 +37,8 @@ pub struct DeletionByColumn {
pub bloom_hashes: Vec<RowBloomHashes>,
}

#[typetag::serde(name = "merge_into_operation_meta")]
impl BlockMetaInfo for MergeIntoOperation {
#[typetag::serde(name = "replace_into_operation_meta")]
impl BlockMetaInfo for ReplaceIntoOperation {
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
Self::downcast_ref_from(info).is_some_and(|other| self == other)
}
Expand All @@ -54,16 +48,16 @@ impl BlockMetaInfo for MergeIntoOperation {
}
}

impl TryFrom<DataBlock> for MergeIntoOperation {
impl TryFrom<DataBlock> for ReplaceIntoOperation {
type Error = ErrorCode;

fn try_from(value: DataBlock) -> Result<Self, Self::Error> {
let meta = value.get_owned_meta().ok_or_else(|| {
ErrorCode::Internal(
"convert MergeIntoOperation from data block failed, no block meta found",
"convert ReplaceIntoOperation from data block failed, no block meta found",
)
})?;
MergeIntoOperation::downcast_from(meta).ok_or_else(|| {
ReplaceIntoOperation::downcast_from(meta).ok_or_else(|| {
ErrorCode::Internal(
"downcast block meta to MutationIntoOperation failed, type mismatch",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

// Post-rename module wiring: the diff residue kept both old
// (`merge_into_mutator`, `mutator_replace_into`) and new module names;
// only the renamed modules exist after this change.
mod column_hash;
mod deletion_accumulator;
mod replace_into_mutator;
mod replace_into_operation_agg;

pub use column_hash::row_hash_of_columns;
pub use deletion_accumulator::BlockDeletionKeys;
pub use deletion_accumulator::DeletionAccumulator;
pub use replace_into_mutator::ReplaceIntoMutator;
pub use replace_into_operation_agg::ReplaceIntoOperationAggregator;
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
use log::info;

use crate::operations::replace_into::meta::DeletionByColumn;
use crate::operations::replace_into::meta::MergeIntoOperation;
use crate::operations::replace_into::meta::ReplaceIntoOperation;
use crate::operations::replace_into::meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns;
use crate::operations::replace_into::mutator::column_hash::RowScalarValue;

// Replace is somehow a simplified merge_into, which
// - do insertion for "matched" branch
// - update for "not-matched" branch (by sending MergeIntoOperation to downstream)
// - update for "not-matched" branch (by sending ReplaceIntoOperation to downstream)
pub struct ReplaceIntoMutator {
on_conflict_fields: Vec<OnConflictField>,
table_range_index: HashMap<ColumnId, ColumnStatistics>,
Expand Down Expand Up @@ -100,7 +100,7 @@ enum ColumnHash {
}

impl ReplaceIntoMutator {
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<ReplaceIntoOperation> {
// pruning rows by using table level range index
// rows that definitely have no conflict will be removed
metrics_inc_replace_original_row_number(data_block.num_rows() as u64);
Expand All @@ -111,10 +111,10 @@ impl ReplaceIntoMutator {

if row_number_after_pruning == 0 {
info!("(replace-into) all rows are append-only");
return Ok(MergeIntoOperation::None);
return Ok(ReplaceIntoOperation::None);
}

let merge_into_operation = if let Some(partitioner) = &self.partitioner {
let replace_into_operation = if let Some(partitioner) = &self.partitioner {
// if table has cluster keys; we partition the input data block by left most column of cluster keys
let partitions = partitioner.partition(data_block)?;
metrics_inc_replace_partition_number(partitions.len() as u64);
Expand All @@ -137,12 +137,12 @@ impl ReplaceIntoMutator {
}
})
.collect();
MergeIntoOperation::Delete(vs)
ReplaceIntoOperation::Delete(vs)
} else {
// otherwise, we just build a single delete action
self.build_merge_into_operation(&data_block_may_have_conflicts)?
self.build_replace_into_operation(&data_block_may_have_conflicts)?
};
Ok(merge_into_operation)
Ok(replace_into_operation)
}

// filter out rows that definitely have no conflict, by using table level range index
Expand Down Expand Up @@ -171,7 +171,10 @@ impl ReplaceIntoMutator {
data_block.clone().filter_with_bitmap(&bitmap)
}

fn build_merge_into_operation(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
fn build_replace_into_operation(
&mut self,
data_block: &DataBlock,
) -> Result<ReplaceIntoOperation> {
let num_rows = data_block.num_rows();
let column_values = on_conflict_key_column_values(&self.on_conflict_fields, data_block);

Expand All @@ -183,7 +186,7 @@ impl ReplaceIntoMutator {
key_hashes,
bloom_hashes: vec![],
};
Ok(MergeIntoOperation::Delete(vec![delete_action]))
Ok(ReplaceIntoOperation::Delete(vec![delete_action]))
}
ColumnHash::Conflict(conflict_row_idx) => {
let conflict_description = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::operations::mutation::BlockIndex;
use crate::operations::mutation::SegmentIndex;
use crate::operations::read_block;
use crate::operations::replace_into::meta::DeletionByColumn;
use crate::operations::replace_into::meta::MergeIntoOperation;
use crate::operations::replace_into::meta::ReplaceIntoOperation;
use crate::operations::replace_into::meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::row_hash_of_columns;
use crate::operations::replace_into::mutator::DeletionAccumulator;
Expand Down Expand Up @@ -103,12 +103,12 @@ struct AggregationContext {
}

// Apply MergeIntoOperations to segments
pub struct MergeIntoOperationAggregator {
pub struct ReplaceIntoOperationAggregator {
deletion_accumulator: DeletionAccumulator,
aggregation_ctx: Arc<AggregationContext>,
}

impl MergeIntoOperationAggregator {
impl ReplaceIntoOperationAggregator {
#[allow(clippy::too_many_arguments)] // TODO fix this
pub fn try_create(
ctx: Arc<dyn TableContext>,
Expand Down Expand Up @@ -215,15 +215,15 @@ impl MergeIntoOperationAggregator {
}

// aggregate mutations (currently, deletion only)
impl MergeIntoOperationAggregator {
impl ReplaceIntoOperationAggregator {
#[async_backtrace::framed]
pub async fn accumulate(&mut self, merge_into_operation: MergeIntoOperation) -> Result<()> {
pub async fn accumulate(&mut self, replace_into_operation: ReplaceIntoOperation) -> Result<()> {
let aggregation_ctx = &self.aggregation_ctx;
metrics_inc_replace_number_accumulated_merge_action();

let start = Instant::now();
match merge_into_operation {
MergeIntoOperation::Delete(partitions) => {
match replace_into_operation {
ReplaceIntoOperation::Delete(partitions) => {
for (segment_index, (path, ver)) in &aggregation_ctx.segment_locations {
// segment level
let load_param = LoadParams {
Expand Down Expand Up @@ -279,7 +279,7 @@ impl MergeIntoOperationAggregator {
}
}
}
MergeIntoOperation::None => {}
ReplaceIntoOperation::None => {}
}

metrics_inc_replace_accumulated_merge_action_time_ms(start.elapsed().as_millis() as u64);
Expand All @@ -288,7 +288,7 @@ impl MergeIntoOperationAggregator {
}

// apply the mutations and generate mutation log
impl MergeIntoOperationAggregator {
impl ReplaceIntoOperationAggregator {
#[async_backtrace::framed]
pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
metrics_inc_replace_number_apply_deletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mod processor_broadcast;
mod processor_replace_into;
mod processor_unbranched_replace_into;
mod transform_merge_into_mutation_aggregator;
mod transform_replace_into_mutation_aggregator;

pub use processor_broadcast::BroadcastProcessor;
pub use processor_replace_into::ReplaceIntoProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ pub struct ReplaceIntoProcessor {

// stage data blocks
input_port: Arc<InputPort>,
output_port_merge_into_action: Arc<OutputPort>,
output_port_replace_into_action: Arc<OutputPort>,
output_port_append_data: Arc<OutputPort>,

input_data: Option<DataBlock>,
output_data_merge_into_action: Option<DataBlock>,
output_data_replace_into_action: Option<DataBlock>,
output_data_append: Option<DataBlock>,

target_table_empty: bool,
Expand Down Expand Up @@ -83,16 +83,16 @@ impl ReplaceIntoProcessor {
table_range_idx,
)?;
let input_port = InputPort::create();
let output_port_merge_into_action = OutputPort::create();
let output_port_replace_into_action = OutputPort::create();
let output_port_append_data = OutputPort::create();

Ok(Self {
replace_into_mutator,
input_port,
output_port_merge_into_action,
output_port_replace_into_action,
output_port_append_data,
input_data: None,
output_data_merge_into_action: None,
output_data_replace_into_action: None,
output_data_append: None,
target_table_empty,
delete_when,
Expand All @@ -109,12 +109,12 @@ impl ReplaceIntoProcessor {
#[allow(dead_code)]
pub fn into_pipe_item(self) -> PipeItem {
let input = self.input_port.clone();
let output_port_merge_into_action = self.output_port_merge_into_action.clone();
let output_port_replace_into_action = self.output_port_replace_into_action.clone();
let output_port_append_data = self.output_port_append_data.clone();
let processor_ptr = ProcessorPtr::create(Box::new(self));
PipeItem::create(processor_ptr, vec![input], vec![
output_port_append_data,
output_port_merge_into_action,
output_port_replace_into_action,
])
}
}
Expand All @@ -131,10 +131,10 @@ impl Processor for ReplaceIntoProcessor {
fn event(&mut self) -> Result<Event> {
let finished = self.input_port.is_finished()
&& self.output_data_append.is_none()
&& self.output_data_merge_into_action.is_none();
&& self.output_data_replace_into_action.is_none();

if finished {
self.output_port_merge_into_action.finish();
self.output_port_replace_into_action.finish();
self.output_port_append_data.finish();
return Ok(Event::Finished);
}
Expand All @@ -147,9 +147,9 @@ impl Processor for ReplaceIntoProcessor {
}
}

if self.output_port_merge_into_action.can_push() {
if let Some(data) = self.output_data_merge_into_action.take() {
self.output_port_merge_into_action.push_data(Ok(data));
if self.output_port_replace_into_action.can_push() {
if let Some(data) = self.output_data_replace_into_action.take() {
self.output_port_replace_into_action.push_data(Ok(data));
pushed_something = true;
}
}
Expand All @@ -162,7 +162,8 @@ impl Processor for ReplaceIntoProcessor {
}

if self.input_port.has_data() {
if self.output_data_append.is_none() && self.output_data_merge_into_action.is_none()
if self.output_data_append.is_none()
&& self.output_data_replace_into_action.is_none()
{
// no pending data (being sent to down streams)
self.input_data = Some(self.input_port.pull_data().unwrap()?);
Expand Down Expand Up @@ -207,12 +208,12 @@ impl Processor for ReplaceIntoProcessor {
.collect::<HashSet<_>>();
data_block = data_block.project(&projections);
};
let merge_into_action = self.replace_into_mutator.process_input_block(&data_block)?;
let replace_into_action = self.replace_into_mutator.process_input_block(&data_block)?;
metrics_inc_replace_process_input_block_time_ms(start.elapsed().as_millis() as u64);
metrics_inc_replace_block_number_input(1);
if !self.target_table_empty {
self.output_data_merge_into_action =
Some(DataBlock::empty_with_meta(Box::new(merge_into_action)));
self.output_data_replace_into_action =
Some(DataBlock::empty_with_meta(Box::new(replace_into_action)));
}

if all_delete {
Expand Down
Loading
Loading