Skip to content

Commit 41a924e

Browse files
Tested and fixed manual flow triggers
1 parent ee19e73 commit 41a924e

File tree

2 files changed

+122
-15
lines changed

2 files changed

+122
-15
lines changed

src/infra/flow-system-inmem/src/services/flow/flow_service_inmem.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,14 @@ impl FlowService for FlowServiceInMemory {
484484
.await
485485
.map_err(|e| RequestFlowError::Internal(e)),
486486

487-
// Otherwise, initiate a new flow and schedule immediate task
487+
// Otherwise, initiate a new flow and activate it at the nearest scheduler slot
488488
None => {
489489
let mut flow = self.make_new_flow(flow_key, trigger).await?;
490-
self.schedule_flow_task(&mut flow).await?;
490+
let activation_time = self.round_time(self.time_source.now())?;
491+
self.enqueue_flow(flow.flow_id, activation_time)?;
492+
493+
flow.activate_at_time(self.time_source.now(), activation_time)
494+
.int_err()?;
491495
flow.save(self.flow_event_store.as_ref()).await.int_err()?;
492496
Ok(flow.into())
493497
}

src/infra/flow-system-inmem/tests/tests/test_flow_service_inmem.rs

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use opendatafabric::*;
2626
/////////////////////////////////////////////////////////////////////////////////////////
2727

2828
#[test_log::test(tokio::test)]
29-
async fn test_read_initial_config() {
29+
async fn test_read_initial_config_and_queue_properly() {
3030
let harness = FlowHarness::new();
3131

3232
let foo_id = harness.create_root_dataset("foo").await;
@@ -66,10 +66,10 @@ async fn test_read_initial_config() {
6666
let bar_moment = state.snapshots[2].0;
6767

6868
assert!(start_moment < foo_moment && foo_moment < bar_moment);
69-
assert_eq!((foo_moment - start_moment), Duration::milliseconds(30));
70-
assert_eq!((bar_moment - start_moment), Duration::milliseconds(45));
69+
assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // planned time for "foo"
70+
assert_eq!((bar_moment - start_moment), Duration::milliseconds(45)); // planned time for "bar"
7171

72-
let flow_test_checks = [
72+
assert_flow_test_checks(&[
7373
// Snapshot 0: after initial queueing
7474
FlowTestCheck {
7575
snapshot: &state.snapshots[0].1,
@@ -94,15 +94,7 @@ async fn test_read_initial_config() {
9494
(&bar_flow_key, FlowStatus::Scheduled),
9595
],
9696
},
97-
];
98-
99-
for test_check in flow_test_checks {
100-
assert_eq!(test_check.snapshot.len(), test_check.patterns.len());
101-
for pattern in test_check.patterns {
102-
let flow_state = test_check.snapshot.get(pattern.0).unwrap();
103-
assert_eq!(flow_state.status(), pattern.1);
104-
}
105-
}
97+
]);
10698

10799
let task_ids = harness.snapshot_all_current_task_ids().await;
108100
assert_eq!(task_ids.len(), 2);
@@ -140,11 +132,111 @@ async fn test_read_initial_config() {
140132

141133
/////////////////////////////////////////////////////////////////////////////////////////
142134

135+
#[test_log::test(tokio::test)]
136+
async fn test_manual_trigger() {
137+
let harness = FlowHarness::new();
138+
139+
let foo_id = harness.create_root_dataset("foo").await;
140+
let bar_id = harness.create_root_dataset("bar").await;
141+
142+
// Note: only "foo" has auto-schedule, "bar" hasn't
143+
harness
144+
.set_dataset_flow_schedule(
145+
foo_id.clone(),
146+
DatasetFlowType::Ingest,
147+
Duration::milliseconds(30).into(),
148+
)
149+
.await;
150+
151+
let foo_flow_key: FlowKey = FlowKeyDataset::new(foo_id.clone(), DatasetFlowType::Ingest).into();
152+
let bar_flow_key: FlowKey = FlowKeyDataset::new(bar_id.clone(), DatasetFlowType::Ingest).into();
153+
154+
let _ = tokio::select! {
155+
res = harness.flow_service.run() => res.int_err(),
156+
_ = async {
157+
// Sleep < "foo" period
158+
tokio::time::sleep(std::time::Duration::from_millis(18)).await;
159+
harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already
160+
harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" not queued, starts soon
161+
162+
// Wake up after foo scheduling
163+
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
164+
harness.trigger_manual_flow(foo_flow_key.clone()).await; // "foo" pending already, even running
165+
harness.trigger_manual_flow(bar_flow_key.clone()).await; // "bar" pending already, event running
166+
167+
// Make sure nothing got scheduled in near time
168+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
169+
170+
} => Ok(()),
171+
}
172+
.unwrap();
173+
174+
let test_flow_listener = harness.catalog.get_one::<TestFlowSystemListener>().unwrap();
175+
176+
let state = test_flow_listener.state.lock().unwrap();
177+
assert_eq!(3, state.snapshots.len());
178+
179+
let start_moment = state.snapshots[0].0;
180+
let bar_moment = state.snapshots[1].0;
181+
let foo_moment = state.snapshots[2].0;
182+
183+
assert_eq!((bar_moment - start_moment), Duration::milliseconds(20)); // next slot after 18ms trigger with 5ms align
184+
assert_eq!((foo_moment - start_moment), Duration::milliseconds(30)); // 30ms as planned
185+
186+
assert_flow_test_checks(&[
187+
// Snapshot 0: after initial queueing, no "bar", only "foo"
188+
FlowTestCheck {
189+
snapshot: &state.snapshots[0].1,
190+
patterns: vec![(&foo_flow_key, FlowStatus::Queued)],
191+
},
192+
// Snapshot 1: "bar" had manual trigger
193+
FlowTestCheck {
194+
snapshot: &state.snapshots[1].1,
195+
patterns: vec![
196+
(&foo_flow_key, FlowStatus::Queued),
197+
(&bar_flow_key, FlowStatus::Scheduled),
198+
],
199+
},
200+
// Snapshot 2: period passed for 'foo'
201+
FlowTestCheck {
202+
snapshot: &state.snapshots[2].1,
203+
patterns: vec![
204+
(&foo_flow_key, FlowStatus::Scheduled),
205+
(&bar_flow_key, FlowStatus::Scheduled),
206+
],
207+
},
208+
]);
209+
}
210+
211+
/////////////////////////////////////////////////////////////////////////////////////////
212+
213+
// TODO:
214+
// - completing task with: success/failure/cancel
215+
// - scheduling next auto-trigger when task completes
216+
// - scheduling derived datasets after parent dataset succeeds
217+
// - cancelling queued/scheduled flow (at flow level, not at task level)
218+
// - flow config paused/resumed/modified when already queued/scheduled
219+
// - dataset deleted when flow queued/scheduled
220+
221+
/////////////////////////////////////////////////////////////////////////////////////////
222+
143223
struct FlowTestCheck<'a> {
144224
snapshot: &'a HashMap<FlowKey, FlowState>,
145225
patterns: Vec<(&'a FlowKey, FlowStatus)>,
146226
}
147227

228+
fn assert_flow_test_checks<'a>(flow_test_checks: &[FlowTestCheck<'a>]) {
229+
for test_check in flow_test_checks {
230+
assert_eq!(test_check.snapshot.len(), test_check.patterns.len());
231+
for pattern in test_check.patterns.iter() {
232+
let flow_state = test_check.snapshot.get(pattern.0).unwrap();
233+
assert_eq!(flow_state.status(), pattern.1);
234+
}
235+
}
236+
}
237+
238+
/////////////////////////////////////////////////////////////////////////////////////////
239+
148240
struct TaskTestCheck<'a> {
149241
task_id: TaskID,
150242
flow_key: &'a FlowKey,
@@ -330,6 +422,17 @@ impl FlowHarness {
330422
}
331423
task_ids
332424
}
425+
426+
async fn trigger_manual_flow(&self, flow_key: FlowKey) {
427+
self.flow_service
428+
.trigger_manual_flow(
429+
flow_key,
430+
FAKE_ACCOUNT_ID.to_string(),
431+
AccountName::new_unchecked(auth::DEFAULT_ACCOUNT_NAME),
432+
)
433+
.await
434+
.unwrap();
435+
}
333436
}
334437

335438
/////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)