Skip to content

Commit 16aacd1

Browse files
jun0315paomian
authored andcommitted
feat: Add an error variant RetryLater (GreptimeTeam#1058)
* feat: support retry error * fix: ci * fix: ci * fix: fmt * feat: add convert procedure error * Docs : add rustdoc * fix: cr * fix: cr * fix: rm unless code
1 parent a33a15f commit 16aacd1

File tree

6 files changed

+115
-11
lines changed

6 files changed

+115
-11
lines changed

src/common/error/src/status_code.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,34 @@ impl StatusCode {
8686
pub fn is_success(code: u32) -> bool {
8787
Self::Success as u32 == code
8888
}
89+
90+
pub fn is_retryable(&self) -> bool {
91+
match self {
92+
StatusCode::StorageUnavailable
93+
| StatusCode::RuntimeResourcesExhausted
94+
| StatusCode::Internal => true,
95+
96+
StatusCode::Success
97+
| StatusCode::Unknown
98+
| StatusCode::Unsupported
99+
| StatusCode::Unexpected
100+
| StatusCode::InvalidArguments
101+
| StatusCode::InvalidSyntax
102+
| StatusCode::PlanQuery
103+
| StatusCode::EngineExecuteQuery
104+
| StatusCode::TableAlreadyExists
105+
| StatusCode::TableNotFound
106+
| StatusCode::TableColumnNotFound
107+
| StatusCode::TableColumnExists
108+
| StatusCode::DatabaseNotFound
109+
| StatusCode::UserNotFound
110+
| StatusCode::UnsupportedPasswordType
111+
| StatusCode::UserPasswordMismatch
112+
| StatusCode::AuthHeaderNotFound
113+
| StatusCode::InvalidAuthHeader
114+
| StatusCode::AccessDenied => false,
115+
}
116+
}
89117
}
90118

91119
impl fmt::Display for StatusCode {

src/common/procedure/src/error.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ pub enum Error {
7575
source: serde_json::Error,
7676
backtrace: Backtrace,
7777
},
78+
79+
#[snafu(display("Procedure exec failed, source: {}", source))]
80+
RetryLater {
81+
#[snafu(backtrace)]
82+
source: BoxedError,
83+
},
7884
}
7985

8086
pub type Result<T> = std::result::Result<T, Error>;
@@ -88,7 +94,8 @@ impl ErrorExt for Error {
8894
| Error::DeleteState { .. }
8995
| Error::ListState { .. }
9096
| Error::ReadState { .. }
91-
| Error::FromJson { .. } => StatusCode::Internal,
97+
| Error::FromJson { .. }
98+
| Error::RetryLater { .. } => StatusCode::Internal,
9299
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
93100
StatusCode::InvalidArguments
94101
}
@@ -111,4 +118,26 @@ impl Error {
111118
source: BoxedError::new(err),
112119
}
113120
}
121+
122+
/// Creates a new [Error::RetryLater] error from source `err`.
123+
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
124+
Error::RetryLater {
125+
source: BoxedError::new(err),
126+
}
127+
}
128+
129+
/// Determine whether it is a retry later type through [StatusCode]
130+
pub fn is_retry_later(&self) -> bool {
131+
matches!(self, Error::RetryLater { .. })
132+
}
133+
134+
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
135+
/// to its [StatusCode].
136+
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
137+
if err.status_code().is_retryable() {
138+
Error::retry_later(err)
139+
} else {
140+
Error::external(err)
141+
}
142+
}
114143
}

src/common/procedure/src/local/runner.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ impl ExecResult {
4343
matches!(self, ExecResult::Done)
4444
}
4545

