1111
1212use std:: sync:: { Arc , Mutex } ;
1313
14- use chrono:: { DateTime , Utc } ;
14+ use chrono:: { DateTime , DurationRound , Utc } ;
1515use dill:: * ;
1616use event_bus:: { AsyncEventHandler , EventBus } ;
1717use futures:: TryStreamExt ;
@@ -30,6 +30,7 @@ use super::pending_flows_state::PendingFlowsState;
3030
3131pub struct FlowServiceInMemory {
3232 state : Arc < Mutex < State > > ,
33+ run_config : Arc < FlowServiceRunConfig > ,
3334 event_bus : Arc < EventBus > ,
3435 flow_event_store : Arc < dyn FlowEventStore > ,
3536 time_source : Arc < dyn SystemTimeSource > ,
@@ -57,6 +58,7 @@ struct State {
5758#[ scope( Singleton ) ]
5859impl FlowServiceInMemory {
5960 pub fn new (
61+ run_config : Arc < FlowServiceRunConfig > ,
6062 event_bus : Arc < EventBus > ,
6163 flow_event_store : Arc < dyn FlowEventStore > ,
6264 time_source : Arc < dyn SystemTimeSource > ,
@@ -66,6 +68,7 @@ impl FlowServiceInMemory {
6668 ) -> Self {
6769 Self {
6870 state : Arc :: new ( Mutex :: new ( State :: default ( ) ) ) ,
71+ run_config,
6972 event_bus,
7073 flow_event_store,
7174 time_source,
@@ -75,6 +78,13 @@ impl FlowServiceInMemory {
7578 }
7679 }
7780
81+ fn round_time ( & self , time : DateTime < Utc > ) -> Result < DateTime < Utc > , InternalError > {
82+ let rounded_time = time
83+ . duration_round ( self . run_config . awaiting_step )
84+ . int_err ( ) ?;
85+ Ok ( rounded_time)
86+ }
87+
7888 #[ tracing:: instrument( level = "debug" , skip_all) ]
7989 async fn run_current_timeslot ( & self ) {
8090 let planned_flows: Vec < _ > = {
@@ -105,7 +115,10 @@ impl FlowServiceInMemory {
105115 }
106116
107117 #[ tracing:: instrument( level = "debug" , skip_all) ]
108- async fn initialize_auto_polling_flows_from_configurations ( & self ) -> Result < ( ) , InternalError > {
118+ async fn initialize_auto_polling_flows_from_configurations (
119+ & self ,
120+ start_time : DateTime < Utc > ,
121+ ) -> Result < ( ) , InternalError > {
109122 let enabled_configurations: Vec < _ > = self
110123 . flow_configuration_service
111124 . list_enabled_configurations ( )
@@ -114,8 +127,12 @@ impl FlowServiceInMemory {
114127 . int_err ( ) ?;
115128
116129 for enabled_config in enabled_configurations {
117- self . activate_flow_configuration ( enabled_config. flow_key , enabled_config. rule )
118- . await ?;
130+ self . activate_flow_configuration (
131+ start_time,
132+ enabled_config. flow_key ,
133+ enabled_config. rule ,
134+ )
135+ . await ?;
119136 }
120137
121138 Ok ( ( ) )
@@ -124,13 +141,15 @@ impl FlowServiceInMemory {
124141 #[ tracing:: instrument( level = "trace" , skip_all, fields( ?flow_key, ?rule) ) ]
125142 async fn activate_flow_configuration (
126143 & self ,
144+ start_time : DateTime < Utc > ,
127145 flow_key : FlowKey ,
128146 rule : FlowConfigurationRule ,
129147 ) -> Result < ( ) , InternalError > {
130148 match & flow_key {
131149 FlowKey :: Dataset ( dataset_flow_key) => {
132150 if let FlowConfigurationRule :: Schedule ( schedule) = & rule {
133- self . enqueue_auto_polling_flow ( & flow_key, schedule) . await ?;
151+ self . enqueue_auto_polling_flow ( start_time, & flow_key, schedule)
152+ . await ?;
134153 }
135154
136155 let mut state = self . state . lock ( ) . unwrap ( ) ;
@@ -140,7 +159,8 @@ impl FlowServiceInMemory {
140159 }
141160 FlowKey :: System ( system_flow_key) => {
142161 if let FlowConfigurationRule :: Schedule ( schedule) = & rule {
143- self . enqueue_auto_polling_flow ( & flow_key, schedule) . await ?;
162+ self . enqueue_auto_polling_flow ( start_time, & flow_key, schedule)
163+ . await ?;
144164
145165 let mut state = self . state . lock ( ) . unwrap ( ) ;
146166 state
@@ -158,6 +178,7 @@ impl FlowServiceInMemory {
158178 #[ tracing:: instrument( level = "trace" , skip_all, fields( ?flow_key) ) ]
159179 async fn try_enqueue_auto_polling_flow_if_enabled (
160180 & self ,
181+ start_time : DateTime < Utc > ,
161182 flow_key : & FlowKey ,
162183 ) -> Result < ( ) , InternalError > {
163184 let maybe_active_schedule = self
@@ -168,7 +189,7 @@ impl FlowServiceInMemory {
168189 . try_get_flow_schedule ( flow_key) ;
169190
170191 if let Some ( active_schedule) = maybe_active_schedule {
171- self . enqueue_auto_polling_flow ( flow_key, & active_schedule)
192+ self . enqueue_auto_polling_flow ( start_time , flow_key, & active_schedule)
172193 . await ?;
173194 }
174195
@@ -178,6 +199,7 @@ impl FlowServiceInMemory {
178199 #[ tracing:: instrument( level = "trace" , skip_all, fields( ?flow_key, ?schedule) ) ]
179200 async fn enqueue_auto_polling_flow (
180201 & self ,
202+ start_time : DateTime < Utc > ,
181203 flow_key : & FlowKey ,
182204 schedule : & Schedule ,
183205 ) -> Result < FlowState , InternalError > {
@@ -191,7 +213,7 @@ impl FlowServiceInMemory {
191213 None => {
192214 let mut flow = self . make_new_flow ( flow_key. clone ( ) , trigger) . await ?;
193215
194- let next_activation_time = schedule. next_activation_time ( self . time_source . now ( ) ) ;
216+ let next_activation_time = schedule. next_activation_time ( start_time ) ;
195217 self . enqueue_flow ( flow. flow_id , next_activation_time) ?;
196218
197219 flow. activate_at_time ( self . time_source . now ( ) , next_activation_time)
@@ -207,6 +229,7 @@ impl FlowServiceInMemory {
207229 #[ tracing:: instrument( level = "trace" , skip_all, fields( %dataset_id, ?flow_type, %flow_id) ) ]
208230 async fn enqueue_dependent_dataset_flows (
209231 & self ,
232+ start_time : DateTime < Utc > ,
210233 dataset_id : & DatasetID ,
211234 flow_type : DatasetFlowType ,
212235 flow_id : FlowID ,
@@ -259,15 +282,14 @@ impl FlowServiceInMemory {
259282 if let Some ( throttling_period) = start_condition. throttling_period {
260283 // TODO: throttle not from NOW,
261284 // but from last flow of the dependent daataset
262- let now = self . time_source . now ( ) ;
263285 self . enqueue_flow (
264286 dependent_dataset_flow. flow_id ,
265- now + throttling_period,
287+ start_time + throttling_period,
266288 ) ?;
267289
268290 dependent_dataset_flow
269291 . define_start_condition (
270- now,
292+ self . time_source . now ( ) ,
271293 FlowStartCondition :: Throttling ( FlowStartConditionThrottling {
272294 interval : throttling_period,
273295 } ) ,
@@ -390,22 +412,24 @@ impl FlowServiceInMemory {
390412impl FlowService for FlowServiceInMemory {
391413 /// Runs the update main loop
392414 #[ tracing:: instrument( level = "info" , skip_all) ]
393- async fn run ( & self , run_config : FlowServiceRunConfig ) -> Result < ( ) , InternalError > {
415+ async fn run ( & self ) -> Result < ( ) , InternalError > {
394416 // Initial scheduling
395- self . initialize_auto_polling_flows_from_configurations ( )
417+ let start_time = self . round_time ( self . time_source . now ( ) ) ?;
418+ self . initialize_auto_polling_flows_from_configurations ( start_time)
396419 . await ?;
397420
398421 // Publish progress event
399422 self . event_bus
400423 . dispatch_event ( FlowServiceEventConfigurationLoaded {
401- event_time : self . time_source . now ( ) ,
424+ event_time : start_time ,
402425 } )
403426 . await
404427 . int_err ( ) ?;
405428
406429 // Main scanning loop
407430 let main_loop_span = tracing:: debug_span!( "FlowService main loop" ) ;
408431 let _ = main_loop_span. enter ( ) ;
432+ let std_awaiting_step = self . run_config . awaiting_step . to_std ( ) . int_err ( ) ?;
409433
410434 loop {
411435 // Do we have a timeslot scheduled?
@@ -425,13 +449,13 @@ impl FlowService for FlowServiceInMemory {
425449 // Publish progress event
426450 self . event_bus
427451 . dispatch_event ( FlowServiceEventExecutedTimeSlot {
428- event_time : current_time ,
452+ event_time : nearest_activation_time ,
429453 } )
430454 . await
431455 . int_err ( ) ?;
432456 }
433457
434- tokio:: time:: sleep ( run_config . awaiting_step ) . await ;
458+ tokio:: time:: sleep ( std_awaiting_step ) . await ;
435459 continue ;
436460 }
437461 }
@@ -664,6 +688,7 @@ impl AsyncEventHandler<TaskEventFinished> for FlowServiceInMemory {
664688 && flow_key. flow_type . is_dataset_update ( )
665689 {
666690 self . enqueue_dependent_dataset_flows (
691+ self . round_time ( event. event_time ) ?,
667692 & flow_key. dataset_id ,
668693 flow_key. flow_type ,
669694 flow. flow_id ,
@@ -681,7 +706,7 @@ impl AsyncEventHandler<TaskEventFinished> for FlowServiceInMemory {
681706 // In case of success:
682707 // - enqueue next auto-polling flow cycle
683708 if event. outcome == TaskOutcome :: Success {
684- self . try_enqueue_auto_polling_flow_if_enabled ( & flow. flow_key )
709+ self . try_enqueue_auto_polling_flow_if_enabled ( event . event_time , & flow. flow_key )
685710 . await ?;
686711 }
687712
@@ -703,8 +728,12 @@ impl AsyncEventHandler<FlowConfigurationEventModified> for FlowServiceInMemory {
703728 state. active_configs . drop_flow_config ( & event. flow_key ) ;
704729 // TODO: should we unqueue pending flows / abort scheduled tasks?
705730 } else {
706- self . activate_flow_configuration ( event. flow_key . clone ( ) , event. rule . clone ( ) )
707- . await ?
731+ self . activate_flow_configuration (
732+ self . round_time ( event. event_time ) ?,
733+ event. flow_key . clone ( ) ,
734+ event. rule . clone ( ) ,
735+ )
736+ . await ?
708737 }
709738
710739 Ok ( ( ) )
0 commit comments