From 4bbd34a4a9fb4fb87acc2de676d3a96cb5a442c1 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 16 Feb 2023 13:43:57 +0800 Subject: [PATCH 01/12] feat: Add table-procedures crate --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + src/table-procedures/Cargo.toml | 7 +++++++ src/table-procedures/src/create.rs | 15 +++++++++++++++ src/table-procedures/src/lib.rs | 17 +++++++++++++++++ 5 files changed, 47 insertions(+) create mode 100644 src/table-procedures/Cargo.toml create mode 100644 src/table-procedures/src/create.rs create mode 100644 src/table-procedures/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index aa2a385182ff..123d4735a0e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7473,6 +7473,13 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "table-procedures" +version = "0.1.0" +dependencies = [ + "common-procedure", +] + [[package]] name = "tagptr" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d1f730d0a246..ba976bd5b756 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "src/storage", "src/store-api", "src/table", + "src/table-procedures", "tests-integration", "tests/runner", ] diff --git a/src/table-procedures/Cargo.toml b/src/table-procedures/Cargo.toml new file mode 100644 index 000000000000..464ebdebee7b --- /dev/null +++ b/src/table-procedures/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "table-procedures" +version = "0.1.0" +edition = "2021" + +[dependencies] +common-procedure = { path = "../common/procedure" } diff --git a/src/table-procedures/src/create.rs b/src/table-procedures/src/create.rs new file mode 100644 index 000000000000..a9c98a1534fc --- /dev/null +++ b/src/table-procedures/src/create.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Procedure to create a table. diff --git a/src/table-procedures/src/lib.rs b/src/table-procedures/src/lib.rs new file mode 100644 index 000000000000..a936770d9cfe --- /dev/null +++ b/src/table-procedures/src/lib.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Procedures for table operations. + +mod create; From 6705802a45d083686b65efbebe508e395ffdfb93 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 16 Feb 2023 19:11:08 +0800 Subject: [PATCH 02/12] feat: Implement procedure to create table --- Cargo.lock | 9 + src/table-procedures/Cargo.toml | 9 + src/table-procedures/src/create.rs | 299 +++++++++++++++++++++++++++++ src/table-procedures/src/error.rs | 80 ++++++++ src/table-procedures/src/lib.rs | 3 + src/table/src/requests.rs | 1 + 6 files changed, 401 insertions(+) create mode 100644 src/table-procedures/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 123d4735a0e5..3034c9e09740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7477,7 +7477,16 @@ dependencies = [ name = "table-procedures" version = "0.1.0" dependencies = [ + "async-trait", + "catalog", + "common-error", "common-procedure", + "common-telemetry", + "datatypes", + "serde", + "serde_json", + "snafu", + "table", ] [[package]] diff --git a/src/table-procedures/Cargo.toml b/src/table-procedures/Cargo.toml index 464ebdebee7b..ab594c7356d3 100644 --- a/src/table-procedures/Cargo.toml +++ b/src/table-procedures/Cargo.toml @@ -4,4 +4,13 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait.workspace = true +catalog = { path = "../catalog" } +common-error = { path = "../common/error" } common-procedure = { path = "../common/procedure" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +serde.workspace = true +serde_json.workspace = true +snafu.workspace = true +table = { path = "../table" } diff --git a/src/table-procedures/src/create.rs b/src/table-procedures/src/create.rs index a9c98a1534fc..e7e3a69ed9a9 100644 --- a/src/table-procedures/src/create.rs +++ b/src/table-procedures/src/create.rs @@ -13,3 +13,302 @@ // limitations under the License. //! Procedure to create a table. + +use async_trait::async_trait; +use catalog::{CatalogManagerRef, RegisterTableRequest}; +use common_procedure::{ + Context, Error, LockKey, Procedure, ProcedureId, ProcedureManagerRef, ProcedureState, + ProcedureWithId, Result, Status, +}; +use common_telemetry::logging; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; +use table::requests::CreateTableRequest; + +use crate::error::{ + AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu, + SerializeProcedureSnafu, +}; + +/// Procedure to create a table. +pub struct CreateTableProcedure { + data: CreateTableData, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, +} + +#[async_trait] +impl Procedure for CreateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &Context) -> Result { + match self.data.state { + CreateTableState::Prepare => self.on_prepare(), + CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await, + CreateTableState::RegisterCatalog => self.on_register_catalog().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + // We lock the whole table. + let table_name = self.data.table_ref().to_string(); + LockKey::single(table_name) + } +} + +impl CreateTableProcedure { + const TYPE_NAME: &str = "table-procedures::CreateTable"; + + /// Returns a new [CreateTableProcedure]. + pub fn new( + request: CreateTableRequest, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, + ) -> CreateTableProcedure { + CreateTableProcedure { + data: CreateTableData { + state: CreateTableState::Prepare, + request, + sub_procedure_id: None, + }, + catalog_manager, + table_engine, + engine_procedure, + } + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub fn register_loader( + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + table_engine: TableEngineRef, + procedure_manager: ProcedureManagerRef, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json( + data, + catalog_manager.clone(), + table_engine.clone(), + engine_procedure.clone(), + ) + .map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json( + json: &str, + catalog_manager: CatalogManagerRef, + table_engine: TableEngineRef, + engine_procedure: TableEngineProcedureRef, + ) -> Result { + let data: CreateTableData = + serde_json::from_str(json).context(DeserializeProcedureSnafu)?; + + Ok(CreateTableProcedure { + data, + catalog_manager, + table_engine, + engine_procedure, + }) + } + + fn on_prepare(&mut self) -> Result { + // Check whether catalog and schema exist. + let catalog = self + .catalog_manager + .catalog(&self.data.request.catalog_name) + .context(AccessCatalogSnafu)? + .with_context(|| { + logging::error!( + "Failed to create table {}, catalog not found", + self.data.table_ref() + ); + CatalogNotFoundSnafu { + name: &self.data.request.catalog_name, + } + })?; + catalog + .schema(&self.data.request.schema_name) + .context(AccessCatalogSnafu)? + .with_context(|| { + logging::error!( + "Failed to create table {}, schema not found", + self.data.table_ref(), + ); + SchemaNotFoundSnafu { + name: &self.data.request.schema_name, + } + })?; + + self.data.state = CreateTableState::EngineCreateTable; + // Assign procedure id to the subprocedure. + self.data.sub_procedure_id = Some(ProcedureId::random()); + + Ok(Status::executing(true)) + } + + async fn on_engine_create_table(&mut self, ctx: &Context) -> Result { + // Safety: subprocedure id is always set in this state. + let sub_id = self.data.sub_procedure_id.unwrap(); + + // Check procedure state. + if let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? { + match sub_state { + ProcedureState::Running => Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }), + ProcedureState::Done => { + logging::info!( + "On engine create table {}, done, sub_id: {}", + self.data.request.table_name, + sub_id + ); + // The sub procedure is done, we can execute next step. + self.data.state = CreateTableState::RegisterCatalog; + Ok(Status::executing(true)) + } + ProcedureState::Failed => { + // If failed, try to create a new procedure to create table. + let engine_ctx = EngineContext::default(); + let procedure = self + .engine_procedure + .create_table_procedure(&engine_ctx, self.data.request.clone()) + .map_err(Error::external)?; + let sub_id = ProcedureId::random(); + // Store the procedure id. + self.data.sub_procedure_id = Some(sub_id); + + Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: sub_id, + procedure, + }], + persist: true, + }) + } + } + } else { + logging::info!( + "On engine create table {}, not found, sub_id: {}", + self.data.request.table_name, + sub_id + ); + + // If the sub procedure is not found, we create a new sub procedure with the same id. + let engine_ctx = EngineContext::default(); + let procedure = self + .engine_procedure + .create_table_procedure(&engine_ctx, self.data.request.clone()) + .map_err(Error::external)?; + Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: sub_id, + procedure, + }], + persist: true, + }) + } + } + + async fn on_register_catalog(&mut self) -> Result { + let catalog = self + .catalog_manager + .catalog(&self.data.request.catalog_name) + .context(AccessCatalogSnafu)? + .context(CatalogNotFoundSnafu { + name: &self.data.request.catalog_name, + })?; + let schema = catalog + .schema(&self.data.request.schema_name) + .context(AccessCatalogSnafu)? + .context(SchemaNotFoundSnafu { + name: &self.data.request.schema_name, + })?; + let table_exists = schema + .table(&self.data.request.table_name) + .map_err(Error::external)? + .is_some(); + if table_exists { + // Table already exists. + return Ok(Status::Done); + } + + let engine_ctx = EngineContext::default(); + let table_ref = self.data.table_ref(); + // Safety: The procedure owns the lock so the table should exist. + let table = self + .table_engine + .get_table(&engine_ctx, &table_ref) + .map_err(Error::external)? + .unwrap(); + + let register_req = RegisterTableRequest { + catalog: self.data.request.catalog_name.clone(), + schema: self.data.request.schema_name.clone(), + table_name: self.data.request.table_name.clone(), + table_id: self.data.request.id, + table, + }; + self.catalog_manager + .register_table(register_req) + .await + .map_err(Error::external)?; + + Ok(Status::Done) + } +} + +/// Represents each step while creating a table in the datanode. +#[derive(Debug, Serialize, Deserialize)] +enum CreateTableState { + /// Validate request and prepare to create table. + Prepare, + /// Create table in the table engine. + EngineCreateTable, + /// Register the table to the catalog. + RegisterCatalog, +} + +/// Serializable data of [CreateTableProcedure]. +#[derive(Debug, Serialize, Deserialize)] +struct CreateTableData { + /// Current state. + state: CreateTableState, + /// Request to create this table. + request: CreateTableRequest, + /// Id of the subprocedure to create this table in the engine. + /// + /// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable] + /// state. + sub_procedure_id: Option, +} + +impl CreateTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} diff --git a/src/table-procedures/src/error.rs b/src/table-procedures/src/error.rs new file mode 100644 index 000000000000..da30cf2dd9f5 --- /dev/null +++ b/src/table-procedures/src/error.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum Error { + #[snafu(display("Failed to serialize procedure to json, source: {}", source))] + SerializeProcedure { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to deserialize procedure from json, source: {}", source))] + DeserializeProcedure { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid raw schema, source: {}", source))] + InvalidRawSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to access catalog, source: {}", source))] + AccessCatalog { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + + #[snafu(display("Catalog {} not found", name))] + CatalogNotFound { name: String }, + + #[snafu(display("Schema {} not found", name))] + SchemaNotFound { name: String }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + SerializeProcedure { .. } | DeserializeProcedure { .. } => StatusCode::Internal, + InvalidRawSchema { source, .. } => source.status_code(), + AccessCatalog { source } => source.status_code(), + CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for common_procedure::Error { + fn from(e: Error) -> common_procedure::Error { + common_procedure::Error::external(e) + } +} diff --git a/src/table-procedures/src/lib.rs b/src/table-procedures/src/lib.rs index a936770d9cfe..1a57c5c6351a 100644 --- a/src/table-procedures/src/lib.rs +++ b/src/table-procedures/src/lib.rs @@ -15,3 +15,6 @@ //! Procedures for table operations. mod create; +pub mod error; + +pub use create::CreateTableProcedure; diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index ec77815187fd..fd00c97c81bc 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -43,6 +43,7 @@ pub struct CreateDatabaseRequest { pub create_if_not_exists: bool, } +// TODO(yingwen): Add serialize/deserialize test. /// Create table request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateTableRequest { From 66ab296045c3d179a1d139798682271bf5e831a1 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 21 Feb 2023 14:05:04 +0800 Subject: [PATCH 03/12] feat: Integrate procedure manager to datanode --- Cargo.lock | 1 + src/cmd/src/datanode.rs | 11 ++++++++- src/datanode/Cargo.toml | 1 + src/datanode/src/datanode.rs | 23 ++++++++++++++++++ src/datanode/src/error.rs | 7 ++++++ src/datanode/src/instance.rs | 37 ++++++++++++++++++++++++++--- src/datanode/src/mock.rs | 4 +++- src/datanode/src/sql.rs | 17 +++++++++++-- src/datanode/src/tests/test_util.rs | 8 ++++++- 9 files changed, 101 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3034c9e09740..213010568e39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,6 +2186,7 @@ dependencies = [ "common-error", "common-grpc", "common-grpc-expr", + "common-procedure", "common-query", "common-recordbatch", "common-runtime", diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 1dc71d3313c9..05e70d8b414a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -14,7 +14,9 @@ use clap::Parser; use common_telemetry::logging; -use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig}; +use datanode::datanode::{ + Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, +}; use meta_client::MetaClientOptions; use servers::Mode; use snafu::ResultExt; @@ -65,6 +67,8 @@ struct StartCommand { data_dir: Option, #[clap(long)] wal_dir: Option, + #[clap(long)] + procedure_dir: Option, } impl StartCommand { @@ -134,6 +138,11 @@ impl TryFrom for DatanodeOptions { if let Some(wal_dir) = cmd.wal_dir { opts.wal.dir = wal_dir; } + + if let Some(procedure_dir) = cmd.procedure_dir { + opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir)); + } + Ok(opts) } } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 355da9c04685..b990d7cc002f 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -21,6 +21,7 @@ common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-grpc-expr = { path = "../common/grpc-expr" } +common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index daa8822d5c00..19d5d8a735ae 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -144,6 +144,27 @@ impl From<&DatanodeOptions> for StorageEngineConfig { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ProcedureConfig { + /// Storage config for procedure manager. + pub store: ObjectStoreConfig, +} + +impl Default for ProcedureConfig { + fn default() -> ProcedureConfig { + ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string()) + } +} + +impl ProcedureConfig { + pub fn from_file_path(path: String) -> ProcedureConfig { + ProcedureConfig { + store: ObjectStoreConfig::File(FileConfig { data_dir: path }), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct DatanodeOptions { @@ -159,6 +180,7 @@ pub struct DatanodeOptions { pub wal: WalConfig, pub storage: ObjectStoreConfig, pub compaction: CompactionConfig, + pub procedure: Option, } impl Default for DatanodeOptions { @@ -176,6 +198,7 @@ impl Default for DatanodeOptions { wal: WalConfig::default(), storage: ObjectStoreConfig::default(), compaction: CompactionConfig::default(), + procedure: None, } } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 4e571159ae98..27a19bde64cc 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -394,6 +394,12 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Failed to recover procedure, source: {}", source))] + RecoverProcedure { + #[snafu(backtrace)] + source: common_procedure::error::Error, + }, } pub type Result = std::result::Result; @@ -470,6 +476,7 @@ impl ErrorExt for Error { CopyTable { source, .. } => source.status_code(), TableScanExec { source, .. } => source.status_code(), UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, + RecoverProcedure { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 7cfc10b3b388..10e9532c8dbd 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -21,6 +21,8 @@ use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::readable_size::ReadableSize; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::ProcedureManagerRef; use common_telemetry::logging::info; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; @@ -45,11 +47,11 @@ use table::table::TableIdProviderRef; use table::Table; use crate::datanode::{ - DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, + DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, }; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, - NewCatalogSnafu, OpenLogStoreSnafu, Result, + NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -105,6 +107,11 @@ impl Instance { object_store, )); + let procedure_manager = create_procedure_manager(&opts.procedure).await?; + if let Some(procedure_manager) = &procedure_manager { + table_engine.register_procedure_loaders(&**procedure_manager); + } + // create remote catalog manager let (catalog_manager, factory, table_id_provider) = match opts.mode { Mode::Standalone => { @@ -173,12 +180,23 @@ impl Instance { catalog_manager.clone(), )), }; + + // Recover procedures. + if let Some(procedure_manager) = &procedure_manager { + procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; + } + Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - table_engine, + table_engine.clone(), catalog_manager.clone(), query_engine.clone(), + table_engine, + procedure_manager, ), catalog_manager, script_executor, @@ -400,3 +418,16 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result, +) -> Result> { + let Some(procedure_config) = procedure_config else { + return Ok(None); + }; + + let object_store = new_object_store(&procedure_config.store).await?; + let manager_config = ManagerConfig { object_store }; + + Ok(Some(Arc::new(LocalManager::new(manager_config)))) +} diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index c54491f8009c..c083514bcaee 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -96,9 +96,11 @@ impl Instance { Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new( - table_engine, + table_engine.clone(), catalog_manager.clone(), query_engine.clone(), + table_engine, + None, ), catalog_manager, script_executor, diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 72b4342dee14..6651560cae9f 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -13,6 +13,7 @@ // limitations under the License. use catalog::CatalogManagerRef; +use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::error; use query::query_engine::QueryEngineRef; @@ -23,7 +24,7 @@ use sql::statements::delete::Delete; use sql::statements::describe::DescribeTable; use sql::statements::explain::Explain; use sql::statements::show::{ShowDatabases, ShowTables}; -use table::engine::{EngineContext, TableEngineRef, TableReference}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; use table::requests::*; use table::TableRef; @@ -57,6 +58,8 @@ pub struct SqlHandler { table_engine: TableEngineRef, catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: Option, } impl SqlHandler { @@ -64,11 +67,15 @@ impl SqlHandler { table_engine: TableEngineRef, catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: Option, ) -> Self { Self { table_engine, catalog_manager, query_engine, + engine_procedure, + procedure_manager, } } @@ -250,7 +257,13 @@ mod tests { let factory = QueryEngineFactory::new(catalog_list.clone()); let query_engine = factory.query_engine(); - let sql_handler = SqlHandler::new(table_engine, catalog_list.clone(), query_engine.clone()); + let sql_handler = SqlHandler::new( + table_engine.clone(), + catalog_list.clone(), + query_engine.clone(), + table_engine, + None, + ); let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() { QueryStatement::Sql(Statement::Insert(i)) => i, diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index a1d21b53dacd..71d085841b36 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -140,7 +140,13 @@ pub async fn create_mock_sql_handler() -> SqlHandler { let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let factory = QueryEngineFactory::new(catalog_list); - SqlHandler::new(mock_engine, catalog_manager, factory.query_engine()) + SqlHandler::new( + mock_engine.clone(), + catalog_manager, + factory.query_engine(), + mock_engine, + None, + ) } pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance { From f922fff465263001a89fcaf2f0e351fe16b0b4f4 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 21 Feb 2023 18:51:14 +0800 Subject: [PATCH 04/12] test: Test CreateTableProcedure --- src/table-procedures/Cargo.toml | 8 +++ src/table-procedures/src/create.rs | 85 ++++++++++++++++++++++++++- src/table-procedures/src/lib.rs | 2 + src/table-procedures/src/test_util.rs | 73 +++++++++++++++++++++++ 4 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 src/table-procedures/src/test_util.rs diff --git a/src/table-procedures/Cargo.toml b/src/table-procedures/Cargo.toml index ab594c7356d3..ba7c3ffe0f0e 100644 --- a/src/table-procedures/Cargo.toml +++ b/src/table-procedures/Cargo.toml @@ -14,3 +14,11 @@ serde.workspace = true serde_json.workspace = true snafu.workspace = true table = { path = "../table" } + +[dev-dependencies] +log-store = { path = "../log-store" } +mito = { path = "../mito" } +object-store = { path = "../object-store" } +storage = { path = "../storage" } +tokio.workspace = true +tempdir = "0.3" diff --git a/src/table-procedures/src/create.rs b/src/table-procedures/src/create.rs index e7e3a69ed9a9..c27d25948508 100644 --- a/src/table-procedures/src/create.rs +++ b/src/table-procedures/src/create.rs @@ -66,7 +66,7 @@ impl Procedure for CreateTableProcedure { } impl CreateTableProcedure { - const TYPE_NAME: &str = "table-procedures::CreateTable"; + const TYPE_NAME: &str = "table-procedures::CreateTableProcedure"; /// Returns a new [CreateTableProcedure]. pub fn new( @@ -312,3 +312,86 @@ impl CreateTableData { } } } + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, RawSchema}; + use table::engine::{EngineContext, TableEngine}; + + use super::*; + use crate::test_util::TestEnv; + + fn schema_for_test() -> RawSchema { + let column_schemas = vec![ + // Key + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + // Nullable value column: cpu + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + // Non-null value column: memory + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true), + ]; + + RawSchema::new(column_schemas) + } + + fn new_create_request(table_name: &str) -> CreateTableRequest { + CreateTableRequest { + id: 1, + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + desc: Some("a test table".to_string()), + schema: schema_for_test(), + region_numbers: vec![0, 1], + create_if_not_exists: true, + primary_key_indices: vec![0], + table_options: Default::default(), + } + } + + #[tokio::test] + async fn test_create_table_procedure() { + let TestEnv { + dir: _dir, + table_engine, + procedure_manager, + catalog_manager, + } = TestEnv::new("create"); + + let table_name = "test_create"; + let request = new_create_request(table_name); + let procedure = CreateTableProcedure::new( + request.clone(), + catalog_manager, + table_engine.clone(), + table_engine.clone(), + ); + + let table_ref = TableReference { + catalog: &request.catalog_name, + schema: &request.schema_name, + table: &request.table_name, + }; + let engine_ctx = EngineContext::default(); + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_none()); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); + watcher.changed().await.unwrap(); + + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_some()); + } +} diff --git a/src/table-procedures/src/lib.rs b/src/table-procedures/src/lib.rs index 1a57c5c6351a..fcfd60602468 100644 --- a/src/table-procedures/src/lib.rs +++ b/src/table-procedures/src/lib.rs @@ -16,5 +16,7 @@ mod create; pub mod error; +#[cfg(test)] +mod test_util; pub use create::CreateTableProcedure; diff --git a/src/table-procedures/src/test_util.rs b/src/table-procedures/src/test_util.rs new file mode 100644 index 000000000000..ab8a8669cfe5 --- /dev/null +++ b/src/table-procedures/src/test_util.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::local::MemoryCatalogManager; +use catalog::CatalogManagerRef; +use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::ProcedureManagerRef; +use log_store::NoopLogStore; +use mito::config::EngineConfig; +use mito::engine::MitoEngine; +use object_store::services::fs::Builder; +use object_store::ObjectStore; +use storage::compaction::noop::NoopCompactionScheduler; +use storage::config::EngineConfig as StorageEngineConfig; +use storage::EngineImpl; +use tempdir::TempDir; + +pub struct TestEnv { + pub dir: TempDir, + pub table_engine: Arc>>, + pub procedure_manager: ProcedureManagerRef, + pub catalog_manager: CatalogManagerRef, +} + +impl TestEnv { + pub fn new(prefix: &str) -> TestEnv { + let dir = TempDir::new(prefix).unwrap(); + let store_dir = format!("{}/db", dir.path().to_string_lossy()); + let accessor = Builder::default().root(&store_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor); + + let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); + let storage_engine = EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(NoopLogStore::default()), + object_store.clone(), + compaction_scheduler, + ); + let table_engine = Arc::new(MitoEngine::new( + EngineConfig::default(), + storage_engine, + object_store, + )); + + let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy()); + let accessor = Builder::default().root(&procedure_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor); + + let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store })); + + let catalog_manager = Arc::new(MemoryCatalogManager::default()); + + TestEnv { + dir, + table_engine, + procedure_manager, + catalog_manager, + } + } +} From 6676c932dbe7bef0252d1582be2bb6f3f7283f50 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 21 Feb 2023 18:55:04 +0800 Subject: [PATCH 05/12] refactor: Rename table-procedures to table-procedure --- Cargo.lock | 9 ++++++++- Cargo.toml | 2 +- src/datanode/Cargo.toml | 1 + src/{table-procedures => table-procedure}/Cargo.toml | 2 +- src/{table-procedures => table-procedure}/src/create.rs | 0 src/{table-procedures => table-procedure}/src/error.rs | 0 src/{table-procedures => table-procedure}/src/lib.rs | 0 .../src/test_util.rs | 0 8 files changed, 11 insertions(+), 3 deletions(-) rename src/{table-procedures => table-procedure}/Cargo.toml (95%) rename src/{table-procedures => table-procedure}/src/create.rs (100%) rename src/{table-procedures => table-procedure}/src/error.rs (100%) rename src/{table-procedures => table-procedure}/src/lib.rs (100%) rename src/{table-procedures => table-procedure}/src/test_util.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 213010568e39..a54d3b0ef791 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2219,6 +2219,7 @@ dependencies = [ "store-api", "substrait 0.1.0", "table", + "table-procedure", "tempdir", "tokio", "tokio-stream", @@ -7475,7 +7476,7 @@ dependencies = [ ] [[package]] -name = "table-procedures" +name = "table-procedure" version = "0.1.0" dependencies = [ "async-trait", @@ -7484,10 +7485,16 @@ dependencies = [ "common-procedure", "common-telemetry", "datatypes", + "log-store", + "mito", + "object-store", "serde", "serde_json", "snafu", + "storage", "table", + "tempdir", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ba976bd5b756..bab6f28f2dc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ members = [ "src/storage", "src/store-api", "src/table", - "src/table-procedures", + "src/table-procedure", "tests-integration", "tests/runner", ] diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b990d7cc002f..66f42a7963ba 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -53,6 +53,7 @@ storage = { path = "../storage" } store-api = { path = "../store-api" } substrait = { path = "../common/substrait" } table = { path = "../table" } +table-procedure = { path = "../table-procedure" } tokio.workspace = true tokio-stream = { version = "0.1", features = ["net"] } tonic.workspace = true diff --git a/src/table-procedures/Cargo.toml b/src/table-procedure/Cargo.toml similarity index 95% rename from src/table-procedures/Cargo.toml rename to src/table-procedure/Cargo.toml index ba7c3ffe0f0e..799aa8ee849a 100644 --- a/src/table-procedures/Cargo.toml +++ b/src/table-procedure/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "table-procedures" +name = "table-procedure" version = "0.1.0" edition = "2021" diff --git a/src/table-procedures/src/create.rs b/src/table-procedure/src/create.rs similarity index 100% rename from src/table-procedures/src/create.rs rename to src/table-procedure/src/create.rs diff --git a/src/table-procedures/src/error.rs b/src/table-procedure/src/error.rs similarity index 100% rename from src/table-procedures/src/error.rs rename to src/table-procedure/src/error.rs diff --git a/src/table-procedures/src/lib.rs b/src/table-procedure/src/lib.rs similarity index 100% rename from src/table-procedures/src/lib.rs rename to src/table-procedure/src/lib.rs diff --git a/src/table-procedures/src/test_util.rs b/src/table-procedure/src/test_util.rs similarity index 100% rename from src/table-procedures/src/test_util.rs rename to src/table-procedure/src/test_util.rs From 3cc1d62b9fddb906a970f467c1359903203f03a6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 21 Feb 2023 19:34:08 +0800 Subject: [PATCH 06/12] feat: Implement create_table_by_procedure --- src/common/procedure/src/local.rs | 2 -- src/datanode/src/error.rs | 21 ++++++++++++- src/datanode/src/sql/create.rs | 52 ++++++++++++++++++++++++++++--- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 7022231351af..94686a01aae2 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -123,8 +123,6 @@ pub(crate) struct ManagerContext { loaders: Mutex>, lock_map: LockMap, procedures: RwLock>, - // TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we - // should be able to remove the its message and all its child messages. /// Messages loaded from the procedure store. messages: Mutex>, } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 27a19bde64cc..32073cf79258 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -400,6 +400,21 @@ pub enum Error { #[snafu(backtrace)] source: common_procedure::error::Error, }, + + #[snafu(display("Failed to submit procedure, source: {}", source))] + SubmitProcedure { + #[snafu(backtrace)] + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to wait procedure done, source: {}", source))] + WaitProcedure { + source: tokio::sync::watch::error::RecvError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to execute procedure"))] + ProcedureExec {}, } pub type Result = std::result::Result; @@ -476,7 +491,11 @@ impl ErrorExt for Error { CopyTable { source, .. } => source.status_code(), TableScanExec { source, .. } => source.status_code(), UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, - RecoverProcedure { source, .. } => source.status_code(), + RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => { + source.status_code() + } + WaitProcedure { .. } => StatusCode::Internal, + ProcedureExec { .. } => StatusCode::Internal, } } diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index bdbf1995acbd..7e57aad46ddb 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -15,9 +15,9 @@ use std::collections::HashMap; use catalog::{RegisterSchemaRequest, RegisterTableRequest}; +use common_procedure::{ProcedureManagerRef, ProcedureState, ProcedureWithId}; use common_query::Output; -use common_telemetry::tracing::info; -use common_telemetry::tracing::log::error; +use common_telemetry::tracing::{error, info}; use datatypes::schema::RawSchema; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -28,12 +28,13 @@ use store_api::storage::consts::TIME_INDEX_NAME; use table::engine::{EngineContext, TableReference}; use table::metadata::TableId; use table::requests::*; +use table_procedure::CreateTableProcedure; use crate::error::{ self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, - RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, - UnrecognizedTableOptionSnafu, + ProcedureExecSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, + SubmitProcedureSnafu, UnrecognizedTableOptionSnafu, WaitProcedureSnafu, }; use crate::sql::SqlHandler; @@ -72,6 +73,10 @@ impl SqlHandler { } pub(crate) async fn create_table(&self, req: CreateTableRequest) -> Result { + if let Some(procedure_manager) = &self.procedure_manager { + return self.create_table_by_procedure(procedure_manager, req).await; + } + let ctx = EngineContext {}; // first check if catalog and schema exist let catalog = self @@ -127,6 +132,45 @@ impl SqlHandler { Ok(Output::AffectedRows(0)) } + pub(crate) async fn create_table_by_procedure( + &self, + procedure_manager: &ProcedureManagerRef, + req: CreateTableRequest, + ) -> Result { + let table_name = req.table_name.clone(); + let procedure = CreateTableProcedure::new( + req, + self.catalog_manager.clone(), + self.table_engine.clone(), + self.engine_procedure.clone(), + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + info!( + "Create table {} by procedure {}", + table_name, procedure_with_id.id + ); + + let mut watcher = procedure_manager + .submit(procedure_with_id) + .await + .context(SubmitProcedureSnafu)?; + + // TODO(yingwen): Wrap this into a function and add error to ProcedureState::Failed. + loop { + watcher.changed().await.context(WaitProcedureSnafu)?; + match *watcher.borrow() { + ProcedureState::Running => (), + ProcedureState::Done => { + return Ok(Output::AffectedRows(0)); + } + ProcedureState::Failed => { + return ProcedureExecSnafu {}.fail(); + } + } + } + } + /// Converts [CreateTable] to [SqlRequest::CreateTable]. pub(crate) fn create_to_request( &self, From fbd172bf991c60103fe3ce16093bc2194da19377 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 22 Feb 2023 14:59:09 +0800 Subject: [PATCH 07/12] chore: Remove comment --- src/table/src/requests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index fd00c97c81bc..ec77815187fd 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -43,7 +43,6 @@ pub struct CreateDatabaseRequest { pub create_if_not_exists: bool, } -// TODO(yingwen): Add serialize/deserialize test. /// Create table request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateTableRequest { From 15ab51489b4609ff58f686ac306063f38cea8f72 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 22 Feb 2023 16:10:23 +0800 Subject: [PATCH 08/12] chore: Add todo --- src/datanode/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 32073cf79258..a8f849512fd3 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -413,6 +413,7 @@ pub enum Error { backtrace: Backtrace, }, + // TODO(yingwen): Use procedure's error. #[snafu(display("Failed to execute procedure"))] ProcedureExec {}, } @@ -494,8 +495,7 @@ impl ErrorExt for Error { RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => { source.status_code() } - WaitProcedure { .. } => StatusCode::Internal, - ProcedureExec { .. } => StatusCode::Internal, + WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal, } } From 98c8672c50db8187c54d18da7cc6e2cba9c017de Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 22 Feb 2023 16:38:24 +0800 Subject: [PATCH 09/12] feat: Add procedure config to standalone mode --- config/datanode.example.toml | 4 ++++ config/standalone.example.toml | 5 ++++- src/cmd/src/standalone.rs | 5 ++++- src/datanode/src/instance.rs | 5 +++++ src/table-procedure/src/test_util.rs | 12 ++++++------ 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 83bcfc4d5d03..6a4ac171eb80 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -29,3 +29,7 @@ tcp_nodelay = false max_inflight_tasks = 4 max_files_in_level0 = 16 max_purge_tasks = 32 + +[procedure.store] +type = 'File' +data_dir = '/tmp/greptimedb/procedure/' diff --git a/config/standalone.example.toml b/config/standalone.example.toml index af6ca0bcfa83..f22de3c5894d 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -14,7 +14,6 @@ purge_threshold = '50GB' read_batch_size = 128 sync_write = false - [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' @@ -42,3 +41,7 @@ enable = true addr = '127.0.0.1:4003' runtime_size = 2 check_pwd = false + +[procedure.store] +type = 'File' +data_dir = '/tmp/greptimedb/procedure/' diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1be6ad897915..77fef7481b2f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,7 +18,7 @@ use clap::Parser; use common_base::Plugins; use common_telemetry::info; use datanode::datanode::{ - CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig, + CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, }; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions}; @@ -81,6 +81,7 @@ pub struct StandaloneOptions { pub wal: WalConfig, pub storage: ObjectStoreConfig, pub compaction: CompactionConfig, + pub procedure: Option, } impl Default for StandaloneOptions { @@ -99,6 +100,7 @@ impl Default for StandaloneOptions { wal: WalConfig::default(), storage: ObjectStoreConfig::default(), compaction: CompactionConfig::default(), + procedure: None, } } } @@ -125,6 +127,7 @@ impl StandaloneOptions { wal: self.wal, storage: self.storage, compaction: self.compaction, + procedure: self.procedure, ..Default::default() } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 10e9532c8dbd..18cac0268283 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -426,6 +426,11 @@ async fn create_procedure_manager( return Ok(None); }; + info!( + "Creating procedure manager with config: {:?}", + procedure_config + ); + let object_store = new_object_store(&procedure_config.store).await?; let manager_config = ManagerConfig { object_store }; diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index ab8a8669cfe5..ac047a93f593 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -21,8 +21,8 @@ use common_procedure::ProcedureManagerRef; use log_store::NoopLogStore; use mito::config::EngineConfig; use mito::engine::MitoEngine; -use object_store::services::fs::Builder; -use object_store::ObjectStore; +use object_store::services::Fs; +use object_store::{ObjectStore, ObjectStoreBuilder}; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -39,8 +39,8 @@ impl TestEnv { pub fn new(prefix: &str) -> TestEnv { let dir = TempDir::new(prefix).unwrap(); let store_dir = format!("{}/db", dir.path().to_string_lossy()); - let accessor = Builder::default().root(&store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let accessor = Fs::default().root(&store_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor).finish(); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let storage_engine = EngineImpl::new( @@ -56,8 +56,8 @@ impl TestEnv { )); let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy()); - let accessor = Builder::default().root(&procedure_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let accessor = Fs::default().root(&procedure_dir).build().unwrap(); + let object_store = ObjectStore::new(accessor).finish(); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store })); From d624a18a21342375b53f7eaa11e2204cbd22c245 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 23 Feb 2023 19:01:22 +0800 Subject: [PATCH 10/12] feat: Register table-procedure loaders --- src/datanode/src/instance.rs | 14 +++++++++----- src/table-procedure/src/create.rs | 4 ++-- src/table-procedure/src/lib.rs | 21 +++++++++++++++++++++ 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 18cac0268283..5c12c44aac26 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -107,11 +107,6 @@ impl Instance { object_store, )); - let procedure_manager = create_procedure_manager(&opts.procedure).await?; - if let Some(procedure_manager) = &procedure_manager { - table_engine.register_procedure_loaders(&**procedure_manager); - } - // create remote catalog manager let (catalog_manager, factory, table_id_provider) = match opts.mode { Mode::Standalone => { @@ -181,8 +176,17 @@ impl Instance { )), }; + let procedure_manager = create_procedure_manager(&opts.procedure).await?; // Recover procedures. if let Some(procedure_manager) = &procedure_manager { + table_engine.register_procedure_loaders(&**procedure_manager); + table_procedure::register_procedure_loaders( + catalog_manager.clone(), + table_engine.clone(), + table_engine.clone(), + &**procedure_manager, + ); + procedure_manager .recover() .await diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index c27d25948508..c7f5febd419b 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use catalog::{CatalogManagerRef, RegisterTableRequest}; use common_procedure::{ - Context, Error, LockKey, Procedure, ProcedureId, ProcedureManagerRef, ProcedureState, + Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId, Result, Status, }; use common_telemetry::logging; @@ -95,7 +95,7 @@ impl CreateTableProcedure { catalog_manager: CatalogManagerRef, engine_procedure: TableEngineProcedureRef, table_engine: TableEngineRef, - procedure_manager: ProcedureManagerRef, + procedure_manager: &dyn ProcedureManager, ) { procedure_manager .register_loader( diff --git a/src/table-procedure/src/lib.rs b/src/table-procedure/src/lib.rs index fcfd60602468..99153768084f 100644 --- a/src/table-procedure/src/lib.rs +++ b/src/table-procedure/src/lib.rs @@ -19,4 +19,25 @@ pub mod error; #[cfg(test)] mod test_util; +use catalog::CatalogManagerRef; +use common_procedure::ProcedureManager; pub use create::CreateTableProcedure; +use table::engine::{TableEngineProcedureRef, TableEngineRef}; + +/// Register all procedure loaders to the procedure manager. +/// +/// # Panics +/// Panics on error. +pub fn register_procedure_loaders( + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + table_engine: TableEngineRef, + procedure_manager: &dyn ProcedureManager, +) { + CreateTableProcedure::register_loader( + catalog_manager, + engine_procedure, + table_engine, + procedure_manager, + ); +} From 606bdc23092bcb2580af1d3e940ea79c277788f8 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 23 Feb 2023 19:15:15 +0800 Subject: [PATCH 11/12] feat: Address review comments CreateTableProcedure just return error if the subprocedure is failed --- src/table-procedure/Cargo.toml | 5 +- src/table-procedure/src/create.rs | 83 ++++++++++++++----------------- src/table-procedure/src/error.rs | 11 +++- 3 files changed, 50 insertions(+), 49 deletions(-) diff --git a/src/table-procedure/Cargo.toml b/src/table-procedure/Cargo.toml index 799aa8ee849a..ac813376d818 100644 --- a/src/table-procedure/Cargo.toml +++ b/src/table-procedure/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "table-procedure" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true +license.workspace = true [dependencies] async-trait.workspace = true diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index c7f5febd419b..dba29cb8f82b 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -28,7 +28,7 @@ use table::requests::CreateTableRequest; use crate::error::{ AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu, - SerializeProcedureSnafu, + SerializeProcedureSnafu, SubprocedureFailedSnafu, }; /// Procedure to create a table. @@ -79,7 +79,7 @@ impl CreateTableProcedure { data: CreateTableData { state: CreateTableState::Prepare, request, - sub_procedure_id: None, + subprocedure_id: None, }, catalog_manager, table_engine, @@ -161,53 +161,20 @@ impl CreateTableProcedure { self.data.state = CreateTableState::EngineCreateTable; // Assign procedure id to the subprocedure. - self.data.sub_procedure_id = Some(ProcedureId::random()); + self.data.subprocedure_id = Some(ProcedureId::random()); Ok(Status::executing(true)) } async fn on_engine_create_table(&mut self, ctx: &Context) -> Result { // Safety: subprocedure id is always set in this state. - let sub_id = self.data.sub_procedure_id.unwrap(); - - // Check procedure state. - if let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? { - match sub_state { - ProcedureState::Running => Ok(Status::Suspended { - subprocedures: Vec::new(), - persist: false, - }), - ProcedureState::Done => { - logging::info!( - "On engine create table {}, done, sub_id: {}", - self.data.request.table_name, - sub_id - ); - // The sub procedure is done, we can execute next step. - self.data.state = CreateTableState::RegisterCatalog; - Ok(Status::executing(true)) - } - ProcedureState::Failed => { - // If failed, try to create a new procedure to create table. - let engine_ctx = EngineContext::default(); - let procedure = self - .engine_procedure - .create_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::external)?; - let sub_id = ProcedureId::random(); - // Store the procedure id. - self.data.sub_procedure_id = Some(sub_id); - - Ok(Status::Suspended { - subprocedures: vec![ProcedureWithId { - id: sub_id, - procedure, - }], - persist: true, - }) - } - } - } else { + let sub_id = self.data.subprocedure_id.unwrap(); + + // Query subprocedure state. + let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { + // We need to submit the subprocedure if it doesn't exist. We always need to + // do this check as we might not submitted the subprocedure yet when the manager + // recover this procedure from procedure store. logging::info!( "On engine create table {}, not found, sub_id: {}", self.data.request.table_name, @@ -220,13 +187,37 @@ impl CreateTableProcedure { .engine_procedure .create_table_procedure(&engine_ctx, self.data.request.clone()) .map_err(Error::external)?; - Ok(Status::Suspended { + return Ok(Status::Suspended { subprocedures: vec![ProcedureWithId { id: sub_id, procedure, }], persist: true, - }) + }); + }; + + match sub_state { + ProcedureState::Running => Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }), + ProcedureState::Done => { + logging::info!( + "On engine create table {}, done, sub_id: {}", + self.data.request.table_name, + sub_id + ); + // The sub procedure is done, we can execute next step. + self.data.state = CreateTableState::RegisterCatalog; + Ok(Status::executing(true)) + } + ProcedureState::Failed => { + // Return error if the subprocedure is failed. + SubprocedureFailedSnafu { + subprocedure_id: sub_id, + } + .fail()? + } } } @@ -300,7 +291,7 @@ struct CreateTableData { /// /// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable] /// state. - sub_procedure_id: Option, + subprocedure_id: Option, } impl CreateTableData { diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs index da30cf2dd9f5..582467b0117b 100644 --- a/src/table-procedure/src/error.rs +++ b/src/table-procedure/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use common_procedure::ProcedureId; #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] @@ -48,6 +49,12 @@ pub enum Error { #[snafu(display("Schema {} not found", name))] SchemaNotFound { name: String }, + + #[snafu(display("Subprocedure {} failed", subprocedure_id))] + SubprocedureFailed { + subprocedure_id: ProcedureId, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -57,7 +64,9 @@ impl ErrorExt for Error { use Error::*; match self { - SerializeProcedure { .. } | DeserializeProcedure { .. } => StatusCode::Internal, + SerializeProcedure { .. } | DeserializeProcedure { .. } | SubprocedureFailed { .. } => { + StatusCode::Internal + } InvalidRawSchema { source, .. } => source.status_code(), AccessCatalog { source } => source.status_code(), CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments, From f83b848a353da11690ae3dabd8b78f0f1288f30d Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 23 Feb 2023 19:22:34 +0800 Subject: [PATCH 12/12] chore: Address CR comments --- src/datanode/src/error.rs | 8 ++++++-- src/datanode/src/sql/create.rs | 8 +++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a8f849512fd3..7d481bdb5349 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use common_procedure::ProcedureId; use common_recordbatch::error::Error as RecordBatchError; use datafusion::parquet; use datatypes::prelude::ConcreteDataType; @@ -414,8 +415,11 @@ pub enum Error { }, // TODO(yingwen): Use procedure's error. - #[snafu(display("Failed to execute procedure"))] - ProcedureExec {}, + #[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))] + ProcedureExec { + procedure_id: ProcedureId, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 7e57aad46ddb..2657f45bdb83 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -145,11 +145,9 @@ impl SqlHandler { self.engine_procedure.clone(), ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; - info!( - "Create table {} by procedure {}", - table_name, procedure_with_id.id - ); + info!("Create table {} by procedure {}", table_name, procedure_id); let mut watcher = procedure_manager .submit(procedure_with_id) @@ -165,7 +163,7 @@ impl SqlHandler { return Ok(Output::AffectedRows(0)); } ProcedureState::Failed => { - return ProcedureExecSnafu {}.fail(); + return ProcedureExecSnafu { procedure_id }.fail(); } } }