@@ -3091,7 +3091,7 @@ func (ms *MutableStateImpl) ApplyBuildIdRedirect(
30913091 ms .GetExecutionInfo ().BuildIdRedirectCounter = redirectCounter
30923092
30933093 // Re-scheduling pending workflow and activity tasks.
3094- err = ms .reschedulePendingWorkflowTask (false )
3094+ err = ms .reschedulePendingWorkflowTask ()
30953095 if err != nil {
30963096 return err
30973097 }
@@ -3105,6 +3105,16 @@ func (ms *MutableStateImpl) ApplyBuildIdRedirect(
31053105 // TODO: skip task generation also when activity is in backoff period
31063106 continue
31073107 }
3108+
3109+ // need to update stamp so the passive side regenerate the task
3110+ err := ms .UpdateActivity (ai .ScheduledEventId , func (info * persistencespb.ActivityInfo , state historyi.MutableState ) error {
3111+ info .Stamp ++
3112+ return nil
3113+ })
3114+ if err != nil {
3115+ return err
3116+ }
3117+
31083118 // we only need to resend the activities to matching, no need to update timer tasks.
31093119 err = ms .taskGenerator .GenerateActivityTasks (ai .ScheduledEventId )
31103120 if err != nil {
@@ -4992,14 +5002,16 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
49925002
49935003 // Finally, reschedule the pending workflow task if so requested.
49945004 if requestReschedulePendingWorkflowTask {
4995- return ms .reschedulePendingWorkflowTask (true )
5005+ return ms .reschedulePendingWorkflowTask ()
49965006 }
49975007 return nil
49985008}
49995009
50005010func (ms * MutableStateImpl ) updateVersioningOverride (
50015011 override * workflowpb.VersioningOverride ,
50025012) (bool , error ) {
5013+ previousEffectiveDeployment := ms .GetEffectiveDeployment ()
5014+ previousEffectiveVersioningBehavior := ms .GetEffectiveVersioningBehavior ()
50035015 var requestReschedulePendingWorkflowTask bool
50045016
50055017 if override != nil {
@@ -5063,8 +5075,6 @@ func (ms *MutableStateImpl) updateVersioningOverride(
50635075 ms .GetExecutionInfo ().WorkerDeploymentName = ""
50645076 }
50655077
5066- previousEffectiveDeployment := ms .GetEffectiveDeployment ()
5067- previousEffectiveVersioningBehavior := ms .GetEffectiveVersioningBehavior ()
50685078 if ! proto .Equal (ms .GetEffectiveDeployment (), previousEffectiveDeployment ) ||
50695079 ms .GetEffectiveVersioningBehavior () != previousEffectiveVersioningBehavior {
50705080 // TODO (carly) part 2: if safe mode, do replay test on new deployment if deployment changed, if fail, revert changes and abort
@@ -8441,7 +8451,7 @@ func (ms *MutableStateImpl) StartDeploymentTransition(deployment *deploymentpb.D
84418451 // - reschedule the pending WFT so the old one is invalided
84428452 ms .ClearStickyTaskQueue ()
84438453
8444- err := ms .reschedulePendingWorkflowTask (false )
8454+ err := ms .reschedulePendingWorkflowTask ()
84458455 if err != nil {
84468456 return err
84478457 }
@@ -8477,8 +8487,17 @@ func (ms *MutableStateImpl) reschedulePendingActivities() error {
84778487 // activity already started
84788488 continue
84798489 }
8490+
8491+ // need to update stamp so the passive side regenerate the task
8492+ err := ms .UpdateActivity (ai .ScheduledEventId , func (info * persistencespb.ActivityInfo , state historyi.MutableState ) error {
8493+ info .Stamp ++
8494+ return nil
8495+ })
8496+ if err != nil {
8497+ return err
8498+ }
84808499 // we only need to resend the activities to matching, no need to update timer tasks.
8481- err : = ms .taskGenerator .GenerateActivityTasks (ai .ScheduledEventId )
8500+ err = ms .taskGenerator .GenerateActivityTasks (ai .ScheduledEventId )
84828501 if err != nil {
84838502 return err
84848503 }
@@ -8489,7 +8508,7 @@ func (ms *MutableStateImpl) reschedulePendingActivities() error {
84898508
84908509// reschedulePendingWorkflowTask reschedules the pending WFT if it is not started yet.
84918510// The currently scheduled WFT will be rejected when attempting to start because its stamp changed.
8492- func (ms * MutableStateImpl ) reschedulePendingWorkflowTask (invalidatePendingTasks bool ) error {
8511+ func (ms * MutableStateImpl ) reschedulePendingWorkflowTask () error {
84938512 // If the WFT is started but not finished, we let it run its course
84948513 // - once it's completed, failed or timed out a new one will be scheduled.
84958514 if ! ms .HasPendingWorkflowTask () || ms .HasStartedWorkflowTask () {
@@ -8504,15 +8523,13 @@ func (ms *MutableStateImpl) reschedulePendingWorkflowTask(invalidatePendingTasks
85048523 ms .logInfo ("start transition did not reschedule pending speculative task" )
85058524 return nil
85068525 }
8526+ // Reset the attempt; forcing a non-transient workflow task to be scheduled.
8527+ ms .executionInfo .WorkflowTaskAttempt = 1
85078528
8508- if invalidatePendingTasks {
8509- // Increase the stamp ("version") to invalidate the pending non-speculative WFT.
8510- // We don't invalidate speculative WFTs because they are very latency sensitive.
8511- ms .executionInfo .WorkflowTaskStamp += 1
8512-
8513- // Reset the attempt; forcing a non-transient workflow task to be scheduled.
8514- ms .executionInfo .Attempt = 1
8515- }
8529+ // Increase the stamp ("version") to invalidate the pending non-speculative WFT.
8530+ // We don't invalidate speculative WFTs because they are very latency sensitive.
8531+ ms .executionInfo .WorkflowTaskStamp += 1
8532+ ms .workflowTaskUpdated = true
85168533
85178534 return ms .taskGenerator .GenerateScheduleWorkflowTaskTasks (pendingTask .ScheduledEventID )
85188535}
0 commit comments