Skip to content
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c438398
feat: procedure config
jun0315 Feb 28, 2023
9a02f19
fix: modify config
jun0315 Feb 28, 2023
8dcf9b6
feat: add retry logic
jun0315 Feb 28, 2023
bb00096
Merge branch 'develop' into max-retry
jun0315 Feb 28, 2023
d076a19
feat: add error
jun0315 Feb 28, 2023
a5ecb9d
feat: add it
jun0315 Feb 28, 2023
62cbaf3
feat: add it
jun0315 Feb 28, 2023
0c57c70
feat: add it
jun0315 Feb 28, 2023
9fd2ec7
feat: rm retry from runner
jun0315 Feb 28, 2023
8dcb9fe
feat: use backon
jun0315 Mar 1, 2023
2dffab9
feat: add retry_interval
jun0315 Mar 1, 2023
14b6121
feat: add retry_interval
jun0315 Mar 1, 2023
a14cac5
Merge branch 'develop' into max-retry
jun0315 Mar 1, 2023
3769194
fix: conflict
jun0315 Mar 1, 2023
177c282
Merge branch 'develop' into max-retry
jun0315 Mar 1, 2023
ad78dad
fix: cr
jun0315 Mar 1, 2023
dfd7a09
Merge branch 'develop' into max-retry
jun0315 Mar 2, 2023
19c7a97
feat: add retry error and id
jun0315 Mar 2, 2023
8e30f33
feat: rename
jun0315 Mar 2, 2023
10aa270
refactor: execute
jun0315 Mar 2, 2023
80b635e
feat: use config dir
jun0315 Mar 2, 2023
9ac8478
fix: cr
jun0315 Mar 2, 2023
facacdd
fix: cr
jun0315 Mar 2, 2023
81a72b3
fix: fmt
jun0315 Mar 2, 2023
4bdef09
fix: fmt
jun0315 Mar 2, 2023
93f1928
fix: pr
jun0315 Mar 2, 2023
c106174
fix: it
jun0315 Mar 2, 2023
2720b39
fix: rm unless cmd params
jun0315 Mar 2, 2023
8e56209
feat: add toml
jun0315 Mar 2, 2023
7675fb1
fix: ut
jun0315 Mar 2, 2023
a012761
Merge branch 'develop' into max-retry
jun0315 Mar 6, 2023
22353db
feat: add rolling back
jun0315 Mar 6, 2023
de9cb47
Merge branch 'develop' into max-retry
jun0315 Mar 14, 2023
852ddc3
fix: cr
jun0315 Mar 14, 2023
6e47e03
fix: cr
jun0315 Mar 14, 2023
10ca0bc
fix: cr
jun0315 Mar 14, 2023
64863a9
fix: ci
jun0315 Mar 14, 2023
16bc9ac
fix: ci
jun0315 Mar 14, 2023
de65375
fix: ci
jun0315 Mar 14, 2023
7fd44b9
chore: Apply suggestions from code review
evenyag Mar 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ max_purge_tasks = 32
# [procedure.store]
# type = 'File'
# data_dir = '/tmp/greptimedb/procedure/'
Comment thread
evenyag marked this conversation as resolved.
Outdated
# max_retry_times = 3
# retry_delay = 500ms
Comment thread
evenyag marked this conversation as resolved.
Outdated
4 changes: 4 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ max_purge_tasks = 32
# type = "File"
# # Procedure data path.
# data_dir = "/tmp/greptimedb/procedure/"
# # Max retry times of procedures.
# max_retry_times = 3
# # Initial retry delay of procedures; the delay increases exponentially on each retry.
# retry_delay = 500ms
Comment thread
evenyag marked this conversation as resolved.
Outdated
1 change: 0 additions & 1 deletion src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
}

if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}
Expand Down
1 change: 1 addition & 0 deletions src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true
Expand Down
11 changes: 11 additions & 0 deletions src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ pub enum Error {
source: Arc<Error>,
backtrace: Backtrace,
},

#[snafu(display(
"Procedure retry exceeded max times, procedure_id: {}, source:{}",
procedure_id,
source
))]
RetryTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -111,6 +121,7 @@ impl ErrorExt for Error {
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
Expand Down
20 changes: 20 additions & 0 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ mod runner;

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
Expand Down Expand Up @@ -291,12 +293,16 @@ impl ManagerContext {
pub struct ManagerConfig {
/// Object store
pub object_store: ObjectStore,
pub max_retry_times: usize,
pub retry_delay: Duration,
}

/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
max_retry_times: usize,
retry_delay: Duration,
}

impl LocalManager {
Expand All @@ -305,6 +311,8 @@ impl LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
}
}

Expand All @@ -321,7 +329,11 @@ impl LocalManager {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: ExponentialBuilder::default()
.with_min_delay(self.retry_delay)
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
rolling_back: false,
};