46+
fn is_retry_later(&self) -> bool {
47+
matches!(self, ExecResult::RetryLater)
48+
}
49+
4650
fn is_failed(&self) -> bool {
4751
matches!(self, ExecResult::Failed(_))
4852
}
@@ -208,11 +212,16 @@ impl Runner {
208212
Err(e) => {
209213
logging::error!(
210214
e;
211-
"Failed to execute procedure {}-{}",
215+
"Failed to execute procedure {}-{}, retry: {}",
212216
self.procedure.type_name(),
213-
self.meta.id
217+
self.meta.id,
218+
e.is_retry_later(),
214219
);
215220

221+
if e.is_retry_later() {
222+
return ExecResult::RetryLater;
223+
}
224+
216225
self.meta.set_state(ProcedureState::Failed);
217226

218227
// Write rollback key so we can skip this procedure while recovering procedures.
@@ -290,7 +299,7 @@ impl Runner {
290299
self.procedure.type_name(),
291300
self.meta.id,
292301
subprocedure.procedure.type_name(),
293-
subprocedure.id
302+
subprocedure.id,
294303
);
295304

296305
self.submit_subprocedure(subprocedure.id, subprocedure.procedure);
@@ -372,7 +381,7 @@ impl Runner {
372381
logging::info!(
373382
"Procedure {}-{} done",
374383
self.procedure.type_name(),
375-
self.meta.id
384+
self.meta.id,
376385
);
377386

378387
// Mark the state of this procedure to done.
@@ -701,6 +710,45 @@ mod tests {
701710
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
702711
}
703712

713+
#[tokio::test]
714+
async fn test_execute_on_retry_later_error() {
715+
let mut times = 0;
716+
717+
let exec_fn = move |_| {
718+
times += 1;
719+
async move {
720+
if times == 1 {
721+
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
722+
} else {
723+
Ok(Status::Done)
724+
}
725+
}
726+
.boxed()
727+
};
728+
729+
let retry_later = ProcedureAdapter {
730+
data: "retry_later".to_string(),
731+
lock_key: LockKey::single("catalog.schema.table"),
732+
exec_fn,
733+
};
734+
735+
let dir = TempDir::new("retry_later").unwrap();
736+
let meta = retry_later.new_meta(ROOT_ID);
737+
let ctx = context_without_provider(meta.id);
738+
let object_store = test_util::new_object_store(&dir);
739+
let procedure_store = ProcedureStore::from(object_store.clone());
740+
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store);
741+
742+
let res = runner.execute_once(&ctx).await;
743+
assert!(res.is_retry_later(), "{res:?}");
744+
assert_eq!(ProcedureState::Running, meta.state());
745+
746+
let res = runner.execute_once(&ctx).await;
747+
assert!(res.is_done(), "{res:?}");
748+
assert_eq!(ProcedureState::Done, meta.state());
749+
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
750+
}
751+
704752
#[tokio::test]
705753
async fn test_child_error() {
706754
let mut times = 0;
@@ -733,7 +781,7 @@ mod tests {
733781
let state = ctx.provider.procedure_state(child_id).await.unwrap();
734782
if state == Some(ProcedureState::Failed) {
735783
// The parent procedure to abort itself if child procedure is failed.
736-
Err(Error::external(PlainError::new(
784+
Err(Error::from_error_ext(PlainError::new(
737785
"subprocedure failed".to_string(),
738786
StatusCode::Unexpected,
739787
)))

src/mito/src/engine/procedure/create.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
187187
}
188188

189189
let region_name = engine::region_name(self.data.request.id, *number);
190-
// TODO(yingwen): Most error is recoverable.
191190
if let Some(region) = self
192191
.engine_inner
193192
.storage_engine
194193
.open_region(&engine_ctx, &region_name, &open_opts)
195194
.await
196-
.map_err(Error::external)?
195+
.map_err(Error::from_error_ext)?
197196
{
198197
// Region already exists.
199198
self.regions.insert(*number, region);
@@ -218,7 +217,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
218217
.storage_engine
219218
.create_region(&engine_ctx, region_desc, &create_opts)
220219
.await
221-
.map_err(Error::external)?;
220+
.map_err(Error::from_error_ext)?;
222221

223222
self.regions.insert(*number, region);
224223
}

src/mito/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ impl ErrorExt for Error {
232232

233233
impl From<Error> for common_procedure::Error {
234234
fn from(e: Error) -> common_procedure::Error {
235-
common_procedure::Error::external(e)
235+
common_procedure::Error::from_error_ext(e)
236236
}
237237
}
238238

src/table-procedure/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,6 @@ impl ErrorExt for Error {
8484

8585
impl From<Error> for common_procedure::Error {
8686
fn from(e: Error) -> common_procedure::Error {
87-
common_procedure::Error::external(e)
87+
common_procedure::Error::from_error_ext(e)
8888
}
8989
}

0 commit comments

Comments
 (0)