Skip to content

Commit 9af9c02

Browse files
authored
feat: create table procedure for metric engine, part 1 (#2943)
* implementation Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * initialize Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * remove empty file Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * apply review sugg Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
1 parent 4383a69 commit 9af9c02

19 files changed

Lines changed: 158 additions & 48 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ frontend = { path = "src/frontend" }
169169
log-store = { path = "src/log-store" }
170170
meta-client = { path = "src/meta-client" }
171171
meta-srv = { path = "src/meta-srv" }
172+
metric-engine = { path = "src/metric-engine" }
172173
mito2 = { path = "src/mito2" }
173174
object-store = { path = "src/object-store" }
174175
operator = { path = "src/operator" }

src/common/catalog/src/consts.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub const INFORMATION_SCHEMA_COLUMN_STATISTICS_TABLE_ID: u32 = 7;
4545
4646
pub const MITO_ENGINE: &str = "mito";
4747
pub const MITO2_ENGINE: &str = "mito2";
48+
pub const METRIC_ENGINE: &str = "metric";
4849

4950
pub fn default_engine() -> &'static str {
5051
MITO_ENGINE

src/common/meta/src/ddl/create_table.rs

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ use api::v1::region::region_request::Body as PbRegionRequest;
1616
use api::v1::region::{
1717
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
1818
};
19-
use api::v1::{ColumnDef, SemanticType};
19+
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
2020
use async_trait::async_trait;
21+
use common_catalog::consts::METRIC_ENGINE;
2122
use common_error::ext::BoxedError;
2223
use common_procedure::error::{
2324
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -28,14 +29,15 @@ use common_telemetry::tracing_context::TracingContext;
2829
use futures::future::join_all;
2930
use serde::{Deserialize, Serialize};
3031
use snafu::{ensure, OptionExt, ResultExt};
32+
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
3133
use store_api::storage::RegionId;
3234
use strum::AsRefStr;
3335
use table::engine::TableReference;
3436
use table::metadata::{RawTableInfo, TableId};
3537

3638
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
3739
use crate::ddl::DdlContext;
38-
use crate::error::{self, Result};
40+
use crate::error::{self, Result, TableInfoNotFoundSnafu};
3941
use crate::key::table_name::TableNameKey;
4042
use crate::metrics;
4143
use crate::region_keeper::OperatingRegionGuard;
@@ -122,7 +124,7 @@ impl CreateTableProcedure {
122124
Ok(Status::executing(true))
123125
}
124126

125-
pub fn create_region_request_template(&self) -> Result<PbCreateRegionRequest> {
127+
pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
126128
let create_table_expr = &self.creator.data.task.create_table;
127129

128130
let column_defs = create_table_expr
@@ -172,14 +174,17 @@ impl CreateTableProcedure {
172174
})
173175
.collect::<Result<_>>()?;
174176

175-
Ok(PbCreateRegionRequest {
177+
let template = PbCreateRegionRequest {
176178
region_id: 0,
177179
engine: create_table_expr.engine.to_string(),
178180
column_defs,
179181
primary_key,
180182
path: String::new(),
181183
options: create_table_expr.table_options.clone(),
182-
})
184+
};
185+
186+
let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
187+
Ok(builder)
183188
}
184189

185190
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
@@ -194,7 +199,7 @@ impl CreateTableProcedure {
194199
let schema = &create_table_expr.schema_name;
195200
let storage_path = region_storage_path(catalog, schema);
196201

197-
let request_template = self.create_region_request_template()?;
202+
let mut request_builder = self.new_region_request_builder()?;
198203

199204
let leaders = find_leaders(region_routes);
200205
let mut create_region_tasks = Vec::with_capacity(leaders.len());
@@ -203,17 +208,20 @@ impl CreateTableProcedure {
203208
let requester = self.context.datanode_manager.datanode(&datanode).await;
204209

205210
let regions = find_leader_regions(region_routes, &datanode);
206-
let requests = regions
207-
.iter()
208-
.map(|region_number| {
209-
let region_id = RegionId::new(self.table_id(), *region_number);
210-
211-
let mut create_region_request = request_template.clone();
212-
create_region_request.region_id = region_id.as_u64();
213-
create_region_request.path = storage_path.clone();
214-
PbRegionRequest::Create(create_region_request)
215-
})
216-
.collect::<Vec<_>>();
211+
let mut requests = Vec::with_capacity(regions.len());
212+
for region_number in regions {
213+
let region_id = RegionId::new(self.table_id(), region_number);
214+
215+
let create_region_request = request_builder
216+
.build_one(
217+
&self.creator.data.task.create_table,
218+
region_id,
219+
storage_path.clone(),
220+
)
221+
.await?;
222+
223+
requests.push(PbRegionRequest::Create(create_region_request));
224+
}
217225

218226
for request in requests {
219227
let request = RegionRequest {
@@ -371,3 +379,82 @@ impl CreateTableData {
371379
self.task.table_ref()
372380
}
373381
}
382+
383+
/// Builder for [PbCreateRegionRequest].
384+
pub struct CreateRequestBuilder {
385+
context: DdlContext,
386+
template: PbCreateRegionRequest,
387+
/// Optional. Only for metric engine.
388+
physical_table_id: Option<TableId>,
389+
}
390+
391+
impl CreateRequestBuilder {
392+
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
393+
Self {
394+
context,
395+
template,
396+
physical_table_id: None,
397+
}
398+
}
399+
400+
pub fn template(&self) -> &PbCreateRegionRequest {
401+
&self.template
402+
}
403+
404+
async fn build_one(
405+
&mut self,
406+
create_expr: &CreateTableExpr,
407+
region_id: RegionId,
408+
storage_path: String,
409+
) -> Result<PbCreateRegionRequest> {
410+
let mut request = self.template.clone();
411+
412+
request.region_id = region_id.as_u64();
413+
request.path = storage_path;
414+
415+
if self.template.engine == METRIC_ENGINE {
416+
self.metric_engine_hook(create_expr, region_id, &mut request)
417+
.await?;
418+
}
419+
420+
Ok(request)
421+
}
422+
423+
async fn metric_engine_hook(
424+
&mut self,
425+
create_expr: &CreateTableExpr,
426+
region_id: RegionId,
427+
request: &mut PbCreateRegionRequest,
428+
) -> Result<()> {
429+
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
430+
let table_id = if let Some(table_id) = self.physical_table_id {
431+
table_id
432+
} else {
433+
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
434+
let table_name_key = TableNameKey::new(
435+
&create_expr.catalog_name,
436+
&create_expr.schema_name,
437+
physical_table_name,
438+
);
439+
let table_id = table_name_manager
440+
.get(table_name_key)
441+
.await?
442+
.context(TableInfoNotFoundSnafu {
443+
table_name: physical_table_name,
444+
})?
445+
.table_id();
446+
self.physical_table_id = Some(table_id);
447+
table_id
448+
};
449+
// Concat physical table's table id and corresponding region number to get
450+
// the physical region id.
451+
let physical_region_id = RegionId::new(table_id, region_id.region_number());
452+
request.options.insert(
453+
LOGICAL_TABLE_METADATA_KEY.to_string(),
454+
physical_region_id.as_u64().to_string(),
455+
);
456+
}
457+
458+
Ok(())
459+
}
460+
}

src/common/meta/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ pub enum Error {
4646
#[snafu(display("Invalid result with a txn response: {}", err_msg))]
4747
InvalidTxnResult { err_msg: String, location: Location },
4848

49+
#[snafu(display("Invalid engine type: {}", engine_type))]
50+
InvalidEngineType {
51+
engine_type: String,
52+
location: Location,
53+
},
54+
4955
#[snafu(display("Failed to connect to Etcd"))]
5056
ConnectEtcd {
5157
#[snafu(source)]
@@ -322,7 +328,9 @@ impl ErrorExt for Error {
322328
| RenameTable { .. }
323329
| Unsupported { .. } => StatusCode::Internal,
324330

325-
PrimaryKeyNotFound { .. } | &EmptyKey { .. } => StatusCode::InvalidArguments,
331+
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
332+
StatusCode::InvalidArguments
333+
}
326334

327335
TableNotFound { .. } => StatusCode::TableNotFound,
328336
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,

src/datanode/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ hyper = { version = "0.14", features = ["full"] }
4545
lazy_static.workspace = true
4646
log-store.workspace = true
4747
meta-client.workspace = true
48+
metric-engine.workspace = true
4849
mito2.workspace = true
4950
object-store.workspace = true
5051
pin-project = "1.0"

src/datanode/src/datanode.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use futures_util::future::try_join_all;
3434
use futures_util::StreamExt;
3535
use log_store::raft_engine::log_store::RaftEngineLogStore;
3636
use meta_client::client::MetaClient;
37+
use metric_engine::engine::MetricEngine;
3738
use mito2::engine::MitoEngine;
3839
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
3940
use object_store::util::normalize_dir;
@@ -471,12 +472,14 @@ impl DatanodeBuilder {
471472
for engine in &opts.region_engine {
472473
match engine {
473474
RegionEngineConfig::Mito(config) => {
474-
let engine: MitoEngine = MitoEngine::new(
475+
let mito_engine: MitoEngine = MitoEngine::new(
475476
config.clone(),
476477
log_store.clone(),
477478
object_store_manager.clone(),
478479
);
479-
engines.push(Arc::new(engine) as _);
480+
let metric_engine = MetricEngine::new(mito_engine.clone());
481+
engines.push(Arc::new(mito_engine) as _);
482+
engines.push(Arc::new(metric_engine) as _);
480483
}
481484
RegionEngineConfig::File(config) => {
482485
let engine = FileRegionEngine::new(

src/meta-srv/src/procedure/tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@ fn create_table_task() -> CreateTableTask {
9696
}
9797

9898
#[test]
99-
fn test_create_region_request_template() {
99+
fn test_region_request_builder() {
100100
let procedure = CreateTableProcedure::new(
101101
1,
102102
create_table_task(),
103103
test_data::new_region_routes(),
104104
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
105105
);
106106

107-
let template = procedure.create_region_request_template().unwrap();
107+
let template = procedure.new_region_request_builder().unwrap();
108108

109109
let expected = PbCreateRegionRequest {
110110
region_id: 0,
@@ -163,7 +163,7 @@ fn test_create_region_request_template() {
163163
path: String::new(),
164164
options: HashMap::new(),
165165
};
166-
assert_eq!(template, expected);
166+
assert_eq!(template.template(), &expected);
167167
}
168168

169169
async fn new_datanode_manager(

src/metric-engine/src/engine.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@ use common_query::Output;
2727
use common_recordbatch::SendableRecordBatchStream;
2828
use mito2::engine::MitoEngine;
2929
use store_api::metadata::RegionMetadataRef;
30+
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
3031
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
3132
use store_api::region_request::{AffectedRows, RegionRequest};
3233
use store_api::storage::{RegionId, ScanRequest};
3334
use tokio::sync::RwLock;
3435

3536
use self::state::MetricEngineState;
36-
use crate::consts::METRIC_ENGINE_NAME;
3737
use crate::data_region::DataRegion;
3838
use crate::metadata_region::MetadataRegion;
3939

40+
/// Fixed random state for generating tsid
41+
pub(crate) const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4);
42+
4043
#[cfg_attr(doc, aquamarine::aquamarine)]
4144
/// # Metric Engine
4245
///

src/metric-engine/src/engine/create.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ use mito2::engine::MITO_ENGINE_NAME;
2424
use object_store::util::join_dir;
2525
use snafu::{ensure, OptionExt, ResultExt};
2626
use store_api::metadata::ColumnMetadata;
27-
use store_api::region_engine::RegionEngine;
28-
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
29-
use store_api::storage::consts::ReservedColumnId;
30-
use store_api::storage::RegionId;
31-
32-
use crate::consts::{
27+
use store_api::metric_engine_consts::{
3328
DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
3429
LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR, METADATA_SCHEMA_KEY_COLUMN_INDEX,
3530
METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX,
3631
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
3732
METADATA_SCHEMA_VALUE_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
3833
};
34+
use store_api::region_engine::RegionEngine;
35+
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
36+
use store_api::storage::consts::ReservedColumnId;
37+
use store_api::storage::RegionId;
38+
3939
use crate::engine::MetricEngineInner;
4040
use crate::error::{
4141
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
@@ -412,8 +412,9 @@ impl MetricEngineInner {
412412

413413
#[cfg(test)]
414414
mod test {
415+
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
416+
415417
use super::*;
416-
use crate::consts::METRIC_ENGINE_NAME;
417418
use crate::engine::MetricEngine;
418419
use crate::test_util::TestEnv;
419420

0 commit comments

Comments
 (0)