Skip to content

chore(backport): Backport replace into refactoring to v1.2.636 #17408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: release/v1.2.636-rc8.3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ ignore = [
"RUSTSEC-2024-0421",
# gix-worktree-state nonexclusive checkout sets executable files world-writable
"RUSTSEC-2025-0001",
# `fast-float`: Segmentation fault due to lack of bound check
"RUSTSEC-2025-0003",
# ssl::select_next_proto use after free
"RUSTSEC-2025-0004",
]
11 changes: 11 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,15 @@ pub struct CacheConfig {
)]
pub block_meta_count: u64,

/// Max number of **segments** whose block metas will all be cached.
/// Note that a segment may contain multiple block metadata entries.
#[clap(
long = "cache-segment-block-metas-count",
value_name = "VALUE",
default_value = "0"
)]
pub segment_block_metas_count: u64,

/// Max number of cached table statistic meta
#[clap(
long = "cache-table-meta-statistic-count",
Expand Down Expand Up @@ -2999,6 +3008,7 @@ mod cache_config_converters {
table_meta_snapshot_count: value.table_meta_snapshot_count,
table_meta_segment_bytes: value.table_meta_segment_bytes,
block_meta_count: value.block_meta_count,
segment_block_metas_count: value.segment_block_metas_count,
table_meta_statistic_count: value.table_meta_statistic_count,
enable_table_index_bloom: value.enable_table_bloom_index_cache,
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
Expand Down Expand Up @@ -3043,6 +3053,7 @@ mod cache_config_converters {
table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes,
table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio,
table_meta_segment_count: None,
segment_block_metas_count: value.segment_block_metas_count,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ pub struct CacheConfig {
/// Max number of cached table block meta
pub block_meta_count: u64,

/// Max number of **segments** whose block metas will all be cached.
/// Note that a segment may contain multiple block metadata entries.
pub segment_block_metas_count: u64,

/// Max number of cached table segment
pub table_meta_statistic_count: u64,

Expand Down Expand Up @@ -683,6 +687,7 @@ impl Default for CacheConfig {
table_meta_snapshot_count: 256,
table_meta_segment_bytes: 1073741824,
block_meta_count: 0,
segment_block_metas_count: 0,
table_meta_statistic_count: 256,
enable_table_index_bloom: true,
table_bloom_index_meta_count: 3000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' |
| 'cache' | 'inverted_index_filter_size' | '2147483648' | '' |
| 'cache' | 'inverted_index_meta_count' | '3000' | '' |
| 'cache' | 'segment_block_metas_count' | '0' | '' |
| 'cache' | 'table_bloom_index_filter_count' | '0' | '' |
| 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' |
| 'cache' | 'table_bloom_index_meta_count' | '3000' | '' |
Expand Down
23 changes: 20 additions & 3 deletions src/query/storages/common/cache/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ use crate::InMemoryLruCache;
/// In memory object cache of SegmentInfo
pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;

pub type BlockMetaCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
/// In-memory cache holding, per segment, all of that segment's block metadata.
///
/// Note that this cache may be memory-intensive: each cached item contains
/// ALL the BlockMeta entries of one segment, and for a well-compacted
/// segment that can be on the order of 1000–2000 entries.
pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;

/// In-memory cache of individual BlockMeta.
pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;

/// In memory object cache of TableSnapshot
pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
Expand Down Expand Up @@ -95,9 +103,9 @@ impl CachedObject<TableSnapshot> for TableSnapshot {
}

impl CachedObject<Vec<Arc<BlockMeta>>> for Vec<Arc<BlockMeta>> {
type Cache = BlockMetaCache;
type Cache = SegmentBlockMetasCache;
fn cache() -> Option<Self::Cache> {
CacheManager::instance().get_block_meta_cache()
CacheManager::instance().get_segment_block_metas_cache()
}
}

Expand Down Expand Up @@ -187,6 +195,15 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
}
}

impl From<BlockMeta> for CacheValue<BlockMeta> {
fn from(value: BlockMeta) -> Self {
CacheValue {
inner: Arc::new(value),
mem_bytes: 0,
}
}
}

impl From<TableSnapshot> for CacheValue<TableSnapshot> {
fn from(value: TableSnapshot) -> Self {
CacheValue {
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/common/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use cache::Unit;
pub use caches::BlockMetaCache;
pub use caches::CacheValue;
pub use caches::CachedObject;
pub use caches::SegmentBlockMetasCache;
pub use caches::SizedColumnArray;
pub use manager::CacheManager;
pub use providers::DiskCacheError;
Expand Down
21 changes: 20 additions & 1 deletion src/query/storages/common/cache/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::caches::FileMetaDataCache;
use crate::caches::InvertedIndexFileCache;
use crate::caches::InvertedIndexMetaCache;
use crate::caches::PrunePartitionsCache;
use crate::caches::SegmentBlockMetasCache;
use crate::caches::TableSnapshotCache;
use crate::caches::TableSnapshotStatisticCache;
use crate::InMemoryLruCache;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub struct CacheManager {
parquet_meta_data_cache: CacheSlot<FileMetaDataCache>,
table_data_cache: CacheSlot<TableDataCache>,
in_memory_table_data_cache: CacheSlot<ColumnArrayCache>,
segment_block_metas_cache: CacheSlot<SegmentBlockMetasCache>,
block_meta_cache: CacheSlot<BlockMetaCache>,
}

Expand Down Expand Up @@ -151,6 +153,7 @@ impl CacheManager {
table_statistic_cache: CacheSlot::new(None),
table_data_cache,
in_memory_table_data_cache,
segment_block_metas_cache: CacheSlot::new(None),
block_meta_cache: CacheSlot::new(None),
}));
} else {
Expand Down Expand Up @@ -201,8 +204,14 @@ impl CacheManager {
DEFAULT_PARQUET_META_DATA_CACHE_ITEMS,
);

let segment_block_metas_cache = Self::new_items_cache_slot(
MEMORY_CACHE_SEGMENT_BLOCK_METAS,
config.block_meta_count as usize,
);

let block_meta_cache = Self::new_items_cache_slot(
MEMORY_CACHE_BLOCK_META,
// TODO: switch to a dedicated config knob for this cache; it currently
// reuses `block_meta_count` (as does `segment_block_metas_cache` above,
// even though `segment_block_metas_count` was added to the config).
config.block_meta_count as usize,
);

Expand All @@ -217,8 +226,9 @@ impl CacheManager {
table_statistic_cache,
table_data_cache,
in_memory_table_data_cache,
block_meta_cache,
segment_block_metas_cache,
parquet_meta_data_cache,
block_meta_cache,
}));
}

