Skip to content

Commit bd377ef

Browse files
authored
feat: Procedure to create table and register table to catalog (#1040)
* feat: Add table-procedures crate * feat: Implement procedure to create table * feat: Integrate procedure manager to datanode * test: Test CreateTableProcedure * refactor: Rename table-procedures to table-procedure * feat: Implement create_table_by_procedure * chore: Remove comment * chore: Add todo * feat: Add procedure config to standalone mode * feat: Register table-procedure loaders * feat: Address review comments CreateTableProcedure just return error if the subprocedure is failed * chore: Address CR comments
1 parent df751c3 commit bd377ef

File tree

20 files changed

+834
-16
lines changed

20 files changed

+834
-16
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ members = [
3737
"src/storage",
3838
"src/store-api",
3939
"src/table",
40+
"src/table-procedure",
4041
"tests-integration",
4142
"tests/runner",
4243
]

config/datanode.example.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ tcp_nodelay = false
2929
max_inflight_tasks = 4
3030
max_files_in_level0 = 16
3131
max_purge_tasks = 32
32+
33+
[procedure.store]
34+
type = 'File'
35+
data_dir = '/tmp/greptimedb/procedure/'

config/standalone.example.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ purge_threshold = '50GB'
1414
read_batch_size = 128
1515
sync_write = false
1616

17-
1817
[storage]
1918
type = 'File'
2019
data_dir = '/tmp/greptimedb/data/'
@@ -42,3 +41,7 @@ enable = true
4241
addr = '127.0.0.1:4003'
4342
runtime_size = 2
4443
check_pwd = false
44+
45+
[procedure.store]
46+
type = 'File'
47+
data_dir = '/tmp/greptimedb/procedure/'

src/cmd/src/datanode.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
use clap::Parser;
1616
use common_telemetry::logging;
17-
use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
17+
use datanode::datanode::{
18+
Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig,
19+
};
1820
use meta_client::MetaClientOptions;
1921
use servers::Mode;
2022
use snafu::ResultExt;
@@ -65,6 +67,8 @@ struct StartCommand {
6567
data_dir: Option<String>,
6668
#[clap(long)]
6769
wal_dir: Option<String>,
70+
#[clap(long)]
71+
procedure_dir: Option<String>,
6872
}
6973

7074
impl StartCommand {
@@ -134,6 +138,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
134138
if let Some(wal_dir) = cmd.wal_dir {
135139
opts.wal.dir = wal_dir;
136140
}
141+
142+
if let Some(procedure_dir) = cmd.procedure_dir {
143+
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
144+
}
145+
137146
Ok(opts)
138147
}
139148
}

src/cmd/src/standalone.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use clap::Parser;
1818
use common_base::Plugins;
1919
use common_telemetry::info;
2020
use datanode::datanode::{
21-
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig,
21+
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
2222
};
2323
use datanode::instance::InstanceRef;
2424
use frontend::frontend::{Frontend, FrontendOptions};
@@ -81,6 +81,7 @@ pub struct StandaloneOptions {
8181
pub wal: WalConfig,
8282
pub storage: ObjectStoreConfig,
8383
pub compaction: CompactionConfig,
84+
pub procedure: Option<ProcedureConfig>,
8485
}
8586

8687
impl Default for StandaloneOptions {
@@ -99,6 +100,7 @@ impl Default for StandaloneOptions {
99100
wal: WalConfig::default(),
100101
storage: ObjectStoreConfig::default(),
101102
compaction: CompactionConfig::default(),
103+
procedure: None,
102104
}
103105
}
104106
}
@@ -125,6 +127,7 @@ impl StandaloneOptions {
125127
wal: self.wal,
126128
storage: self.storage,
127129
compaction: self.compaction,
130+
procedure: self.procedure,
128131
..Default::default()
129132
}
130133
}

src/common/procedure/src/local.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ pub(crate) struct ManagerContext {
123123
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
124124
lock_map: LockMap,
125125
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
126-
// TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we
127-
// should be able to remove the its message and all its child messages.
128126
/// Messages loaded from the procedure store.
129127
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
130128
}

