Skip to content

Commit 9494ded

Browse files
gowspkane31
andauthored
Mutable state changes for unpausing a single workflow. (#8674)
## What changed? - Recording unpause event - Regenerating workflow and activity tasks - Made EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED a buffered event (since pause event is buffered) - Added unit tests (Functional tests are added along with the API changes) ## Why? This is needed to implement pause/unpause operation APIs. ## How did you test it? - [x] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduce workflow "unpaused" history event end-to-end, buffer pause/unpause consistently, and reschedule activities on unpause with tests. > > - **History Events**: > - Add `CreateWorkflowExecutionUnpausedEvent` and `HistoryBuilder.AddWorkflowExecutionUnpausedEvent`. > - Update event buffering: buffer both `WORKFLOW_EXECUTION_PAUSED` and `WORKFLOW_EXECUTION_UNPAUSED` in `historybuilder/event_store.go`. > - **Mutable State**: > - Add `AddWorkflowExecutionUnpausedEvent` and `ApplyWorkflowExecutionUnpausedEvent` to transition to `RUNNING`, clear `PauseInfo`, bump activity stamps, and (re)generate activity tasks as needed. > - Extend `interfaces.MutableState` + mocks with unpause methods. > - Rebuilder applies `WORKFLOW_EXECUTION_UNPAUSED` events. > - **Logging/Tags**: > - Add `tag.WorkflowActionWorkflowUnpaused`. > - **Tests**: > - Add unit tests for pause/unpause flow, stamps, and buffering behavior; adjust existing buffering test expectations. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f139027. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Sean Kane <[email protected]>
1 parent e31d04c commit 9494ded

File tree

10 files changed

+206
-5
lines changed

10 files changed

+206
-5
lines changed

common/log/tag/values.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ var (
1111
WorkflowActionWorkflowTerminated = workflowAction("add-workflow-terminated-event")
1212
WorkflowActionWorkflowContinueAsNew = workflowAction("add-workflow-continue-as-new-event")
1313
WorkflowActionWorkflowPaused = workflowAction("add-workflow-paused-event")
14+
WorkflowActionWorkflowUnpaused = workflowAction("add-workflow-unpaused-event")
1415

1516
// workflow cancellation / sign / update-options
1617
WorkflowActionWorkflowCancelRequested = workflowAction("add-workflow-cancel-requested-event")

service/history/historybuilder/event_factory.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,22 @@ func (b *EventFactory) CreateWorkflowExecutionPausedEvent(
10241024
return event
10251025
}
10261026

1027+
func (b *EventFactory) CreateWorkflowExecutionUnpausedEvent(
1028+
identity string,
1029+
reason string,
1030+
requestID string,
1031+
) *historypb.HistoryEvent {
1032+
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED, b.timeSource.Now())
1033+
event.Attributes = &historypb.HistoryEvent_WorkflowExecutionUnpausedEventAttributes{
1034+
WorkflowExecutionUnpausedEventAttributes: &historypb.WorkflowExecutionUnpausedEventAttributes{
1035+
Identity: identity,
1036+
Reason: reason,
1037+
RequestId: requestID,
1038+
},
1039+
}
1040+
return event
1041+
}
1042+
10271043
func (b *EventFactory) createHistoryEvent(
10281044
eventType enumspb.EventType,
10291045
time time.Time,

service/history/historybuilder/event_store.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,11 +306,10 @@ func (b *EventStore) bufferEvent(
306306
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
307307
return false
308308

309-
// Unpaused event should not be buffered since it transitions the workflow from paused to running status.
310-
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED:
311-
return false
312309
// A paused workflow event *should be* allowed to be buffered since we want to accept any inflight workflow task completion.
313-
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_PAUSED:
310+
// Since we buffer the paused event, we need to buffer unpaused event as well so that they don't go out of order.
311+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_PAUSED,
312+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED:
314313
return true
315314

316315
default:

service/history/historybuilder/history_builder.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,18 @@ func (b *HistoryBuilder) AddWorkflowExecutionPausedEvent(
288288
return event
289289
}
290290

291+
func (b *HistoryBuilder) AddWorkflowExecutionUnpausedEvent(
292+
identity string,
293+
reason string,
294+
requestID string,
295+
) *historypb.HistoryEvent {
296+
event := b.CreateWorkflowExecutionUnpausedEvent(identity, reason, requestID)
297+
// Mark the event as 'worker may ignore' so that older SDKs can safely ignore it.
298+
event.WorkerMayIgnore = true
299+
event, _ = b.add(event)
300+
return event
301+
}
302+
291303
func (b *HistoryBuilder) AddActivityTaskScheduledEvent(
292304
workflowTaskCompletedEventID int64,
293305
command *commandpb.ScheduleActivityTaskCommandAttributes,

service/history/historybuilder/history_builder_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2222,7 +2222,6 @@ func (s *historyBuilderSuite) TestBufferEvent() {
22222222
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED: true,
22232223
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW: true,
22242224
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: true,
2225-
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED: true,
22262225
}
22272226

22282227
// workflow task events will be assign event ID immediately

service/history/interfaces/mutable_state.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ type (
215215
IsWorkflowExecutionRunning() bool
216216
AddWorkflowExecutionPausedEvent(identity string, reason string, requestID string) (*historypb.HistoryEvent, error)
217217
ApplyWorkflowExecutionPausedEvent(event *historypb.HistoryEvent) error
218+
AddWorkflowExecutionUnpausedEvent(identity string, reason string, requestID string) (*historypb.HistoryEvent, error)
219+
ApplyWorkflowExecutionUnpausedEvent(event *historypb.HistoryEvent) error
218220
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
219221
IsWorkflowPendingOnWorkflowTaskBackoff() bool
220222
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)

service/history/interfaces/mutable_state_mock.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/mutable_state_impl.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2866,6 +2866,66 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionPausedEvent(event *historypb.H
28662866
return nil
28672867
}
28682868

2869+
func (ms *MutableStateImpl) AddWorkflowExecutionUnpausedEvent(
2870+
identity string,
2871+
reason string,
2872+
requestID string,
2873+
) (*historypb.HistoryEvent, error) {
2874+
opTag := tag.WorkflowActionWorkflowUnpaused
2875+
if err := ms.checkMutability(opTag); err != nil {
2876+
return nil, err
2877+
}
2878+
event := ms.hBuilder.AddWorkflowExecutionUnpausedEvent(identity, reason, requestID)
2879+
if err := ms.ApplyWorkflowExecutionUnpausedEvent(event); err != nil {
2880+
return nil, err
2881+
}
2882+
return event, nil
2883+
}
2884+
2885+
// ApplyWorkflowExecutionUnpausedEvent applies the unpaused event to the mutable state. It updates the workflow execution status to running and clears the pause info.
2886+
func (ms *MutableStateImpl) ApplyWorkflowExecutionUnpausedEvent(event *historypb.HistoryEvent) error {
2887+
// Update workflow status.
2888+
if _, err := ms.UpdateWorkflowStateStatus(ms.executionState.GetState(), enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING); err != nil {
2889+
return err
2890+
}
2891+
2892+
// save pauseInfoSize before clearing so that we can adjust approximate size later before returning success
2893+
pauseInfoSize := 0
2894+
if ms.executionInfo.PauseInfo != nil {
2895+
pauseInfoSize = ms.GetExecutionInfo().GetPauseInfo().Size()
2896+
// Clear pause info in mutable state.
2897+
ms.executionInfo.PauseInfo = nil
2898+
}
2899+
2900+
// Reschedule any pending activities
2901+
// Note: workflow task is scheduled in the unpause API. So no need to schedule it here.
2902+
for _, ai := range ms.GetPendingActivityInfos() {
2903+
// Bump activity stamp to force replication so that the passive cluster can recreate the activity task.
2904+
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
2905+
activityInfo.Stamp = activityInfo.Stamp + 1
2906+
return nil
2907+
}); err != nil {
2908+
return err
2909+
}
2910+
2911+
// Check activity scheduled time and generate activity retry task if scheduled time is in the future.
2912+
if ai.GetScheduledTime().AsTime().After(ms.timeSource.Now().UTC()) {
2913+
if err := ms.taskGenerator.GenerateActivityRetryTasks(ai); err != nil {
2914+
return err
2915+
}
2916+
} else {
2917+
// Generate activity task to resend the activity to matching immediately.
2918+
if err := ms.taskGenerator.GenerateActivityTasks(ai.ScheduledEventId); err != nil {
2919+
return err
2920+
}
2921+
}
2922+
}
2923+
2924+
// Update approximate size of the mutable state.
2925+
ms.approximateSize -= pauseInfoSize
2926+
return nil
2927+
}
2928+
28692929
func (ms *MutableStateImpl) addCompletionCallbacks(
28702930
event *historypb.HistoryEvent,
28712931
requestID string,

service/history/workflow/mutable_state_impl_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,6 +1754,85 @@ func (s *mutableStateSuite) TestAddWorkflowExecutionPausedEvent() {
17541754
s.True(pausedEvent.GetWorkerMayIgnore())
17551755
}
17561756

1757+
func (s *mutableStateSuite) TestAddWorkflowExecutionUnpausedEvent() {
1758+
s.SetupSubTest()
1759+
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
1760+
1761+
tq := &taskqueuepb.TaskQueue{Name: "tq"}
1762+
s.createVersionedMutableStateWithCompletedWFT(tq)
1763+
1764+
// Complete another WFT to obtain a valid completed event id for scheduling an activity.
1765+
wft, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
1766+
s.NoError(err)
1767+
_, wft, err = s.mutableState.AddWorkflowTaskStartedEvent(
1768+
wft.ScheduledEventID,
1769+
"",
1770+
tq,
1771+
"",
1772+
worker_versioning.StampFromBuildId("b1"),
1773+
nil,
1774+
nil,
1775+
false,
1776+
)
1777+
s.NoError(err)
1778+
completedEvent, err := s.mutableState.AddWorkflowTaskCompletedEvent(
1779+
wft,
1780+
&workflowservice.RespondWorkflowTaskCompletedRequest{},
1781+
workflowTaskCompletionLimits,
1782+
)
1783+
s.NoError(err)
1784+
1785+
// Schedule an activity (pending) using the completed WFT event id.
1786+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
1787+
completedEvent.GetEventId(),
1788+
&commandpb.ScheduleActivityTaskCommandAttributes{
1789+
ActivityId: "act-1",
1790+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
1791+
TaskQueue: tq,
1792+
},
1793+
false,
1794+
)
1795+
s.NoError(err)
1796+
// Create a pending workflow task.
1797+
pendingWFT, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
1798+
s.NoError(err)
1799+
1800+
// Pause first to simulate paused workflow state.
1801+
_, err = s.mutableState.AddWorkflowExecutionPausedEvent("tester", "reason", uuid.New())
1802+
s.NoError(err)
1803+
1804+
// Capture stamps after pause.
1805+
pausedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
1806+
s.True(ok)
1807+
pausedActivityStamp := pausedActivityInfo.Stamp
1808+
pausedWFT := s.mutableState.GetPendingWorkflowTask()
1809+
s.NotNil(pausedWFT)
1810+
pausedWFTStamp := pausedWFT.Stamp
1811+
1812+
// Unpause and verify.
1813+
unpausedEvent, err := s.mutableState.AddWorkflowExecutionUnpausedEvent("tester", "reason", uuid.New())
1814+
s.NoError(err)
1815+
1816+
// PauseInfo should be cleared and status should be RUNNING.
1817+
s.Nil(s.mutableState.executionInfo.PauseInfo)
1818+
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, s.mutableState.executionState.Status)
1819+
1820+
// Stamps should be incremented again (only for activities) on unpause.
1821+
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
1822+
s.True(ok)
1823+
s.Greater(updatedActivityInfo.Stamp, pausedActivityStamp)
1824+
1825+
currentWFT := s.mutableState.GetPendingWorkflowTask()
1826+
s.NotNil(currentWFT)
1827+
s.Equal(currentWFT.Stamp, pausedWFTStamp) // workflow task stamp should not change between pause and unpause.
1828+
1829+
// assert the event is marked as 'worker may ignore' so that older SDKs can safely ignore it.
1830+
s.True(unpausedEvent.GetWorkerMayIgnore())
1831+
1832+
// Ensure the pending workflow task we created earlier still exists (no unexpected removal).
1833+
s.Equal(pendingWFT.ScheduledEventID, currentWFT.ScheduledEventID)
1834+
}
1835+
17571836
func (s *mutableStateSuite) TestPauseWorkflowExecution_FailStateValidation() {
17581837
s.SetupSubTest()
17591838
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

service/history/workflow/mutable_state_rebuilder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,10 @@ func (b *MutableStateRebuilderImpl) applyEvents(
666666
if err := b.mutableState.ApplyWorkflowExecutionPausedEvent(event); err != nil {
667667
return nil, err
668668
}
669+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UNPAUSED:
670+
if err := b.mutableState.ApplyWorkflowExecutionUnpausedEvent(event); err != nil {
671+
return nil, err
672+
}
669673

670674
default:
671675
def, ok := b.shard.StateMachineRegistry().EventDefinition(event.GetEventType())

0 commit comments

Comments
 (0)