Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use store_api::metric_engine_consts::{
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
use store_api::storage::RegionId;

use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
Expand Down Expand Up @@ -363,6 +363,12 @@ impl MetricEngineInner {
column: DATA_SCHEMA_TSID_COLUMN_NAME,
}
);
ensure!(
!name_to_index.contains_key(PRIMARY_KEY_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: PRIMARY_KEY_COLUMN_NAME,
}
);
Comment thread
WenyXu marked this conversation as resolved.
Outdated

// check if required table option is present
ensure!(
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl MetricEngineState {
physical_region_id: RegionId,
logical_region_id: RegionId,
) {
let physical_region_id: RegionId = to_data_region_id(physical_region_id);
let physical_region_id = to_data_region_id(physical_region_id);
self.physical_regions
.get_mut(&physical_region_id)
.unwrap()
Expand Down
18 changes: 7 additions & 11 deletions src/metric-engine/src/row_modifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{
DATA_SCHEMA_ENCODED_PRIMARY_KEY_COLUMN_NAME, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
DATA_SCHEMA_TSID_COLUMN_NAME,
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::consts::{ReservedColumnId, PRIMARY_KEY_COLUMN_NAME};
use store_api::storage::{ColumnId, TableId};

use crate::error::{EncodePrimaryKeySnafu, Result};
Expand All @@ -37,7 +36,7 @@ const TSID_HASH_SEED: u32 = 846793005;
/// A row modifier modifies [`Rows`].
///
/// - For [`PrimaryKeyEncoding::Sparse`] encoding,
/// it replaces the primary key columns with the encoded primary key column(`__encoded_primary_key`).
/// it replaces the primary key columns with the encoded primary key column(`__primary_key`).
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
Expand Down Expand Up @@ -66,7 +65,7 @@ impl RowModifier {
}

/// Modifies rows with sparse primary key encoding.
/// It replaces the primary key columns with the encoded primary key column(`__encoded_primary_key`).
/// It replaces the primary key columns with the encoded primary key column(`__primary_key`).
fn modify_rows_sparse(&self, mut iter: RowsIter, table_id: TableId) -> Result<Rows> {
let num_column = iter.rows.schema.len();
let num_primary_key_column = iter.index.num_primary_key_column;
Expand All @@ -77,7 +76,6 @@ impl RowModifier {
for mut iter in iter.iter_mut() {
let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
let mut values = Vec::with_capacity(num_output_column);
// TODO(weny): reserve the buffer.
buffer.clear();
let internal_columns = [
(
Expand Down Expand Up @@ -105,7 +103,7 @@ impl RowModifier {
// Update the schema
let mut schema = Vec::with_capacity(num_output_column);
schema.push(ColumnSchema {
column_name: DATA_SCHEMA_ENCODED_PRIMARY_KEY_COLUMN_NAME.to_string(),
column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
Expand Down Expand Up @@ -149,9 +147,9 @@ impl RowModifier {
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
for (name, value) in iter.primary_keys_with_name() {
name.hash(&mut hasher);
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
name.hash(&mut hasher);
string.hash(&mut hasher);
}
}
Expand Down Expand Up @@ -309,8 +307,6 @@ impl RowIter<'_> {
}
}

// src/metric-engine/src/encoder.rs

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down Expand Up @@ -379,7 +375,7 @@ mod tests {

fn expected_sparse_schema() -> Vec<ColumnSchema> {
vec![ColumnSchema {
column_name: DATA_SCHEMA_ENCODED_PRIMARY_KEY_COLUMN_NAME.to_string(),
column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Tag as _,
datatype_extension: None,
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/row_converter/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SparsePrimaryKeyCodecInner {
label_field: SortField,
// Columns in primary key
//
// None means all unknown columns is primary key(label field).
// None means all unknown columns is primary key(`Self::label_field`).
columns: Option<HashSet<ColumnId>>,
}

Expand Down
1 change: 0 additions & 1 deletion src/store-api/src/metric_engine_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2;
/// Column name of internal column `__metric` that stores the original metric name
pub const DATA_SCHEMA_TABLE_ID_COLUMN_NAME: &str = "__table_id";
pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid";
pub const DATA_SCHEMA_ENCODED_PRIMARY_KEY_COLUMN_NAME: &str = "__encoded_primary_key";

pub const METADATA_REGION_SUBDIR: &str = "metadata";
pub const DATA_REGION_SUBDIR: &str = "data";
Expand Down