Skip to content

Commit 0f7e5a2

Browse files
authored
feat: Implement LocalManager::recover (#981)
* feat: Implement LocalManager::recover
* feat: Impl From<ObjectStore> for ProcedureStore
1 parent 9ad6c45 commit 0f7e5a2

File tree

3 files changed

+147
-56
lines changed

3 files changed

+147
-56
lines changed

src/common/procedure/src/local.rs

Lines changed: 129 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ impl ProcedureMeta {
108108

109109
/// Reference counted pointer to [ProcedureMeta].
110110
type ProcedureMetaRef = Arc<ProcedureMeta>;
111+
111112
/// Procedure loaded from store.
112113
struct LoadedProcedure {
113114
procedure: BoxedProcedure,
@@ -176,9 +177,20 @@ impl ManagerContext {
176177

177178
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
178179
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
179-
let messages = self.messages.lock().unwrap();
180-
let message = messages.get(&procedure_id)?;
180+
let message = {
181+
let messages = self.messages.lock().unwrap();
182+
messages.get(&procedure_id).cloned()?
183+
};
184+
185+
self.load_one_procedure_from_message(procedure_id, &message)
186+
}
181187

188+
/// Load procedure from specific [ProcedureMessage].
189+
fn load_one_procedure_from_message(
190+
&self,
191+
procedure_id: ProcedureId,
192+
message: &ProcedureMessage,
193+
) -> Option<LoadedProcedure> {
182194
let loaders = self.loaders.lock().unwrap();
183195
let loader = loaders.get(&message.type_name).or_else(|| {
184196
logging::error!(
@@ -344,7 +356,38 @@ impl ProcedureManager for LocalManager {
344356
}
345357

346358
async fn recover(&self) -> Result<()> {
347-
todo!("Recover procedure and messages")
359+
logging::info!("LocalManager start to recover");
360+
361+
let procedure_store = ProcedureStore::new(self.state_store.clone());
362+
let messages = procedure_store.load_messages().await?;
363+
364+
for (procedure_id, message) in &messages {
365+
if message.parent_id.is_none() {
366+
// This is the root procedure. We only submit the root procedure as it will
367+
// submit sub-procedures to the manager.
368+
let Some(loaded_procedure) = self.manager_ctx.load_one_procedure_from_message(*procedure_id, message) else {
369+
// Try to load other procedures.
370+
continue;
371+
};
372+
373+
logging::info!(
374+
"Recover root procedure {}-{}, step: {}",
375+
loaded_procedure.procedure.type_name(),
376+
procedure_id,
377+
loaded_procedure.step
378+
);
379+
380+
if let Err(e) = self.submit_root(
381+
*procedure_id,
382+
loaded_procedure.step,
383+
loaded_procedure.procedure,
384+
) {
385+
logging::error!(e; "Failed to recover procedure {}", procedure_id);
386+
}
387+
}
388+
}
389+
390+
Ok(())
348391
}
349392

350393
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
@@ -380,7 +423,6 @@ mod test_util {
380423

381424
#[cfg(test)]
382425
mod tests {
383-
use serde::{Deserialize, Serialize};
384426
use tempdir::TempDir;
385427

386428
use super::*;
@@ -447,59 +489,109 @@ mod tests {
447489
assert_eq!(expect, ctx.procedures_in_tree(&root));
448490
}
449491

450-
#[test]
451-
fn test_register_loader() {
452-
let dir = TempDir::new("register").unwrap();
453-
let config = ManagerConfig {
454-
object_store: test_util::new_object_store(&dir),
455-
};
456-
let manager = LocalManager::new(config);
492+
#[derive(Debug)]
493+
struct ProcedureToLoad {
494+
content: String,
495+
}
457496

458-
#[derive(Debug, Serialize, Deserialize)]
459-
struct MockData {
460-
id: u32,
461-
content: String,
497+
#[async_trait]
498+
impl Procedure for ProcedureToLoad {
499+
fn type_name(&self) -> &str {
500+
"ProcedureToLoad"
462501
}
463502

464-
#[derive(Debug)]
465-
struct ProcedureToLoad {
466-
data: MockData,
503+
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
504+
Ok(Status::Done)
467505
}
468506

469-
#[async_trait]
470-
impl Procedure for ProcedureToLoad {
471-
fn type_name(&self) -> &str {
472-
"ProcedureToLoad"
473-
}
507+
fn dump(&self) -> Result<String> {
508+
Ok(self.content.clone())
509+
}
474510

475-
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
476-
unimplemented!()
477-
}
511+
fn lock_key(&self) -> Option<LockKey> {
512+
None
513+
}
514+
}
478515

479-
fn dump(&self) -> Result<String> {
480-
Ok(serde_json::to_string(&self.data).unwrap())
516+
impl ProcedureToLoad {
517+
fn new(content: &str) -> ProcedureToLoad {
518+
ProcedureToLoad {
519+
content: content.to_string(),
481520
}
521+
}
482522

483-
fn lock_key(&self) -> Option<LockKey> {
484-
None
485-
}
523+
fn loader() -> BoxedProcedureLoader {
524+
let f = |json: &str| {
525+
let procedure = ProcedureToLoad::new(json);
526+
Ok(Box::new(procedure) as _)
527+
};
528+
Box::new(f)
486529
}
530+
}
487531

488-
let loader = |json: &str| {
489-
let data = serde_json::from_str(json).unwrap();
490-
let procedure = ProcedureToLoad { data };
491-
Ok(Box::new(procedure) as _)
532+
#[test]
533+
fn test_register_loader() {
534+
let dir = TempDir::new("register").unwrap();
535+
let config = ManagerConfig {
536+
object_store: test_util::new_object_store(&dir),
492537
};
538+
let manager = LocalManager::new(config);
539+
493540
manager
494-
.register_loader("ProcedureToLoad", Box::new(loader))
541+
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
495542
.unwrap();
496543
// Register duplicate loader.
497544
let err = manager
498-
.register_loader("ProcedureToLoad", Box::new(loader))
545+
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
499546
.unwrap_err();
500547
assert!(matches!(err, Error::LoaderConflict { .. }), "{err}");
501548
}
502549

550+
#[tokio::test]
551+
async fn test_recover() {
552+
let dir = TempDir::new("recover").unwrap();
553+
let object_store = test_util::new_object_store(&dir);
554+
let config = ManagerConfig {
555+
object_store: object_store.clone(),
556+
};
557+
let manager = LocalManager::new(config);
558+
559+
manager
560+
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
561+
.unwrap();
562+
563+
// Prepare data
564+
let procedure_store = ProcedureStore::from(object_store.clone());
565+
let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
566+
let root_id = ProcedureId::random();
567+
// Prepare data for the root procedure.
568+
for step in 0..3 {
569+
procedure_store
570+
.store_procedure(root_id, step, &root, None)
571+
.await
572+
.unwrap();
573+
}
574+
575+
let child: BoxedProcedure = Box::new(ProcedureToLoad::new("a child procedure"));
576+
let child_id = ProcedureId::random();
577+
// Prepare data for the child procedure
578+
for step in 0..2 {
579+
procedure_store
580+
.store_procedure(child_id, step, &child, Some(root_id))
581+
.await
582+
.unwrap();
583+
}
584+
585+
// Recover the manager
586+
manager.recover().await.unwrap();
587+
588+
// The manager should submit the root procedure.
589+
assert!(manager.procedure_state(root_id).await.unwrap().is_some());
590+
// Since the mocked root procedure doesn't actually submit subprocedures, there is no
591+
// related state.
592+
assert!(manager.procedure_state(child_id).await.unwrap().is_none());
593+
}
594+
503595
#[tokio::test]
504596
async fn test_submit_procedure() {
505597
let dir = TempDir::new("submit").unwrap();

src/common/procedure/src/local/runner.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,6 @@ mod tests {
366366

367367
use super::*;
368368
use crate::local::test_util;
369-
use crate::store::ObjectStateStore;
370369
use crate::{LockKey, Procedure};
371370

372371
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -385,12 +384,6 @@ mod tests {
385384
}
386385
}
387386

388-
fn new_procedure_store(object_store: ObjectStore) -> ProcedureStore {
389-
let state_store = ObjectStateStore::new(object_store);
390-
391-
ProcedureStore::new(Arc::new(state_store))
392-
}
393-
394387
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
395388
let dir = format!("{procedure_id}/");
396389
let object = object_store.object(&dir);
@@ -469,7 +462,7 @@ mod tests {
469462
procedure_id: meta.id,
470463
};
471464
let object_store = test_util::new_object_store(&dir);
472-
let procedure_store = new_procedure_store(object_store.clone());
465+
let procedure_store = ProcedureStore::from(object_store.clone());
473466
let mut runner = new_runner(meta, Box::new(normal), procedure_store);
474467

475468
let res = runner.execute_once(&ctx).await;
@@ -519,7 +512,7 @@ mod tests {
519512
procedure_id: meta.id,
520513
};
521514
let object_store = test_util::new_object_store(&dir);
522-
let procedure_store = new_procedure_store(object_store.clone());
515+
let procedure_store = ProcedureStore::from(object_store.clone());
523516
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
524517

525518
let res = runner.execute_once(&ctx).await;
@@ -609,7 +602,7 @@ mod tests {
609602
assert!(manager_ctx.try_insert_procedure(meta.clone()));
610603

611604
let object_store = test_util::new_object_store(&dir);
612-
let procedure_store = new_procedure_store(object_store.clone());
605+
let procedure_store = ProcedureStore::from(object_store.clone());
613606
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
614607
// Replace the manager ctx.
615608
runner.manager_ctx = manager_ctx;
@@ -649,7 +642,7 @@ mod tests {
649642
procedure_id: meta.id,
650643
};
651644
let object_store = test_util::new_object_store(&dir);
652-
let procedure_store = new_procedure_store(object_store.clone());
645+
let procedure_store = ProcedureStore::from(object_store.clone());
653646
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);
654647

655648
let res = runner.execute_once(&ctx).await;
@@ -721,7 +714,7 @@ mod tests {
721714
assert!(manager_ctx.try_insert_procedure(meta.clone()));
722715

723716
let object_store = test_util::new_object_store(&dir);
724-
let procedure_store = new_procedure_store(object_store.clone());
717+
let procedure_store = ProcedureStore::from(object_store.clone());
725718
let mut runner = new_runner(meta, Box::new(parent), procedure_store);
726719
// Replace the manager ctx.
727720
runner.manager_ctx = manager_ctx;

src/common/procedure/src/store.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
use std::collections::HashMap;
1616
use std::fmt;
17+
use std::sync::Arc;
1718

1819
use common_telemetry::logging;
1920
use futures::TryStreamExt;
21+
use object_store::ObjectStore;
2022
use serde::{Deserialize, Serialize};
2123
use snafu::ResultExt;
2224

@@ -27,7 +29,7 @@ use crate::{BoxedProcedure, ProcedureId};
2729
mod state_store;
2830

2931
/// Serialized data of a procedure.
30-
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
32+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
3133
pub struct ProcedureMessage {
3234
/// Type name of the procedure. The procedure framework also uses the type name to
3335
/// find a loader to load the procedure.
@@ -115,7 +117,7 @@ impl ProcedureStore {
115117
}
116118

117119
/// Load uncommitted procedures from the storage.
118-
async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
120+
pub(crate) async fn load_messages(&self) -> Result<HashMap<ProcedureId, ProcedureMessage>> {
119121
let mut messages = HashMap::new();
120122
// Track the key-value pair by procedure id.
121123
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
@@ -163,6 +165,14 @@ impl ProcedureStore {
163165
}
164166
}
165167

168+
impl From<ObjectStore> for ProcedureStore {
169+
fn from(store: ObjectStore) -> ProcedureStore {
170+
let state_store = ObjectStateStore::new(store);
171+
172+
ProcedureStore::new(Arc::new(state_store))
173+
}
174+
}
175+
166176
/// Suffix type of the key.
167177
#[derive(Debug, PartialEq, Eq)]
168178
enum KeyType {
@@ -235,11 +245,8 @@ impl ParsedKey {
235245

236246
#[cfg(test)]
237247
mod tests {
238-
use std::sync::Arc;
239-
240248
use async_trait::async_trait;
241249
use object_store::services::fs::Builder;
242-
use object_store::ObjectStore;
243250
use tempdir::TempDir;
244251

245252
use super::*;
@@ -249,9 +256,8 @@ mod tests {
249256
let store_dir = dir.path().to_str().unwrap();
250257
let accessor = Builder::default().root(store_dir).build().unwrap();
251258
let object_store = ObjectStore::new(accessor);
252-
let state_store = ObjectStateStore::new(object_store);
253259

254-
ProcedureStore::new(Arc::new(state_store))
260+
ProcedureStore::from(object_store)
255261
}
256262

257263
#[test]

0 commit comments

Comments
 (0)