77 commandpb "go.temporal.io/api/command/v1"
88 commonpb "go.temporal.io/api/common/v1"
99 enumspb "go.temporal.io/api/enums/v1"
10+ historypb "go.temporal.io/api/history/v1"
1011 "go.temporal.io/api/serviceerror"
1112 taskqueuepb "go.temporal.io/api/taskqueue/v1"
1213 "go.temporal.io/api/workflowservice/v1"
@@ -27,6 +28,16 @@ func Invoke(
2728 shardContext historyi.ShardContext ,
2829 workflowConsistencyChecker api.WorkflowConsistencyChecker ,
2930) (resp * historyservice.UpdateActivityOptionsResponse , retError error ) {
31+ updateRequest := request .GetUpdateRequest ()
32+
33+ mask := updateRequest .GetUpdateMask ()
34+ if mask != nil && updateRequest .RestoreOriginal {
35+ updateFields := util .ParseFieldMask (mask )
36+ if len (updateFields ) != 0 {
37+ return nil , serviceerror .NewInvalidArgument ("Both UpdateMask and RestoreOriginal are provided" )
38+ }
39+ }
40+
3041 validator := api .NewCommandAttrValidator (
3142 shardContext .GetNamespaceRegistry (),
3243 shardContext .GetConfig (),
@@ -40,13 +51,18 @@ func Invoke(
4051 nil ,
4152 definition .NewWorkflowKey (
4253 request .NamespaceId ,
43- request . GetUpdateRequest () .GetExecution ().GetWorkflowId (),
44- request . GetUpdateRequest () .GetExecution ().GetRunId (),
54+ updateRequest .GetExecution ().GetWorkflowId (),
55+ updateRequest .GetExecution ().GetRunId (),
4556 ),
4657 func (workflowLease api.WorkflowLease ) (* api.UpdateWorkflowAction , error ) {
4758 mutableState := workflowLease .GetMutableState ()
4859 var err error
49- response , err = processActivityOptionsRequest (validator , mutableState , request )
60+ if updateRequest .RestoreOriginal {
61+ response , err = restoreOriginalOptions (ctx , mutableState , updateRequest )
62+ } else {
63+ response , err = processActivityOptionsRequest (validator , mutableState , updateRequest , request .GetNamespaceId ())
64+ }
65+
5066 if err != nil {
5167 return nil , err
5268 }
@@ -70,29 +86,18 @@ func Invoke(
7086func processActivityOptionsRequest (
7187 validator * api.CommandAttrValidator ,
7288 mutableState historyi.MutableState ,
73- request * historyservice.UpdateActivityOptionsRequest ,
89+ updateRequest * workflowservice.UpdateActivityOptionsRequest ,
90+ namespaceID string ,
7491) (* historyservice.UpdateActivityOptionsResponse , error ) {
7592 if ! mutableState .IsWorkflowExecutionRunning () {
7693 return nil , consts .ErrWorkflowCompleted
7794 }
78- updateRequest := request .GetUpdateRequest ()
7995 mergeFrom := updateRequest .GetActivityOptions ()
8096 if mergeFrom == nil {
8197 return nil , serviceerror .NewInvalidArgument ("ActivityOptions are not provided" )
8298 }
8399
84- var activityIDs []string
85- switch a := updateRequest .GetActivity ().(type ) {
86- case * workflowservice.UpdateActivityOptionsRequest_Id :
87- activityIDs = append (activityIDs , a .Id )
88- case * workflowservice.UpdateActivityOptionsRequest_Type :
89- activityType := a .Type
90- for _ , ai := range mutableState .GetPendingActivityInfos () {
91- if ai .ActivityType .Name == activityType {
92- activityIDs = append (activityIDs , ai .ActivityId )
93- }
94- }
95- }
100+ activityIDs := getActivityIDs (updateRequest , mutableState )
96101
97102 if len (activityIDs ) == 0 {
98103 return nil , consts .ErrActivityNotFound
@@ -114,7 +119,7 @@ func processActivityOptionsRequest(
114119 return nil , consts .ErrActivityNotFound
115120 }
116121
117- if adjustedOptions , err = updateActivityOptions (validator , mutableState , request , ai , mergeFrom , updateFields ); err != nil {
122+ if adjustedOptions , err = processActivityOptionsUpdate (validator , mutableState , namespaceID , ai , mergeFrom , updateFields ); err != nil {
118123 return nil , err
119124 }
120125 }
@@ -126,10 +131,10 @@ func processActivityOptionsRequest(
126131 return response , nil
127132}
128133
129- func updateActivityOptions (
134+ func processActivityOptionsUpdate (
130135 validator * api.CommandAttrValidator ,
131136 mutableState historyi.MutableState ,
132- request * historyservice. UpdateActivityOptionsRequest ,
137+ namespaceID string ,
133138 ai * persistencespb.ActivityInfo ,
134139 mergeFrom * activitypb.ActivityOptions ,
135140 updateFields map [string ]struct {},
@@ -152,60 +157,20 @@ func updateActivityOptions(
152157 }
153158
154159 // update activity options
155- if err := applyActivityOptions (mergeInto , mergeFrom , updateFields ); err != nil {
160+ if err := mergeActivityOptions (mergeInto , mergeFrom , updateFields ); err != nil {
156161 return nil , err
157162 }
158163
159164 // validate the updated options
160- adjustedOptions , err := adjustActivityOptions (validator , request . NamespaceId , ai .ActivityId , ai .ActivityType , mergeInto )
165+ adjustedOptions , err := adjustActivityOptions (validator , namespaceID , ai .ActivityId , ai .ActivityType , mergeInto )
161166 if err != nil {
162167 return nil , err
163168 }
164169
165- if err = mutableState .UpdateActivity (ai .ScheduledEventId , func (activityInfo * persistencespb.ActivityInfo , _ historyi.MutableState ) error {
166- // update activity info with new options
167- activityInfo .TaskQueue = adjustedOptions .TaskQueue .Name
168- activityInfo .ScheduleToCloseTimeout = adjustedOptions .ScheduleToCloseTimeout
169- activityInfo .ScheduleToStartTimeout = adjustedOptions .ScheduleToStartTimeout
170- activityInfo .StartToCloseTimeout = adjustedOptions .StartToCloseTimeout
171- activityInfo .HeartbeatTimeout = adjustedOptions .HeartbeatTimeout
172- activityInfo .RetryMaximumInterval = adjustedOptions .RetryPolicy .MaximumInterval
173- activityInfo .RetryBackoffCoefficient = adjustedOptions .RetryPolicy .BackoffCoefficient
174- activityInfo .RetryInitialInterval = adjustedOptions .RetryPolicy .InitialInterval
175- activityInfo .RetryMaximumAttempts = adjustedOptions .RetryPolicy .MaximumAttempts
176-
177- // move forward activity version
178- activityInfo .Stamp ++
179-
180- // invalidate timers
181- activityInfo .TimerTaskStatus = workflow .TimerTaskStatusNone
182- return nil
183- }); err != nil {
184- return nil , err
185- }
186-
187- if workflow .GetActivityState (ai ) == enumspb .PENDING_ACTIVITY_STATE_SCHEDULED {
188- // in this case we always want to generate a new retry task
189-
190- // two options - activity can be in backoff, or scheduled (waiting to be started)
191- // if activity in backoff
192- // in this case there is already old retry task
193- // it will be ignored because of stamp mismatch
194- // if activity is scheduled and waiting to be started
195- // eventually matching service will call history service (recordActivityTaskStarted)
196- // history service will return error based on stamp. Task will be dropped
197-
198- nextScheduledTime := workflow .GetNextScheduledTime (ai )
199- err = mutableState .RegenerateActivityRetryTask (ai , nextScheduledTime )
200- if err != nil {
201- return nil , err
202- }
203- }
204-
205- return adjustedOptions , nil
170+ return updateActivityOptions (mutableState , ai , adjustedOptions )
206171}
207172
208- func applyActivityOptions (
173+ func mergeActivityOptions (
209174 mergeInto * activitypb.ActivityOptions ,
210175 mergeFrom * activitypb.ActivityOptions ,
211176 updateFields map [string ]struct {},
@@ -300,3 +265,126 @@ func adjustActivityOptions(
300265
301266 return ao , nil
302267}
268+
269+ func getActivityIDs (updateRequest * workflowservice.UpdateActivityOptionsRequest , ms historyi.MutableState ) []string {
270+ var activityIDs []string
271+ switch a := updateRequest .GetActivity ().(type ) {
272+ case * workflowservice.UpdateActivityOptionsRequest_Id :
273+ activityIDs = append (activityIDs , a .Id )
274+ case * workflowservice.UpdateActivityOptionsRequest_Type :
275+ activityType := a .Type
276+ for _ , ai := range ms .GetPendingActivityInfos () {
277+ if ai .ActivityType .Name == activityType {
278+ activityIDs = append (activityIDs , ai .ActivityId )
279+ }
280+ }
281+ }
282+ return activityIDs
283+ }
284+
285+ func updateActivityOptions (
286+ ms historyi.MutableState ,
287+ ai * persistencespb.ActivityInfo ,
288+ activityOptions * activitypb.ActivityOptions ,
289+ ) (* activitypb.ActivityOptions , error ) {
290+ var err error
291+ if err = ms .UpdateActivity (ai .ScheduledEventId , func (activityInfo * persistencespb.ActivityInfo , _ historyi.MutableState ) error {
292+ // update activity info with new options
293+ activityInfo .TaskQueue = activityOptions .TaskQueue .Name
294+ activityInfo .ScheduleToCloseTimeout = activityOptions .ScheduleToCloseTimeout
295+ activityInfo .ScheduleToStartTimeout = activityOptions .ScheduleToStartTimeout
296+ activityInfo .StartToCloseTimeout = activityOptions .StartToCloseTimeout
297+ activityInfo .HeartbeatTimeout = activityOptions .HeartbeatTimeout
298+ activityInfo .RetryMaximumInterval = activityOptions .RetryPolicy .MaximumInterval
299+ activityInfo .RetryBackoffCoefficient = activityOptions .RetryPolicy .BackoffCoefficient
300+ activityInfo .RetryInitialInterval = activityOptions .RetryPolicy .InitialInterval
301+ activityInfo .RetryMaximumAttempts = activityOptions .RetryPolicy .MaximumAttempts
302+
303+ // move forward activity version
304+ activityInfo .Stamp ++
305+
306+ // invalidate timers
307+ activityInfo .TimerTaskStatus = workflow .TimerTaskStatusNone
308+ return nil
309+ }); err != nil {
310+ return nil , err
311+ }
312+
313+ if workflow .GetActivityState (ai ) == enumspb .PENDING_ACTIVITY_STATE_SCHEDULED {
314+ // in this case we always want to generate a new retry task
315+
316+ // two options - activity can be in backoff, or scheduled (waiting to be started)
317+ // if activity in backoff
318+ // in this case there is already old retry task
319+ // it will be ignored because of stamp mismatch
320+ // if activity is scheduled and waiting to be started
321+ // eventually matching service will call history service (recordActivityTaskStarted)
322+ // history service will return error based on stamp. Task will be dropped
323+
324+ nextScheduledTime := workflow .GetNextScheduledTime (ai )
325+ err = ms .RegenerateActivityRetryTask (ai , nextScheduledTime )
326+ if err != nil {
327+ return nil , err
328+ }
329+ }
330+
331+ return activityOptions , nil
332+
333+ }
334+
335+ func restoreOriginalOptions (
336+ ctx context.Context ,
337+ ms historyi.MutableState ,
338+ updateRequest * workflowservice.UpdateActivityOptionsRequest ,
339+ ) (* historyservice.UpdateActivityOptionsResponse , error ) {
340+
341+ activityIDs := getActivityIDs (updateRequest , ms )
342+
343+ if len (activityIDs ) == 0 {
344+ return nil , consts .ErrActivityNotFound
345+ }
346+
347+ var updatedOptions * activitypb.ActivityOptions
348+
349+ for _ , activityId := range activityIDs {
350+ ai , activityFound := ms .GetActivityByActivityID (activityId )
351+
352+ if ! activityFound {
353+ return nil , consts .ErrActivityNotFound
354+ }
355+
356+ event , err := ms .GetActivityScheduledEvent (ctx , ai .ScheduledEventId )
357+ if err != nil {
358+ return nil , err
359+ }
360+ attrs , ok := event .Attributes .(* historypb.HistoryEvent_ActivityTaskScheduledEventAttributes )
361+ if ! ok {
362+ return nil , serviceerror .NewInvalidArgument ("ActivityTaskScheduledEvent is invalid" )
363+ }
364+ if attrs == nil || attrs .ActivityTaskScheduledEventAttributes == nil {
365+ return nil , serviceerror .NewInvalidArgument ("ActivityTaskScheduledEvent is incomplete" )
366+ }
367+
368+ originalOptions := attrs .ActivityTaskScheduledEventAttributes
369+
370+ activityOptions := & activitypb.ActivityOptions {
371+ TaskQueue : & taskqueuepb.TaskQueue {
372+ Name : originalOptions .TaskQueue .Name ,
373+ },
374+ ScheduleToCloseTimeout : originalOptions .ScheduleToCloseTimeout ,
375+ ScheduleToStartTimeout : originalOptions .ScheduleToStartTimeout ,
376+ StartToCloseTimeout : originalOptions .StartToCloseTimeout ,
377+ HeartbeatTimeout : originalOptions .HeartbeatTimeout ,
378+ RetryPolicy : originalOptions .RetryPolicy ,
379+ }
380+
381+ if updatedOptions , err = updateActivityOptions (ms , ai , activityOptions ); err != nil {
382+ return nil , err
383+ }
384+
385+ }
386+
387+ return & historyservice.UpdateActivityOptionsResponse {
388+ ActivityOptions : updatedOptions ,
389+ }, nil
390+ }
0 commit comments