Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,34 @@ impl StatusCode {
/// Returns `true` when the raw numeric `code` equals the discriminant of
/// [StatusCode::Success].
pub fn is_success(code: u32) -> bool {
    code == Self::Success as u32
}

/// Returns whether this status code is classified as retryable.
///
/// Only [StatusCode::StorageUnavailable], [StatusCode::RuntimeResourcesExhausted]
/// and [StatusCode::Internal] are retryable; every other code listed below is not.
///
/// The match deliberately has no wildcard arm, so a newly added variant must be
/// classified here before the code compiles.
pub fn is_retryable(&self) -> bool {
    match self {
        // Codes for which a retry is not attempted.
        StatusCode::Success
        | StatusCode::Unknown
        | StatusCode::Unsupported
        | StatusCode::Unexpected
        | StatusCode::InvalidArguments
        | StatusCode::InvalidSyntax
        | StatusCode::PlanQuery
        | StatusCode::EngineExecuteQuery
        | StatusCode::TableAlreadyExists
        | StatusCode::TableNotFound
        | StatusCode::TableColumnNotFound
        | StatusCode::TableColumnExists
        | StatusCode::DatabaseNotFound
        | StatusCode::UserNotFound
        | StatusCode::UnsupportedPasswordType
        | StatusCode::UserPasswordMismatch
        | StatusCode::AuthHeaderNotFound
        | StatusCode::InvalidAuthHeader
        | StatusCode::AccessDenied => false,

        // Codes classified as retryable.
        StatusCode::StorageUnavailable
        | StatusCode::RuntimeResourcesExhausted
        | StatusCode::Internal => true,
    }
}
}

impl fmt::Display for StatusCode {
Expand Down
31 changes: 30 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ pub enum Error {
source: serde_json::Error,
backtrace: Backtrace,
},

#[snafu(display("Procedure exec failed, source: {}", source))]
RetryLater {
#[snafu(backtrace)]
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -88,7 +94,8 @@ impl ErrorExt for Error {
| Error::DeleteState { .. }
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. } => StatusCode::Internal,
| Error::FromJson { .. }
| Error::RetryLater { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Expand All @@ -111,4 +118,26 @@ impl Error {
source: BoxedError::new(err),
}
}

/// Creates a new [Error::RetryLater] error from source `err`.
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {
source: BoxedError::new(err),
}
}

/// Returns `true` if this error is the [Error::RetryLater] variant.
///
/// Only the variant is inspected; the [StatusCode] of the wrapped source is not consulted.
pub fn is_retry_later(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}

/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
/// to its [StatusCode].
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
if err.status_code().is_retryable() {
Error::retry_later(err)
} else {
Error::external(err)
}
}
}
59 changes: 54 additions & 5 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl ExecResult {
matches!(self, ExecResult::Done)
}

/// Returns `true` if this result is [ExecResult::RetryLater].
fn is_retry_later(&self) -> bool {
matches!(self, ExecResult::RetryLater)
}

/// Returns `true` if this result is [ExecResult::Failed], regardless of its payload.
fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed(_))
}
Expand Down Expand Up @@ -206,13 +210,19 @@ impl Runner {
ExecResult::Continue
}
Err(e) => {
type ProcedureError = Error;
Comment thread
jun0315 marked this conversation as resolved.
Outdated
logging::error!(
e;
"Failed to execute procedure {}-{}",
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
self.meta.id
self.meta.id,
e.is_retry_later(),
);

if e.is_retry_later() {
return ExecResult::RetryLater;
}

self.meta.set_state(ProcedureState::Failed);

// Write rollback key so we can skip this procedure while recovering procedures.
Expand Down Expand Up @@ -290,7 +300,7 @@ impl Runner {
self.procedure.type_name(),
self.meta.id,
subprocedure.procedure.type_name(),
subprocedure.id
subprocedure.id,
);

self.submit_subprocedure(subprocedure.id, subprocedure.procedure);
Expand Down Expand Up @@ -372,7 +382,7 @@ impl Runner {
logging::info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id
self.meta.id,
);

// Mark the state of this procedure to done.
Expand Down Expand Up @@ -701,6 +711,45 @@ mod tests {
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
}

// Checks that a `RetryLater` error from the procedure does not mark it as failed:
// the first `execute_once` yields a retry-later result with the state still
// `Running`, and the second one completes the procedure and writes the commit file.
#[tokio::test]
async fn test_execute_on_retry_later_error() {
// Counts how many times `exec_fn` has been invoked.
let mut times = 0;

let exec_fn = move |_| {
times += 1;
async move {
// Fail only on the first invocation. The error is built with `Error::retry_later`
// directly, so it is a `RetryLater` error even though the mock carries
// `StatusCode::Unexpected`.
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else {
Ok(Status::Done)
}
}
.boxed()
};

let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};

let dir = TempDir::new("retry_later").unwrap();
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store);

// First attempt: the procedure returns the retry-later error.
let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
// The state must still be `Running`, not `Failed`.
assert_eq!(ProcedureState::Running, meta.state());

// Second attempt: the procedure returns `Status::Done`.
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
assert_eq!(ProcedureState::Done, meta.state());
// Only a commit file is expected in the store for this procedure.
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}

#[tokio::test]
async fn test_child_error() {
let mut times = 0;
Expand Down Expand Up @@ -733,7 +782,7 @@ mod tests {
let state = ctx.provider.procedure_state(child_id).await.unwrap();
if state == Some(ProcedureState::Failed) {
// The parent procedure to abort itself if child procedure is failed.
Err(Error::external(PlainError::new(
Err(Error::from_error_ext(PlainError::new(
"subprocedure failed".to_string(),
StatusCode::Unexpected,
)))
Expand Down
5 changes: 2 additions & 3 deletions src/mito/src/engine/procedure/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
}

let region_name = engine::region_name(self.data.request.id, *number);
// TODO(yingwen): Most error is recoverable.
if let Some(region) = self
.engine_inner
.storage_engine
.open_region(&engine_ctx, &region_name, &open_opts)
.await
.map_err(Error::external)?
.map_err(Error::from_error_ext)?
{
// Region already exists.
self.regions.insert(*number, region);
Expand All @@ -218,7 +217,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
.storage_engine
.create_region(&engine_ctx, region_desc, &create_opts)
.await
.map_err(Error::external)?;
.map_err(Error::from_error_ext)?;

self.regions.insert(*number, region);
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl ErrorExt for Error {

impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::external(e)
common_procedure::Error::from_error_ext(e)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/table-procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ impl ErrorExt for Error {

impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::external(e)
common_procedure::Error::from_error_ext(e)
}
}