src/datanode/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ common-catalog = { path = "../common/catalog" }
2121
common-error = { path = "../common/error" }
2222
common-grpc = { path = "../common/grpc" }
2323
common-grpc-expr = { path = "../common/grpc-expr" }
24+
common-procedure = { path = "../common/procedure" }
2425
common-query = { path = "../common/query" }
2526
common-recordbatch = { path = "../common/recordbatch" }
2627
common-runtime = { path = "../common/runtime" }
@@ -52,6 +53,7 @@ storage = { path = "../storage" }
5253
store-api = { path = "../store-api" }
5354
substrait = { path = "../common/substrait" }
5455
table = { path = "../table" }
56+
table-procedure = { path = "../table-procedure" }
5557
tokio.workspace = true
5658
tokio-stream = { version = "0.1", features = ["net"] }
5759
tonic.workspace = true

src/datanode/src/datanode.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,27 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
144144
}
145145
}
146146

147+
#[derive(Debug, Clone, Serialize, Deserialize)]
148+
#[serde(default)]
149+
pub struct ProcedureConfig {
150+
/// Storage config for procedure manager.
151+
pub store: ObjectStoreConfig,
152+
}
153+
154+
impl Default for ProcedureConfig {
155+
fn default() -> ProcedureConfig {
156+
ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
157+
}
158+
}
159+
160+
impl ProcedureConfig {
161+
pub fn from_file_path(path: String) -> ProcedureConfig {
162+
ProcedureConfig {
163+
store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
164+
}
165+
}
166+
}
167+
147168
#[derive(Clone, Debug, Serialize, Deserialize)]
148169
#[serde(default)]
149170
pub struct DatanodeOptions {
@@ -159,6 +180,7 @@ pub struct DatanodeOptions {
159180
pub wal: WalConfig,
160181
pub storage: ObjectStoreConfig,
161182
pub compaction: CompactionConfig,
183+
pub procedure: Option<ProcedureConfig>,
162184
}
163185

164186
impl Default for DatanodeOptions {
@@ -176,6 +198,7 @@ impl Default for DatanodeOptions {
176198
wal: WalConfig::default(),
177199
storage: ObjectStoreConfig::default(),
178200
compaction: CompactionConfig::default(),
201+
procedure: None,
179202
}
180203
}
181204
}

src/datanode/src/error.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::any::Any;
1616

1717
use common_error::prelude::*;
18+
use common_procedure::ProcedureId;
1819
use common_recordbatch::error::Error as RecordBatchError;
1920
use datafusion::parquet;
2021
use datatypes::prelude::ConcreteDataType;
@@ -394,6 +395,31 @@ pub enum Error {
394395
#[snafu(backtrace)]
395396
source: table::error::Error,
396397
},
398+
399+
#[snafu(display("Failed to recover procedure, source: {}", source))]
400+
RecoverProcedure {
401+
#[snafu(backtrace)]
402+
source: common_procedure::error::Error,
403+
},
404+
405+
#[snafu(display("Failed to submit procedure, source: {}", source))]
406+
SubmitProcedure {
407+
#[snafu(backtrace)]
408+
source: common_procedure::error::Error,
409+
},
410+
411+
#[snafu(display("Failed to wait procedure done, source: {}", source))]
412+
WaitProcedure {
413+
source: tokio::sync::watch::error::RecvError,
414+
backtrace: Backtrace,
415+
},
416+
417+
// TODO(yingwen): Use procedure's error.
418+
#[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))]
419+
ProcedureExec {
420+
procedure_id: ProcedureId,
421+
backtrace: Backtrace,
422+
},
397423
}
398424

399425
pub type Result<T> = std::result::Result<T, Error>;
@@ -470,6 +496,10 @@ impl ErrorExt for Error {
470496
CopyTable { source, .. } => source.status_code(),
471497
TableScanExec { source, .. } => source.status_code(),
472498
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
499+
RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => {
500+
source.status_code()
501+
}
502+
WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal,
473503
}
474504
}
475505

0 commit comments

Comments
 (0)