Expand Down Expand Up @@ -270,6 +280,9 @@ impl CacheManager {
MEMORY_CACHE_TABLE_SNAPSHOT => {
Self::set_items_capacity(&self.table_snapshot_cache, new_capacity, name);
}
MEMORY_CACHE_SEGMENT_BLOCK_METAS => {
Self::set_items_capacity(&self.segment_block_metas_cache, new_capacity, name);
}
MEMORY_CACHE_BLOCK_META => {
Self::set_items_capacity(&self.block_meta_cache, new_capacity, name);
}
Expand Down Expand Up @@ -311,6 +324,10 @@ impl CacheManager {
}
}

pub fn get_segment_block_metas_cache(&self) -> Option<SegmentBlockMetasCache> {
self.segment_block_metas_cache.get()
}

pub fn get_block_meta_cache(&self) -> Option<BlockMetaCache> {
self.block_meta_cache.get()
}
Expand Down Expand Up @@ -426,4 +443,6 @@ const MEMORY_CACHE_BLOOM_INDEX_FILTER: &str = "memory_cache_bloom_index_filter";
const MEMORY_CACHE_COMPACT_SEGMENT_INFO: &str = "memory_cache_compact_segment_info";
const MEMORY_CACHE_TABLE_STATISTICS: &str = "memory_cache_table_statistics";
const MEMORY_CACHE_TABLE_SNAPSHOT: &str = "memory_cache_table_snapshot";
const MEMORY_CACHE_SEGMENT_BLOCK_METAS: &str = "memory_cache_segment_block_metas";

const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta";
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use rand::prelude::SliceRandom;
use crate::io::BlockBuilder;
use crate::io::ReadSettings;
use crate::operations::mutation::SegmentIndex;
use crate::operations::replace_into::MergeIntoOperationAggregator;
use crate::operations::replace_into::ReplaceIntoOperationAggregator;
use crate::FuseTable;

