Skip to content
Merged
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ members = [
"src/storage",
"src/store-api",
"src/table",
"src/table-procedure",
"tests-integration",
"tests/runner",
]
Expand Down
4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ tcp_nodelay = false
max_inflight_tasks = 4
max_files_in_level0 = 16
max_purge_tasks = 32

[procedure.store]
type = 'File'
data_dir = '/tmp/greptimedb/procedure/'
5 changes: 4 additions & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ purge_threshold = '50GB'
read_batch_size = 128
sync_write = false


[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'
Expand Down Expand Up @@ -42,3 +41,7 @@ enable = true
addr = '127.0.0.1:4003'
runtime_size = 2
check_pwd = false

[procedure.store]
type = 'File'
data_dir = '/tmp/greptimedb/procedure/'
11 changes: 10 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
use datanode::datanode::{
Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig,
};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
Expand Down Expand Up @@ -65,6 +67,8 @@ struct StartCommand {
data_dir: Option<String>,
#[clap(long)]
wal_dir: Option<String>,
#[clap(long)]
procedure_dir: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -134,6 +138,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
}

if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}

Ok(opts)
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use clap::Parser;
use common_base::Plugins;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig,
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions};
Expand Down Expand Up @@ -81,6 +81,7 @@ pub struct StandaloneOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub procedure: Option<ProcedureConfig>,
}

impl Default for StandaloneOptions {
Expand All @@ -99,6 +100,7 @@ impl Default for StandaloneOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
procedure: None,
}
}
}
Expand All @@ -125,6 +127,7 @@ impl StandaloneOptions {
wal: self.wal,
storage: self.storage,
compaction: self.compaction,
procedure: self.procedure,
..Default::default()
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
lock_map: LockMap,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
// TODO(yingwen): Now we never clean the messages. But when the root procedure is done, we
// should be able to remove its message and all its child messages.
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
}
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
Expand Down Expand Up @@ -52,6 +53,7 @@ storage = { path = "../storage" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
table-procedure = { path = "../table-procedure" }
tokio.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tonic.workspace = true
Expand Down
23 changes: 23 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,27 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
}
}

/// Configuration of the procedure manager.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized input is
/// filled in from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Storage config for procedure manager, i.e. the object store config that the
/// procedure manager's object store is built from.
pub store: ObjectStoreConfig,
}

impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
}
}

impl ProcedureConfig {
    /// Creates a procedure config backed by a file-based object store.
    ///
    /// `path` becomes the `data_dir` of the underlying [`FileConfig`].
    pub fn from_file_path(path: String) -> Self {
        let file_config = FileConfig { data_dir: path };
        Self {
            store: ObjectStoreConfig::File(file_config),
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
Expand All @@ -159,6 +180,7 @@ pub struct DatanodeOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub procedure: Option<ProcedureConfig>,
}

impl Default for DatanodeOptions {
Expand All @@ -176,6 +198,7 @@ impl Default for DatanodeOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
procedure: None,
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;

use common_error::prelude::*;
use common_procedure::ProcedureId;
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::parquet;
use datatypes::prelude::ConcreteDataType;
Expand Down Expand Up @@ -394,6 +395,31 @@ pub enum Error {
#[snafu(backtrace)]
source: table::error::Error,
},

#[snafu(display("Failed to recover procedure, source: {}", source))]
RecoverProcedure {
#[snafu(backtrace)]
source: common_procedure::error::Error,
},

#[snafu(display("Failed to submit procedure, source: {}", source))]
SubmitProcedure {
#[snafu(backtrace)]
source: common_procedure::error::Error,
},

#[snafu(display("Failed to wait procedure done, source: {}", source))]
WaitProcedure {
source: tokio::sync::watch::error::RecvError,
backtrace: Backtrace,
},

// TODO(yingwen): Use procedure's error.
#[snafu(display("Failed to execute procedure, procedure_id: {}", procedure_id))]
ProcedureExec {
procedure_id: ProcedureId,
backtrace: Backtrace,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -470,6 +496,10 @@ impl ErrorExt for Error {
CopyTable { source, .. } => source.status_code(),
TableScanExec { source, .. } => source.status_code(),
UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
RecoverProcedure { source, .. } | SubmitProcedure { source, .. } => {
source.status_code()
}
WaitProcedure { .. } | ProcedureExec { .. } => StatusCode::Internal,
}
}

Expand Down
46 changes: 43 additions & 3 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::info;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
Expand All @@ -45,11 +47,11 @@ use table::table::TableIdProviderRef;
use table::Table;

use crate::datanode::{
DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
Expand Down Expand Up @@ -173,12 +175,32 @@ impl Instance {
catalog_manager.clone(),
)),
};

let procedure_manager = create_procedure_manager(&opts.procedure).await?;
// Recover procedures.
if let Some(procedure_manager) = &procedure_manager {
table_engine.register_procedure_loaders(&**procedure_manager);
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
table_engine.clone(),
table_engine.clone(),
&**procedure_manager,
);

procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
}

Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
table_engine,
table_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
table_engine,
procedure_manager,
),
catalog_manager,
script_executor,
Expand Down Expand Up @@ -400,3 +422,21 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngin
.context(OpenLogStoreSnafu)?;
Ok(logstore)
}

/// Builds the procedure manager described by `procedure_config`.
///
/// Returns `Ok(None)` when no procedure config is given. Otherwise an object store is
/// created from the config's `store` section and wrapped in a [`LocalManager`].
///
/// # Errors
///
/// Propagates the error returned by `new_object_store` if the store cannot be created.
async fn create_procedure_manager(
    procedure_config: &Option<ProcedureConfig>,
) -> Result<Option<ProcedureManagerRef>> {
    // Guard clause: without a config there is no manager to build.
    let config = match procedure_config {
        Some(config) => config,
        None => return Ok(None),
    };

    info!("Creating procedure manager with config: {:?}", config);

    let manager_config = ManagerConfig {
        object_store: new_object_store(&config.store).await?,
    };

    Ok(Some(Arc::new(LocalManager::new(manager_config))))
}
4 changes: 3 additions & 1 deletion src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ impl Instance {
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
table_engine,
table_engine.clone(),
catalog_manager.clone(),
query_engine.clone(),
table_engine,
None,
),
catalog_manager,
script_executor,
Expand Down
Loading