Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c438398
feat: procedure config
jun0315 Feb 28, 2023
9a02f19
fix: modify config
jun0315 Feb 28, 2023
8dcf9b6
feat: add retry logic
jun0315 Feb 28, 2023
bb00096
Merge branch 'develop' into max-retry
jun0315 Feb 28, 2023
d076a19
feat: add error
jun0315 Feb 28, 2023
a5ecb9d
feat: add it
jun0315 Feb 28, 2023
62cbaf3
feat: add it
jun0315 Feb 28, 2023
0c57c70
feat: add it
jun0315 Feb 28, 2023
9fd2ec7
feat: rm retry from runner
jun0315 Feb 28, 2023
8dcb9fe
feat: use backon
jun0315 Mar 1, 2023
2dffab9
feat: add retry_interval
jun0315 Mar 1, 2023
14b6121
feat: add retry_interval
jun0315 Mar 1, 2023
a14cac5
Merge branch 'develop' into max-retry
jun0315 Mar 1, 2023
3769194
fix: conflict
jun0315 Mar 1, 2023
177c282
Merge branch 'develop' into max-retry
jun0315 Mar 1, 2023
ad78dad
fix: cr
jun0315 Mar 1, 2023
dfd7a09
Merge branch 'develop' into max-retry
jun0315 Mar 2, 2023
19c7a97
feat: add retry error and id
jun0315 Mar 2, 2023
8e30f33
feat: rename
jun0315 Mar 2, 2023
10aa270
refactor: execute
jun0315 Mar 2, 2023
80b635e
feat: use config dir
jun0315 Mar 2, 2023
9ac8478
fix: cr
jun0315 Mar 2, 2023
facacdd
fix: cr
jun0315 Mar 2, 2023
81a72b3
fix: fmt
jun0315 Mar 2, 2023
4bdef09
fix: fmt
jun0315 Mar 2, 2023
93f1928
fix: pr
jun0315 Mar 2, 2023
c106174
fix: it
jun0315 Mar 2, 2023
2720b39
fix: rm unless cmd params
jun0315 Mar 2, 2023
8e56209
feat: add toml
jun0315 Mar 2, 2023
7675fb1
fix: ut
jun0315 Mar 2, 2023
a012761
Merge branch 'develop' into max-retry
jun0315 Mar 6, 2023
22353db
feat: add rolling back
jun0315 Mar 6, 2023
de9cb47
Merge branch 'develop' into max-retry
jun0315 Mar 14, 2023
852ddc3
fix: cr
jun0315 Mar 14, 2023
6e47e03
fix: cr
jun0315 Mar 14, 2023
10ca0bc
fix: cr
jun0315 Mar 14, 2023
64863a9
fix: ci
jun0315 Mar 14, 2023
16bc9ac
fix: ci
jun0315 Mar 14, 2023
de65375
fix: ci
jun0315 Mar 14, 2023
7fd44b9
chore: Apply suggestions from code review
evenyag Mar 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
opts.wal.dir = wal_dir;
}

if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}
opts.procedure = if let Some(path) = cmd.procedure_dir {
Comment thread
jun0315 marked this conversation as resolved.
Outdated
Some(toml_loader::from_file!(&path)?)
} else {
Some(ProcedureConfig::default())
};

Ok(opts)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true
Expand Down
11 changes: 11 additions & 0 deletions src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ pub enum Error {
source: Arc<Error>,
backtrace: Backtrace,
},

#[snafu(display(
"Procedure retry exceeded max times, procedure_id: {}, source:{}",
procedure_id,
source
))]
RetryTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -111,6 +121,7 @@ impl ErrorExt for Error {
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
Expand Down
19 changes: 19 additions & 0 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ mod runner;

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
Expand Down Expand Up @@ -291,12 +293,16 @@ impl ManagerContext {
pub struct ManagerConfig {
/// Object store
pub object_store: ObjectStore,
pub max_retry_times: usize,
pub retry_delay: u64,
Comment thread
jun0315 marked this conversation as resolved.
Outdated
}

/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
max_retry_times: usize,
retry_delay: u64,
Comment thread
jun0315 marked this conversation as resolved.
Outdated
}

impl LocalManager {
Expand All @@ -305,6 +311,8 @@ impl LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
}
}

Expand All @@ -321,6 +329,9 @@ impl LocalManager {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(self.retry_delay))
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
};

Expand Down Expand Up @@ -543,6 +554,8 @@ mod tests {
let dir = TempDir::new("register").unwrap();
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: 500,
};
let manager = LocalManager::new(config);

Expand All @@ -562,6 +575,8 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
object_store: object_store.clone(),
max_retry_times: 3,
retry_delay: 500,
};
let manager = LocalManager::new(config);

Expand Down Expand Up @@ -606,6 +621,8 @@ mod tests {
let dir = TempDir::new("submit").unwrap();
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: 500,
};
let manager = LocalManager::new(config);

Expand Down Expand Up @@ -652,6 +669,8 @@ mod tests {
let dir = TempDir::new("on_err").unwrap();
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: 500,
};
let manager = LocalManager::new(config);

Expand Down
96 changes: 79 additions & 17 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use tokio::time;

use crate::error::{ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};

const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
use crate::ProcedureState::Retry;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};