impl FuseTable {
Expand Down Expand Up @@ -102,7 +102,7 @@ impl FuseTable {
let read_settings = ReadSettings::from_ctx(&ctx)?;
let mut items = Vec::with_capacity(num_partition);
for chunk_of_segment_locations in chunks {
let item = MergeIntoOperationAggregator::try_create(
let item = ReplaceIntoOperationAggregator::try_create(
ctx.clone(),
on_conflicts.clone(),
bloom_filter_column_indexes.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod merge_into_operation_meta;
mod replace_into_operation_meta;

pub use merge_into_operation_meta::*;
pub use replace_into_operation_meta::*;
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@ use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;

// This mod need to be refactored, since it not longer aiming to be
// used in the implementation of `MERGE INTO` statement in the future.
//
// unfortunately, distributed `replace-into` is being implemented in parallel,
// to avoid the potential heavy merge conflicts, the refactoring is postponed.

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum MergeIntoOperation {
pub enum ReplaceIntoOperation {
Delete(Vec<DeletionByColumn>),
None,
}
Expand All @@ -43,8 +37,8 @@ pub struct DeletionByColumn {
pub bloom_hashes: Vec<RowBloomHashes>,
}

#[typetag::serde(name = "merge_into_operation_meta")]
impl BlockMetaInfo for MergeIntoOperation {
#[typetag::serde(name = "replace_into_operation_meta")]
impl BlockMetaInfo for ReplaceIntoOperation {
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
Self::downcast_ref_from(info).is_some_and(|other| self == other)
}
Expand All @@ -54,16 +48,16 @@ impl BlockMetaInfo for MergeIntoOperation {
}
}

impl TryFrom<DataBlock> for MergeIntoOperation {
impl TryFrom<DataBlock> for ReplaceIntoOperation {
type Error = ErrorCode;

fn try_from(value: DataBlock) -> Result<Self, Self::Error> {
let meta = value.get_owned_meta().ok_or_else(|| {
ErrorCode::Internal(
"convert MergeIntoOperation from data block failed, no block meta found",
"convert ReplaceIntoOperation from data block failed, no block meta found",
)
})?;
MergeIntoOperation::downcast_from(meta).ok_or_else(|| {
ReplaceIntoOperation::downcast_from(meta).ok_or_else(|| {
ErrorCode::Internal(
"downcast block meta to MutationIntoOperation failed, type mismatch",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

mod column_hash;
mod deletion_accumulator;
mod merge_into_mutator;
mod mutator_replace_into;
mod replace_into_mutator;
mod replace_into_operation_agg;

pub use column_hash::row_hash_of_columns;
pub use deletion_accumulator::BlockDeletionKeys;
pub use deletion_accumulator::DeletionAccumulator;
pub use merge_into_mutator::MergeIntoOperationAggregator;
pub use mutator_replace_into::ReplaceIntoMutator;
pub use replace_into_mutator::ReplaceIntoMutator;
pub use replace_into_operation_agg::ReplaceIntoOperationAggregator;
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
use log::info;

use crate::operations::replace_into::meta::DeletionByColumn;
use crate::operations::replace_into::meta::MergeIntoOperation;
use crate::operations::replace_into::meta::ReplaceIntoOperation;
use crate::operations::replace_into::meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns;
use crate::operations::replace_into::mutator::column_hash::RowScalarValue;

// Replace is somehow a simplified merge_into, which
// - do insertion for "matched" branch
// - update for "not-matched" branch (by sending MergeIntoOperation to downstream)
// - update for "not-matched" branch (by sending ReplaceIntoOperation to downstream)
pub struct ReplaceIntoMutator {
on_conflict_fields: Vec<OnConflictField>,
table_range_index: HashMap<ColumnId, ColumnStatistics>,
Expand Down Expand Up @@ -100,7 +100,7 @@ enum ColumnHash {
}

impl ReplaceIntoMutator {
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result<ReplaceIntoOperation> {
// pruning rows by using table level range index
// rows that definitely have no conflict will be removed
metrics_inc_replace_original_row_number(data_block.num_rows() as u64);
Expand All @@ -111,10 +111,10 @@ impl ReplaceIntoMutator {

if row_number_after_pruning == 0 {
info!("(replace-into) all rows are append-only");
return Ok(MergeIntoOperation::None);
return Ok(ReplaceIntoOperation::None);
}

let merge_into_operation = if let Some(partitioner) = &self.partitioner {
let replace_into_operation = if let Some(partitioner) = &self.partitioner {
// if table has cluster keys; we partition the input data block by left most column of cluster keys
let partitions = partitioner.partition(data_block)?;
metrics_inc_replace_partition_number(partitions.len() as u64);
Expand All @@ -137,12 +137,12 @@ impl ReplaceIntoMutator {
}
})
.collect();
MergeIntoOperation::Delete(vs)
ReplaceIntoOperation::Delete(vs)
} else {
// otherwise, we just build a single delete action
self.build_merge_into_operation(&data_block_may_have_conflicts)?
self.build_replace_into_operation(&data_block_may_have_conflicts)?
};
Ok(merge_into_operation)
Ok(replace_into_operation)
}

// filter out rows that definitely have no conflict, by using table level range index
Expand Down Expand Up @@ -171,7 +171,10 @@ impl ReplaceIntoMutator {
data_block.clone().filter_with_bitmap(&bitmap)
}

fn build_merge_into_operation(&mut self, data_block: &DataBlock) -> Result<MergeIntoOperation> {
fn build_replace_into_operation(
&mut self,
data_block: &DataBlock,
) -> Result<ReplaceIntoOperation> {
let num_rows = data_block.num_rows();
let column_values = on_conflict_key_column_values(&self.on_conflict_fields, data_block);

Expand All @@ -183,7 +186,7 @@ impl ReplaceIntoMutator {
key_hashes,
bloom_hashes: vec![],
};
Ok(MergeIntoOperation::Delete(vec![delete_action]))
Ok(ReplaceIntoOperation::Delete(vec![delete_action]))
}
ColumnHash::Conflict(conflict_row_idx) => {
let conflict_description = {
Expand Down
Loading