Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 129 additions & 37 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl ProcedureMeta {

/// Shared, reference-counted ([Arc]) pointer to a [ProcedureMeta].
type ProcedureMetaRef = Arc<ProcedureMeta>;

/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
Expand Down Expand Up @@ -176,9 +177,20 @@ impl ManagerContext {

/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
///
/// Returns `None` when no message is cached for `procedure_id`; otherwise returns
/// whatever `load_one_procedure_from_message` yields for the cached message.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
    // Clone the message inside a narrow scope so the `messages` guard is dropped
    // before the procedure is loaded. The lock must be taken exactly once here:
    // locking again while an earlier guard is still alive would deadlock (or panic).
    let message = {
        let messages = self.messages.lock().unwrap();
        messages.get(&procedure_id).cloned()?
    };

    self.load_one_procedure_from_message(procedure_id, &message)
}

/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
procedure_id: ProcedureId,
message: &ProcedureMessage,
) -> Option<LoadedProcedure> {
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
Expand Down Expand Up @@ -344,7 +356,38 @@ impl ProcedureManager for LocalManager {
}

/// Recovers procedures from the messages persisted in the state store.
///
/// Every message without a `parent_id` is treated as a root procedure: it is loaded
/// through the registered loaders and submitted again via `submit_root`. A root
/// procedure that cannot be loaded is skipped, and a failed submission is only
/// logged, so one bad procedure does not stop the others from being recovered.
///
/// # Errors
///
/// Returns an error if loading the messages from the procedure store fails.
async fn recover(&self) -> Result<()> {
    logging::info!("LocalManager start to recover");

    let procedure_store = ProcedureStore::new(self.state_store.clone());
    let messages = procedure_store.load_messages().await?;

    for (procedure_id, message) in &messages {
        if message.parent_id.is_none() {
            // This is the root procedure. We only submit the root procedure as it will
            // submit sub-procedures to the manager.
            let Some(loaded_procedure) = self
                .manager_ctx
                .load_one_procedure_from_message(*procedure_id, message)
            else {
                // Try to load other procedures.
                continue;
            };

            logging::info!(
                "Recover root procedure {}-{}, step: {}",
                loaded_procedure.procedure.type_name(),
                procedure_id,
                loaded_procedure.step
            );

            if let Err(e) = self.submit_root(
                *procedure_id,
                loaded_procedure.step,
                loaded_procedure.procedure,
            ) {
                logging::error!(e; "Failed to recover procedure {}", procedure_id);
            }
        }
    }

    Ok(())
}

async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Expand Down Expand Up @@ -380,7 +423,6 @@ mod test_util {

#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use tempdir::TempDir;

use super::*;
Expand Down Expand Up @@ -447,59 +489,109 @@ mod tests {
assert_eq!(expect, ctx.procedures_in_tree(&root));
}

#[test]
fn test_register_loader() {
let dir = TempDir::new("register").unwrap();
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
};
let manager = LocalManager::new(config);
#[derive(Debug)]
struct ProcedureToLoad {
content: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct MockData {
id: u32,
content: String,
#[async_trait]
impl Procedure for ProcedureToLoad {
fn type_name(&self) -> &str {
"ProcedureToLoad"
}

#[derive(Debug)]
struct ProcedureToLoad {
data: MockData,
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
Ok(Status::Done)
}

#[async_trait]
impl Procedure for ProcedureToLoad {
fn type_name(&self) -> &str {
"ProcedureToLoad"
}
fn dump(&self) -> Result<String> {
Ok(self.content.clone())
}

async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
unimplemented!()
}
fn lock_key(&self) -> Option<LockKey> {
None
}
}

fn dump(&self) -> Result<String> {
Ok(serde_json::to_string(&self.data).unwrap())
impl ProcedureToLoad {
fn new(content: &str) -> ProcedureToLoad {
ProcedureToLoad {
content: content.to_string(),
}
}

fn lock_key(&self) -> Option<LockKey> {
None
}
fn loader() -> BoxedProcedureLoader {
let f = |json: &str| {
let procedure = ProcedureToLoad::new(json);
Ok(Box::new(procedure) as _)
};
Box::new(f)
}
}

let loader = |json: &str| {
let data = serde_json::from_str(json).unwrap();
let procedure = ProcedureToLoad { data };
Ok(Box::new(procedure) as _)
/// Registering a loader for a type name succeeds the first time; registering another
/// loader under the same name must fail with `Error::LoaderConflict`.
#[test]
fn test_register_loader() {
    let dir = TempDir::new("register").unwrap();
    let config = ManagerConfig {
        object_store: test_util::new_object_store(&dir),
    };
    let manager = LocalManager::new(config);

    manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap();
    // Register duplicate loader.
    let err = manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap_err();
    assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
}

/// Stores several steps of a root procedure and of a child procedure, then checks
/// that `recover()` submits the root procedure but not the child.
#[tokio::test]
async fn test_recover() {
    let tmp_dir = TempDir::new("recover").unwrap();
    let store = test_util::new_object_store(&tmp_dir);
    let manager = LocalManager::new(ManagerConfig {
        object_store: store.clone(),
    });

    manager
        .register_loader("ProcedureToLoad", ProcedureToLoad::loader())
        .unwrap();

    // Write the data to recover from directly through a procedure store that shares
    // the manager's object store.
    let procedure_store = ProcedureStore::from(store.clone());

    // Three steps of the root procedure (no parent).
    let root_procedure: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
    let root_id = ProcedureId::random();
    for step in 0..3 {
        let stored = procedure_store
            .store_procedure(root_id, step, &root_procedure, None)
            .await;
        stored.unwrap();
    }

    // Two steps of a child procedure whose parent is the root.
    let child_procedure: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
    let child_id = ProcedureId::random();
    for step in 0..2 {
        let stored = procedure_store
            .store_procedure(child_id, step, &child_procedure, Some(root_id))
            .await;
        stored.unwrap();
    }

    // Run the recovery.
    manager.recover().await.unwrap();

    // The root procedure must have been submitted to the manager.
    let root_state = manager.procedure_state(root_id).await.unwrap();
    assert!(root_state.is_some());

    // The mocked root procedure never submits sub-procedures, so the manager has no
    // state for the child.
    let child_state = manager.procedure_state(child_id).await.unwrap();
    assert!(child_state.is_none());
}

#[tokio::test]
async fn test_submit_procedure() {
let dir = TempDir::new("submit").unwrap();
Expand Down
17 changes: 5 additions & 12 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ mod tests {

use super::*;
use crate::local::test_util;
use crate::store::ObjectStateStore;
use crate::{LockKey, Procedure};

const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
Expand All @@ -385,12 +384,6 @@ mod tests {
}
}

/// Builds a [ProcedureStore] whose state is kept in `object_store`, by wrapping the
/// object store in an `ObjectStateStore`.
fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore {
    ProcedureStore::new(Arc::new(ObjectStateStore::new(object_store)))
}

async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{procedure_id}/");
let object = object_store.object(&dir);
Expand Down Expand Up @@ -469,7 +462,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(normal), procedure_store);

let res = runner.execute_once(&ctx).await;
Expand Down Expand Up @@ -519,7 +512,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);

let res = runner.execute_once(&ctx).await;
Expand Down Expand Up @@ -609,7 +602,7 @@ mod tests {
assert!(manager_ctx.try_insert_procedure(meta.clone()));

let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
Expand Down Expand Up @@ -649,7 +642,7 @@ mod tests {
procedure_id: meta.id,
};
let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);

let res = runner.execute_once(&ctx).await;
Expand Down Expand Up @@ -721,7 +714,7 @@ mod tests {
assert!(manager_ctx.try_insert_procedure(meta.clone()));

let object_store = test_util::new_object_store(&dir);
let procedure_store = new_procedure_store(object_store.clone());
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
Expand Down
20 changes: 13 additions & 7 deletions src/common/procedure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use common_telemetry::logging;
use futures::TryStreamExt;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

Expand All @@ -27,7 +29,7 @@ use crate::{BoxedProcedure, ProcedureId};
mod state_store;

/// Serialized data of a procedure.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessage {
/// Type name of the procedure. The procedure framework also uses the type name to
/// find a loader to load the procedure.
Expand Down Expand Up @@ -115,7 +117,7 @@ impl ProcedureStore {
}

/// Load uncommitted procedures from the storage.
async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
pub(crate) async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
let mut messages = HashMap::new();
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
Expand Down Expand Up @@ -163,6 +165,14 @@ impl ProcedureStore {
}
}

/// Creates a [ProcedureStore] directly from an [ObjectStore].
impl From<ObjectStore> for ProcedureStore {
    /// Wraps `store` in an `ObjectStateStore` and uses it as the state store of the
    /// returned [ProcedureStore].
    fn from(store: ObjectStore) -> Self {
        Self::new(Arc::new(ObjectStateStore::new(store)))
    }
}

/// Suffix type of the key.
#[derive(Debug, PartialEq, Eq)]
enum KeyType {
Expand Down Expand Up @@ -235,11 +245,8 @@ impl ParsedKey {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use tempdir::TempDir;

use super::*;
Expand All @@ -249,9 +256,8 @@ mod tests {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let state_store = ObjectStateStore::new(object_store);

ProcedureStore::new(Arc::new(state_store))
ProcedureStore::from(object_store)
}

#[test]
Expand Down