#[derive(Debug)]
enum ExecResult {
Expand Down Expand Up @@ -108,6 +108,7 @@ pub(crate) struct Runner {
pub(crate) procedure: BoxedProcedure,
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: ProcedureStore,
}

Expand Down Expand Up @@ -164,15 +165,30 @@ impl Runner {
provider: self.manager_ctx.clone(),
};

loop {
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
ExecResult::Done | ExecResult::Failed => return,
self.execute_once_with_retry(&ctx).await;
}

async fn execute_once_with_retry(&mut self, ctx: &Context) {
let retry = self.exponential_builder.build();
let mut retry_times = 0;
for dur in retry {
match self.execute_once(ctx).await {
ExecResult::Continue | ExecResult::Done | ExecResult::Failed => return,
ExecResult::RetryLater => {
self.wait_on_err().await;
retry_times += 1;
self.wait_on_err(dur, retry_times).await;
}
}
}
assert!(self.meta.state().is_retry());
if let Retry { error } = self.meta.state() {
self.meta.set_state(ProcedureState::failed(Arc::new(
Error::RetryTimesExceeded {
source: error,
procedure_id: self.meta.id,
},
)))
}
}

async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
Expand All @@ -186,8 +202,11 @@ impl Runner {
status.need_persist(),
);

if status.need_persist() && self.persist_procedure().await.is_err() {
return ExecResult::RetryLater;
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retry(Arc::new(err)));
return ExecResult::RetryLater;
}
}

match status {
Expand All @@ -196,7 +215,8 @@ impl Runner {
self.on_suspended(subprocedures).await;
}
Status::Done => {
if self.commit_procedure().await.is_err() {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retry(Arc::new(e)));
return ExecResult::RetryLater;
}

Expand All @@ -217,16 +237,18 @@ impl Runner {
);

if e.is_retry_later() {
self.meta.set_state(ProcedureState::retry(Arc::new(e)));
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::failed(Arc::new(e)));

// Write rollback key so we can skip this procedure while recovering procedures.
if self.rollback_procedure().await.is_err() {
if let Err(e) = self.rollback_procedure().await {
Comment thread
jun0315 marked this conversation as resolved.
Outdated
self.meta.set_state(ProcedureState::retry(Arc::new(e)));
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::failed(Arc::new(e)));

ExecResult::Failed
}
}
Expand Down Expand Up @@ -261,6 +283,7 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder.clone(),
store: self.store.clone(),
};

Expand All @@ -285,8 +308,16 @@ impl Runner {
});
}

async fn wait_on_err(&self) {
time::sleep(ERR_WAIT_DURATION).await;
/// Logs the upcoming retry attempt and then sleeps for the backoff duration
/// before control returns to the retry loop.
async fn wait_on_err(&self, delay: Duration, attempt: u64) {
    let type_name = self.procedure.type_name();
    let millis = delay.as_millis();
    logging::info!(
        "Procedure {}-{} retry for the {} times after {} millis",
        type_name,
        self.meta.id,
        attempt,
        millis,
    );
    time::sleep(delay).await;
}

async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {
Expand Down Expand Up @@ -416,6 +447,7 @@ mod tests {
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
}
}
Expand Down Expand Up @@ -752,6 +784,36 @@ mod tests {
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}

/// Verifies that a procedure which keeps returning a retriable error is
/// eventually marked failed with `RetryTimesExceeded` once the configured
/// retry budget is used up.
#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
    // A procedure body that always asks to be retried.
    let exec_fn =
        |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();

    let exceed_max_retry_later = ProcedureAdapter {
        data: "exceed_max_retry_later".to_string(),
        lock_key: LockKey::single("catalog.schema.table"),
        exec_fn,
    };

    let dir = TempDir::new("exceed_max_retry_later").unwrap();
    let meta = exceed_max_retry_later.new_meta(ROOT_ID);
    let object_store = test_util::new_object_store(&dir);
    let procedure_store = ProcedureStore::from(object_store.clone());
    let mut runner = new_runner(
        meta.clone(),
        Box::new(exceed_max_retry_later),
        procedure_store,
    );
    // Keep the test fast: tiny backoff delay, small retry budget.
    runner.exponential_builder = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(1))
        .with_max_times(3);

    // Run the runner and execute the procedure.
    runner.execute_procedure_in_loop().await;
    // After retries are exhausted the final state must carry the
    // `RetryTimesExceeded` error.
    let err = meta.state().error().unwrap().to_string();
    assert!(err.contains("Procedure retry exceeded max times"));
}

#[tokio::test]
async fn test_child_error() {
let mut times = 0;
Expand Down Expand Up @@ -819,7 +881,7 @@ mod tests {
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

// Run the runer and execute the procedure.
// Run the runner and execute the procedure.
runner.run().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("subprocedure failed"), "{err}");
Expand Down
13 changes: 13 additions & 0 deletions src/common/procedure/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ pub enum ProcedureState {
Running,
/// The procedure is finished.
Done,
/// The procedure is failed and can be retried.
Retry { error: Arc<Error> },
Comment thread
jun0315 marked this conversation as resolved.
Outdated
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
}
Expand All @@ -216,6 +218,11 @@ impl ProcedureState {
ProcedureState::Failed { error }
}

/// Returns a [ProcedureState] with retry state, carrying the retriable
/// `error` that caused this attempt to be retried.
pub fn retry(error: Arc<Error>) -> ProcedureState {
    ProcedureState::Retry { error }
}

/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)
Expand All @@ -231,10 +238,16 @@ impl ProcedureState {
matches!(self, ProcedureState::Failed { .. })
}

/// Returns true if the procedure state is retry.
Comment thread
jun0315 marked this conversation as resolved.
Outdated
pub fn is_retry(&self) -> bool {
    matches!(self, ProcedureState::Retry { .. })
}

/// Returns the error.
pub fn error(&self) -> Option<&Arc<Error>> {
match self {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retry { error } => Some(error),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/procedure/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> {
ProcedureState::Failed { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
ProcedureState::Retry { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
}
}
}
Loading