let watcher = meta.state_receiver.clone();
Expand Down Expand Up @@ -543,6 +555,8 @@ mod tests {
let dir = create_temp_dir("register");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

Expand All @@ -562,6 +576,8 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
object_store: object_store.clone(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

Expand Down Expand Up @@ -606,6 +622,8 @@ mod tests {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

Expand Down Expand Up @@ -652,6 +670,8 @@ mod tests {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

Expand Down
121 changes: 101 additions & 20 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use tokio::time;

use crate::error::{ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};

const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
use crate::ProcedureState::Retrying;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};

#[derive(Debug)]
enum ExecResult {
Expand Down Expand Up @@ -108,7 +108,9 @@ pub(crate) struct Runner {
pub(crate) procedure: BoxedProcedure,
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: ProcedureStore,
pub(crate) rolling_back: bool,
}

impl Runner {
Expand Down Expand Up @@ -164,18 +166,56 @@ impl Runner {
provider: self.manager_ctx.clone(),
};

self.rolling_back = false;
self.execute_once_with_retry(&ctx).await;
}

async fn execute_once_with_retry(&mut self, ctx: &Context) {
let mut retry = self.exponential_builder.build();
let mut retry_times = 0;
loop {
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
match self.execute_once(ctx).await {
ExecResult::Done | ExecResult::Failed => return,
ExecResult::Continue => (),
ExecResult::RetryLater => {
self.wait_on_err().await;
retry_times += 1;
if let Some(d) = retry.next() {
self.wait_on_err(d, retry_times).await;
} else {
assert!(self.meta.state().is_retrying());
if let Retrying { error } = self.meta.state() {
self.meta.set_state(ProcedureState::failed(Arc::new(
Error::RetryTimesExceeded {
source: error,
procedure_id: self.meta.id,
},
)))
}
return;
}
}
}
}
}

async fn rollback(&mut self, error: Arc<Error>) -> ExecResult {
if let Err(e) = self.rollback_procedure().await {
self.rolling_back = true;
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
self.meta.set_state(ProcedureState::failed(error));
ExecResult::Failed
}

async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
// if rolling_back, there is no need to execute again.
if self.rolling_back {
// We can definitely get the previous error here.
let state = self.meta.state();
let err = state.error().unwrap();
return self.rollback(err.clone()).await;
}
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
Expand All @@ -186,8 +226,11 @@ impl Runner {
status.need_persist(),
);

if status.need_persist() && self.persist_procedure().await.is_err() {
return ExecResult::RetryLater;
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return ExecResult::RetryLater;
}
}

match status {
Expand All @@ -196,7 +239,8 @@ impl Runner {
self.on_suspended(subprocedures).await;
}
Status::Done => {
if self.commit_procedure().await.is_err() {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}

Expand All @@ -217,17 +261,12 @@ impl Runner {
);

if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::failed(Arc::new(e)));

// Write rollback key so we can skip this procedure while recovering procedures.
if self.rollback_procedure().await.is_err() {
return ExecResult::RetryLater;
}

ExecResult::Failed
self.rollback(Arc::new(e)).await
}
}
}
Expand Down Expand Up @@ -261,7 +300,9 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder.clone(),
store: self.store.clone(),
rolling_back: false,
};

// Insert the procedure. We already check the procedure existence before inserting
Expand All @@ -285,8 +326,16 @@ impl Runner {
});
}

async fn wait_on_err(&self) {
time::sleep(ERR_WAIT_DURATION).await;
/// Logs the backoff delay and sleeps until the next retry attempt.
async fn wait_on_err(&self, d: Duration, i: u64) {
logging::info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
i,
d.as_millis(),
);
time::sleep(d).await;
}

async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {
Expand Down Expand Up @@ -416,7 +465,9 @@ mod tests {
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
rolling_back: false,
}
}

Expand Down Expand Up @@ -744,14 +795,44 @@ mod tests {

let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert!(meta.state().is_running());
assert!(meta.state().is_retrying());

let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
assert!(meta.state().is_done());
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}

#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
let exec_fn =
|_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();

let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};

let dir = create_temp_dir("exceed_max_retry_later");
let meta = exceed_max_retry_later.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(
meta.clone(),
Box::new(exceed_max_retry_later),
procedure_store,
);
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);

// Run the runner and execute the procedure.
runner.execute_procedure_in_loop().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("Procedure retry exceeded max times"));
}

#[tokio::test]
async fn test_child_error() {
let mut times = 0;
Expand Down Expand Up @@ -819,7 +900,7 @@ mod tests {
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;

// Run the runer and execute the procedure.
// Run the runner and execute the procedure.
runner.run().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("subprocedure failed"), "{err}");
Expand Down
Loading