@@ -8,7 +8,7 @@ use std::{
88use crate :: {
99 client:: Client ,
1010 enter_sync, error, id, new_error,
11- runtime:: { AsyncCommand , RuntimeHandle } ,
11+ runtime:: { AsyncCommand , Runtime , RuntimeHandle } ,
1212 util:: { AsyncCallback , Struct } ,
1313 ROOT_MOD ,
1414} ;
@@ -20,15 +20,19 @@ use magnus::{
2020} ;
2121use prost:: Message ;
2222use temporal_sdk_core:: {
23+ replay:: { HistoryForReplay , ReplayWorkerInput } ,
2324 ResourceBasedSlotsOptions , ResourceBasedSlotsOptionsBuilder , ResourceSlotOptions ,
24- SlotSupplierOptions , TunerHolder , TunerHolderOptionsBuilder , WorkerConfigBuilder ,
25+ SlotSupplierOptions , TunerHolder , TunerHolderOptionsBuilder , WorkerConfig , WorkerConfigBuilder ,
2526} ;
2627use temporal_sdk_core_api:: {
2728 errors:: { PollError , WorkflowErrorType } ,
2829 worker:: SlotKind ,
2930} ;
3031use temporal_sdk_core_protos:: coresdk:: workflow_completion:: WorkflowActivationCompletion ;
3132use temporal_sdk_core_protos:: coresdk:: { ActivityHeartbeat , ActivityTaskCompletion } ;
33+ use temporal_sdk_core_protos:: temporal:: api:: history:: v1:: History ;
34+ use tokio:: sync:: mpsc:: { channel, Sender } ;
35+ use tokio_stream:: wrappers:: ReceiverStream ;
3236
3337pub fn init ( ruby : & Ruby ) -> Result < ( ) , Error > {
3438 let class = ruby
@@ -55,6 +59,11 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
5559 ) ?;
5660 class. define_method ( "replace_client" , method ! ( Worker :: replace_client, 1 ) ) ?;
5761 class. define_method ( "initiate_shutdown" , method ! ( Worker :: initiate_shutdown, 0 ) ) ?;
62+
63+ let inner_class = class. define_class ( "WorkflowReplayer" , class:: object ( ) ) ?;
64+ inner_class. define_singleton_method ( "new" , function ! ( WorkflowReplayer :: new, 2 ) ) ?;
65+ inner_class. define_method ( "push_history" , method ! ( WorkflowReplayer :: push_history, 2 ) ) ?;
66+
5867 Ok ( ( ) )
5968}
6069
@@ -89,70 +98,13 @@ impl Worker {
8998 let activity = options. member :: < bool > ( id ! ( "activity" ) ) ?;
9099 let workflow = options. member :: < bool > ( id ! ( "workflow" ) ) ?;
91100
92- // Build config
93- let config = WorkerConfigBuilder :: default ( )
94- . namespace ( options. member :: < String > ( id ! ( "namespace" ) ) ?)
95- . task_queue ( options. member :: < String > ( id ! ( "task_queue" ) ) ?)
96- . worker_build_id ( options. member :: < String > ( id ! ( "build_id" ) ) ?)
97- . client_identity_override ( options. member :: < Option < String > > ( id ! ( "identity_override" ) ) ?)
98- . max_cached_workflows ( options. member :: < usize > ( id ! ( "max_cached_workflows" ) ) ?)
99- . max_concurrent_wft_polls (
100- options. member :: < usize > ( id ! ( "max_concurrent_workflow_task_polls" ) ) ?,
101- )
102- . nonsticky_to_sticky_poll_ratio (
103- options. member :: < f32 > ( id ! ( "nonsticky_to_sticky_poll_ratio" ) ) ?,
104- )
105- . max_concurrent_at_polls (
106- options. member :: < usize > ( id ! ( "max_concurrent_activity_task_polls" ) ) ?,
107- )
108- . no_remote_activities ( options. member :: < bool > ( id ! ( "no_remote_activities" ) ) ?)
109- . sticky_queue_schedule_to_start_timeout ( Duration :: from_secs_f64 (
110- options. member ( id ! ( "sticky_queue_schedule_to_start_timeout" ) ) ?,
111- ) )
112- . max_heartbeat_throttle_interval ( Duration :: from_secs_f64 (
113- options. member ( id ! ( "max_heartbeat_throttle_interval" ) ) ?,
114- ) )
115- . default_heartbeat_throttle_interval ( Duration :: from_secs_f64 (
116- options. member ( id ! ( "default_heartbeat_throttle_interval" ) ) ?,
117- ) )
118- . max_worker_activities_per_second (
119- options. member :: < Option < f64 > > ( id ! ( "max_worker_activities_per_second" ) ) ?,
120- )
121- . max_task_queue_activities_per_second (
122- options. member :: < Option < f64 > > ( id ! ( "max_task_queue_activities_per_second" ) ) ?,
123- )
124- . graceful_shutdown_period ( Duration :: from_secs_f64 (
125- options. member ( id ! ( "graceful_shutdown_period" ) ) ?,
126- ) )
127- . use_worker_versioning ( options. member :: < bool > ( id ! ( "use_worker_versioning" ) ) ?)
128- . tuner ( Arc :: new ( build_tuner (
129- options
130- . child ( id ! ( "tuner" ) ) ?
131- . ok_or_else ( || error ! ( "Missing tuner" ) ) ?,
132- ) ?) )
133- . workflow_failure_errors (
134- if options. member :: < bool > ( id ! ( "nondeterminism_as_workflow_fail" ) ) ? {
135- HashSet :: from ( [ WorkflowErrorType :: Nondeterminism ] )
136- } else {
137- HashSet :: new ( )
138- } ,
139- )
140- . workflow_types_to_failure_errors (
141- options
142- . member :: < Vec < String > > ( id ! ( "nondeterminism_as_workflow_fail_for_types" ) ) ?
143- . into_iter ( )
144- . map ( |s| ( s, HashSet :: from ( [ WorkflowErrorType :: Nondeterminism ] ) ) )
145- . collect :: < HashMap < String , HashSet < WorkflowErrorType > > > ( ) ,
146- )
147- . build ( )
148- . map_err ( |err| error ! ( "Invalid worker options: {}" , err) ) ?;
149-
150101 let worker = temporal_sdk_core:: init_worker (
151102 & client. runtime_handle . core ,
152- config ,
103+ build_config ( options ) ? ,
153104 client. core . clone ( ) . into_inner ( ) ,
154105 )
155106 . map_err ( |err| error ! ( "Failed creating worker: {}" , err) ) ?;
107+
156108 Ok ( Worker {
157109 core : RefCell :: new ( Some ( Arc :: new ( worker) ) ) ,
158110 runtime_handle : client. runtime_handle . clone ( ) ,
@@ -435,6 +387,113 @@ impl Worker {
435387 }
436388}
437389
390+ #[ derive( DataTypeFunctions , TypedData ) ]
391+ #[ magnus(
392+ class = "Temporalio::Internal::Bridge::Worker::WorkflowReplayer" ,
393+ free_immediately
394+ ) ]
395+ pub struct WorkflowReplayer {
396+ tx : Sender < HistoryForReplay > ,
397+ runtime_handle : RuntimeHandle ,
398+ }
399+
400+ impl WorkflowReplayer {
401+ pub fn new ( runtime : & Runtime , options : Struct ) -> Result < ( Self , Worker ) , Error > {
402+ enter_sync ! ( runtime. handle. clone( ) ) ;
403+
404+ let ( tx, rx) = channel ( 1 ) ;
405+
406+ let core_worker = temporal_sdk_core:: init_replay_worker ( ReplayWorkerInput :: new (
407+ build_config ( options) ?,
408+ ReceiverStream :: new ( rx) ,
409+ ) )
410+ . map_err ( |err| error ! ( "Failed creating worker: {}" , err) ) ?;
411+
412+ Ok ( (
413+ WorkflowReplayer {
414+ tx,
415+ runtime_handle : runtime. handle . clone ( ) ,
416+ } ,
417+ Worker {
418+ core : RefCell :: new ( Some ( Arc :: new ( core_worker) ) ) ,
419+ runtime_handle : runtime. handle . clone ( ) ,
420+ activity : false ,
421+ workflow : true ,
422+ } ,
423+ ) )
424+ }
425+
426+ pub fn push_history ( & self , workflow_id : String , proto : RString ) -> Result < ( ) , Error > {
427+ let history = History :: decode ( unsafe { proto. as_slice ( ) } )
428+ . map_err ( |err| error ! ( "Invalid proto: {}" , err) ) ?;
429+ let tx = self . tx . clone ( ) ;
430+ self . runtime_handle . core . tokio_handle ( ) . spawn ( async move {
431+ // Intentionally ignoring error here
432+ let _ = tx. send ( HistoryForReplay :: new ( history, workflow_id) ) . await ;
433+ } ) ;
434+ Ok ( ( ) )
435+ }
436+ }
437+
438+ fn build_config ( options : Struct ) -> Result < WorkerConfig , Error > {
439+ WorkerConfigBuilder :: default ( )
440+ . namespace ( options. member :: < String > ( id ! ( "namespace" ) ) ?)
441+ . task_queue ( options. member :: < String > ( id ! ( "task_queue" ) ) ?)
442+ . worker_build_id ( options. member :: < String > ( id ! ( "build_id" ) ) ?)
443+ . client_identity_override ( options. member :: < Option < String > > ( id ! ( "identity_override" ) ) ?)
444+ . max_cached_workflows ( options. member :: < usize > ( id ! ( "max_cached_workflows" ) ) ?)
445+ . max_concurrent_wft_polls (
446+ options. member :: < usize > ( id ! ( "max_concurrent_workflow_task_polls" ) ) ?,
447+ )
448+ . nonsticky_to_sticky_poll_ratio (
449+ options. member :: < f32 > ( id ! ( "nonsticky_to_sticky_poll_ratio" ) ) ?,
450+ )
451+ . max_concurrent_at_polls (
452+ options. member :: < usize > ( id ! ( "max_concurrent_activity_task_polls" ) ) ?,
453+ )
454+ . no_remote_activities ( options. member :: < bool > ( id ! ( "no_remote_activities" ) ) ?)
455+ . sticky_queue_schedule_to_start_timeout ( Duration :: from_secs_f64 (
456+ options. member ( id ! ( "sticky_queue_schedule_to_start_timeout" ) ) ?,
457+ ) )
458+ . max_heartbeat_throttle_interval ( Duration :: from_secs_f64 (
459+ options. member ( id ! ( "max_heartbeat_throttle_interval" ) ) ?,
460+ ) )
461+ . default_heartbeat_throttle_interval ( Duration :: from_secs_f64 (
462+ options. member ( id ! ( "default_heartbeat_throttle_interval" ) ) ?,
463+ ) )
464+ . max_worker_activities_per_second (
465+ options. member :: < Option < f64 > > ( id ! ( "max_worker_activities_per_second" ) ) ?,
466+ )
467+ . max_task_queue_activities_per_second (
468+ options. member :: < Option < f64 > > ( id ! ( "max_task_queue_activities_per_second" ) ) ?,
469+ )
470+ . graceful_shutdown_period ( Duration :: from_secs_f64 (
471+ options. member ( id ! ( "graceful_shutdown_period" ) ) ?,
472+ ) )
473+ . use_worker_versioning ( options. member :: < bool > ( id ! ( "use_worker_versioning" ) ) ?)
474+ . tuner ( Arc :: new ( build_tuner (
475+ options
476+ . child ( id ! ( "tuner" ) ) ?
477+ . ok_or_else ( || error ! ( "Missing tuner" ) ) ?,
478+ ) ?) )
479+ . workflow_failure_errors (
480+ if options. member :: < bool > ( id ! ( "nondeterminism_as_workflow_fail" ) ) ? {
481+ HashSet :: from ( [ WorkflowErrorType :: Nondeterminism ] )
482+ } else {
483+ HashSet :: new ( )
484+ } ,
485+ )
486+ . workflow_types_to_failure_errors (
487+ options
488+ . member :: < Vec < String > > ( id ! ( "nondeterminism_as_workflow_fail_for_types" ) ) ?
489+ . into_iter ( )
490+ . map ( |s| ( s, HashSet :: from ( [ WorkflowErrorType :: Nondeterminism ] ) ) )
491+ . collect :: < HashMap < String , HashSet < WorkflowErrorType > > > ( ) ,
492+ )
493+ . build ( )
494+ . map_err ( |err| error ! ( "Invalid worker options: {}" , err) )
495+ }
496+
438497fn build_tuner ( options : Struct ) -> Result < TunerHolder , Error > {
439498 let ( workflow_slot_options, resource_slot_options) = build_tuner_slot_options (
440499 options
0 commit comments