Skip to content

Commit 04afee2

Browse files
authored
feat(procedure): Support multi-lock keys and querying procedure state from context (#1006)
* feat: Add ContextProvider to Context So procedures can query states of other procedures via the ContextProvider and they don't need to hold a ProcedureManagerRef * feat: Procedure supports acquring multiple lock keys * test: Use multi-locks in test * feat: Add keys_to_lock/unlock
1 parent 5533040 commit 04afee2

File tree

7 files changed

+157
-91
lines changed

7 files changed

+157
-91
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/procedure/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ futures.workspace = true
1313
object-store = { path = "../../object-store" }
1414
serde.workspace = true
1515
serde_json = "1.0"
16+
smallvec = "1"
1617
snafu.workspace = true
1718
tokio.workspace = true
1819
uuid.workspace = true

src/common/procedure/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ mod store;
2424

2525
pub use crate::error::{Error, Result};
2626
pub use crate::procedure::{
27-
BoxedProcedure, Context, LockKey, Procedure, ProcedureId, ProcedureManager,
27+
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
2828
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status,
2929
};

src/common/procedure/src/local.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ use crate::local::runner::Runner;
3030
use crate::procedure::BoxedProcedureLoader;
3131
use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef};
3232
use crate::{
33-
BoxedProcedure, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId,
33+
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
34+
ProcedureWithId,
3435
};
3536

3637
/// Mutable metadata of a procedure during execution.
@@ -70,7 +71,7 @@ pub(crate) struct ProcedureMeta {
7071
/// Notify to wait for subprocedures.
7172
child_notify: Notify,
7273
/// Lock required by this procedure.
73-
lock_key: Option<LockKey>,
74+
lock_key: LockKey,
7475
/// Mutable status during execution.
7576
exec_meta: Mutex<ExecMeta>,
7677
}
@@ -128,6 +129,13 @@ pub(crate) struct ManagerContext {
128129
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
129130
}
130131

132+
#[async_trait]
133+
impl ContextProvider for ManagerContext {
134+
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
135+
Ok(self.state(procedure_id))
136+
}
137+
}
138+
131139
impl ManagerContext {
132140
/// Returns a new [ManagerContext].
133141
fn new() -> ManagerContext {
@@ -409,7 +417,7 @@ mod test_util {
409417
lock_notify: Notify::new(),
410418
parent_id: None,
411419
child_notify: Notify::new(),
412-
lock_key: None,
420+
lock_key: LockKey::default(),
413421
exec_meta: Mutex::new(ExecMeta::default()),
414422
}
415423
}
@@ -508,8 +516,8 @@ mod tests {
508516
Ok(self.content.clone())
509517
}
510518

511-
fn lock_key(&self) -> Option<LockKey> {
512-
None
519+
fn lock_key(&self) -> LockKey {
520+
LockKey::default()
513521
}
514522
}
515523

@@ -617,8 +625,8 @@ mod tests {
617625
unimplemented!()
618626
}
619627

620-
fn lock_key(&self) -> Option<LockKey> {
621-
Some(LockKey::new("test.submit"))
628+
fn lock_key(&self) -> LockKey {
629+
LockKey::single("test.submit")
622630
}
623631
}
624632

0 commit comments

Comments
 (0)