diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index c5974f6ba2a..8b1601bb69c 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -505,10 +505,19 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity( eagerStartActivity := attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) && (!versioningUsed || attr.UseWorkflowBuildId) + // if the workflow is paused, we bypass activity task generation and also prevent eager activity execution. + bypassActivityTaskGeneration := eagerStartActivity + if handler.mutableState.GetExecutionState().Status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED { + bypassActivityTaskGeneration = true + eagerStartActivity = false + } else { // if not we bypass activity task generation if eager start activity is requested. + bypassActivityTaskGeneration = eagerStartActivity + } + event, _, err := handler.mutableState.AddActivityTaskScheduledEvent( handler.workflowTaskCompletedID, attr, - eagerStartActivity, + bypassActivityTaskGeneration, ) if err != nil { return nil, nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err) diff --git a/service/history/api/update_workflow_util.go b/service/history/api/update_workflow_util.go index ccb6ce135f0..9759ad82e7c 100644 --- a/service/history/api/update_workflow_util.go +++ b/service/history/api/update_workflow_util.go @@ -3,6 +3,7 @@ package api import ( "context" + enumspb "go.temporal.io/api/enums/v1" clockspb "go.temporal.io/server/api/clock/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/definition" @@ -78,8 +79,8 @@ func UpdateWorkflowWithNew( mutableState := workflowLease.GetMutableState() if postActions.CreateWorkflowTask { - // Create a transfer task to schedule a workflow task - if !mutableState.HasPendingWorkflowTask() { + // Create a transfer task to schedule a workflow task only if the workflow is in running status and there is no pending workflow task. + if !mutableState.HasPendingWorkflowTask() && mutableState.GetExecutionState().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { if _, err := mutableState.AddWorkflowTaskScheduledEvent( false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL, diff --git a/tests/pause_workflow_execution_test.go b/tests/pause_workflow_execution_test.go index 4223f289ec9..f6d38ef1e2d 100644 --- a/tests/pause_workflow_execution_test.go +++ b/tests/pause_workflow_execution_test.go @@ -3,6 +3,7 @@ package tests import ( "context" "strings" + "sync" "testing" "time" @@ -26,7 +27,12 @@ type PauseWorkflowExecutionSuite struct { pauseIdentity string pauseReason string - workflowFn func(ctx workflow.Context) (string, error) + workflowFn func(ctx workflow.Context) (string, error) + childWorkflowFn func(ctx workflow.Context) (string, error) + activityFn func(ctx context.Context) (string, error) + + activityCompletedCh chan struct{} + activityCompletedOnce sync.Once } func TestPauseWorkflowExecutionSuite(t *testing.T) { @@ -41,12 +47,41 @@ func (s *PauseWorkflowExecutionSuite) SetupTest() { s.testEndSignal = "test-end" s.pauseIdentity = "functional-test" s.pauseReason = "pausing workflow for acceptance test" + s.activityCompletedCh = make(chan struct{}, 1) s.workflowFn = func(ctx workflow.Context) (string, error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + ScheduleToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var activityResult string + if err := workflow.ExecuteActivity(ctx, s.activityFn).Get(ctx, &activityResult); err != nil { + return "", err + } + + var childResult string + if err := workflow.ExecuteChildWorkflow(ctx, s.childWorkflowFn).Get(ctx, &childResult); err != nil { + return "", err + } + signalCh := workflow.GetSignalChannel(ctx, s.testEndSignal) var signalPayload string signalCh.Receive(ctx, &signalPayload) - return signalPayload, nil + return signalPayload + activityResult + childResult, nil + } + + s.childWorkflowFn = func(ctx workflow.Context) (string, error) { + return "child-workflow", nil + } + + s.activityFn = func(ctx context.Context) (string, error) { + s.activityCompletedOnce.Do(func() { + // blocks until the test case unblocks the activity. + <-s.activityCompletedCh + }) + return "activity", nil } } @@ -56,6 +91,8 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() { defer cancel() s.Worker().RegisterWorkflow(s.workflowFn) + s.Worker().RegisterWorkflow(s.childWorkflowFn) + s.Worker().RegisterActivity(s.activityFn) workflowOptions := sdkclient.StartWorkflowOptions{ ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()), @@ -88,6 +125,27 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() { s.NoError(err) s.NotNil(pauseResp) + // unblock the activity to complete. + s.activityCompletedCh <- struct{}{} + + // ensure that the workflow is paused even when the activity is completed. + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) + require.NoError(t, err) + info := desc.GetWorkflowExecutionInfo() + require.NotNil(t, info) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus()) + if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil { + require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity()) + require.Equal(t, s.pauseReason, pauseInfo.GetReason()) + } + }, 5*time.Second, 200*time.Millisecond) + + // Send unblock signal to the workflow to complete and assert that the workflow stays paused. + err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "signal to complete the workflow") + s.NoError(err) + + time.Sleep(2 * time.Second) // wait 2 seconds to give enough time record the signal. s.EventuallyWithT(func(t *assert.CollectT) { desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) require.NoError(t, err) @@ -113,21 +171,7 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() { s.NoError(err) s.NotNil(unpauseResp) - // Wait until unpaused (running again). - s.EventuallyWithT(func(t *assert.CollectT) { - desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) - require.NoError(t, err) - info := desc.GetWorkflowExecutionInfo() - require.NotNil(t, info) - require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus()) - require.Nil(t, desc.GetWorkflowExtendedInfo().GetPauseInfo()) - }, 5*time.Second, 200*time.Millisecond) - - // TODO: currently pause workflow execution does not intercept workflow creation. Fix the reset of this test when that is implemented. - // For now sending this signal will complete the workflow and finish the test. - err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal") - s.NoError(err) - + // assert that the workflow completes now. s.EventuallyWithT(func(t *assert.CollectT) { desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) require.NoError(t, err) @@ -281,6 +325,8 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused() defer cancel() s.Worker().RegisterWorkflow(s.workflowFn) + s.Worker().RegisterWorkflow(s.childWorkflowFn) + s.Worker().RegisterActivity(s.activityFn) workflowOptions := sdkclient.StartWorkflowOptions{ ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()), @@ -335,6 +381,27 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused() s.NotNil(failedPreconditionErr) s.Contains(failedPreconditionErr.Error(), "workflow is already paused.") + unpauseRequest := &workflowservice.UnpauseWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + WorkflowId: workflowID, + RunId: runID, + Identity: s.pauseIdentity, + Reason: "cleanup after paused workflow test", + RequestId: uuid.New(), + } + unpauseResp, err := s.FrontendClient().UnpauseWorkflowExecution(ctx, unpauseRequest) + s.NoError(err) + s.NotNil(unpauseResp) + + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) + require.NoError(t, err) + info := desc.GetWorkflowExecutionInfo() + require.NotNil(t, info) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus()) + require.Nil(t, desc.GetWorkflowExtendedInfo().GetPauseInfo()) + }, 5*time.Second, 200*time.Millisecond) + // For now sending this signal will complete the workflow and finish the test. err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal") s.NoError(err)