Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,19 @@ func (handler *workflowTaskCompletedHandler) handleCommandScheduleActivity(
eagerStartActivity := attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) &&
(!versioningUsed || attr.UseWorkflowBuildId)

// if the workflow is paused, we bypass activity task generation and also prevent eager activity execution.
bypassActivityTaskGeneration := eagerStartActivity
if handler.mutableState.GetExecutionState().Status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
bypassActivityTaskGeneration = true
eagerStartActivity = false
} else { // if not we bypass activity task generation if eager start activity is requested.
bypassActivityTaskGeneration = eagerStartActivity
}
Comment on lines +513 to +515
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like the else part is a duplication of L509.


event, _, err := handler.mutableState.AddActivityTaskScheduledEvent(
handler.workflowTaskCompletedID,
attr,
eagerStartActivity,
bypassActivityTaskGeneration,
)
if err != nil {
return nil, nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)
Expand Down
5 changes: 3 additions & 2 deletions service/history/api/update_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
clockspb "go.temporal.io/server/api/clock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/definition"
Expand Down Expand Up @@ -78,8 +79,8 @@ func UpdateWorkflowWithNew(

mutableState := workflowLease.GetMutableState()
if postActions.CreateWorkflowTask {
// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() {
// Create a transfer task to schedule a workflow task only if the workflow is in running status and there is no pending workflow task.
if !mutableState.HasPendingWorkflowTask() && mutableState.GetExecutionState().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think workflow tasks scheduled on the task processing side goes through this code path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm.. maybe intercepting in the task generator is the better? Specifically in TaskGeneratorImpl.GenerateScheduleWorkflowTaskTasks().

if _, err := mutableState.AddWorkflowTaskScheduledEvent(
false,
enumsspb.WORKFLOW_TASK_TYPE_NORMAL,
Expand Down
101 changes: 84 additions & 17 deletions tests/pause_workflow_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"context"
"strings"
"sync"
"testing"
"time"

Expand All @@ -26,7 +27,12 @@ type PauseWorkflowExecutionSuite struct {
pauseIdentity string
pauseReason string

workflowFn func(ctx workflow.Context) (string, error)
workflowFn func(ctx workflow.Context) (string, error)
childWorkflowFn func(ctx workflow.Context) (string, error)
activityFn func(ctx context.Context) (string, error)

activityCompletedCh chan struct{}
activityCompletedOnce sync.Once
}

func TestPauseWorkflowExecutionSuite(t *testing.T) {
Expand All @@ -41,12 +47,41 @@ func (s *PauseWorkflowExecutionSuite) SetupTest() {
s.testEndSignal = "test-end"
s.pauseIdentity = "functional-test"
s.pauseReason = "pausing workflow for acceptance test"
s.activityCompletedCh = make(chan struct{}, 1)

s.workflowFn = func(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Second,
ScheduleToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

var activityResult string
if err := workflow.ExecuteActivity(ctx, s.activityFn).Get(ctx, &activityResult); err != nil {
return "", err
}

var childResult string
if err := workflow.ExecuteChildWorkflow(ctx, s.childWorkflowFn).Get(ctx, &childResult); err != nil {
return "", err
}

signalCh := workflow.GetSignalChannel(ctx, s.testEndSignal)
var signalPayload string
signalCh.Receive(ctx, &signalPayload)
return signalPayload, nil
return signalPayload + activityResult + childResult, nil
}

s.childWorkflowFn = func(ctx workflow.Context) (string, error) {
return "child-workflow", nil
}

s.activityFn = func(ctx context.Context) (string, error) {
s.activityCompletedOnce.Do(func() {
// blocks until the test case unblocks the activity.
<-s.activityCompletedCh
})
return "activity", nil
}
}

Expand All @@ -56,6 +91,8 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() {
defer cancel()

s.Worker().RegisterWorkflow(s.workflowFn)
s.Worker().RegisterWorkflow(s.childWorkflowFn)
s.Worker().RegisterActivity(s.activityFn)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
Expand Down Expand Up @@ -88,6 +125,27 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() {
s.NoError(err)
s.NotNil(pauseResp)

// unblock the activity to complete.
s.activityCompletedCh <- struct{}{}

// ensure that the workflow is paused even when the activity is completed.
s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus())
if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil {
require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity())
require.Equal(t, s.pauseReason, pauseInfo.GetReason())
}
}, 5*time.Second, 200*time.Millisecond)

// Send unblock signal to the workflow to complete and assert that the workflow stays paused.
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "signal to complete the workflow")
s.NoError(err)

time.Sleep(2 * time.Second) // wait 2 seconds to give enough time record the signal.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the sleep required with the s.EventuallyWithT below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a failure in tests that needs to be fixed but lgtm

s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
Expand All @@ -113,21 +171,7 @@ func (s *PauseWorkflowExecutionSuite) TestPauseUnpauseWorkflowExecution() {
s.NoError(err)
s.NotNil(unpauseResp)

// Wait until unpaused (running again).
s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus())
require.Nil(t, desc.GetWorkflowExtendedInfo().GetPauseInfo())
}, 5*time.Second, 200*time.Millisecond)

// TODO: currently pause workflow execution does not intercept workflow creation. Fix the reset of this test when that is implemented.
// For now sending this signal will complete the workflow and finish the test.
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal")
s.NoError(err)

// assert that the workflow completes now.
s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
Expand Down Expand Up @@ -281,6 +325,8 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused()
defer cancel()

s.Worker().RegisterWorkflow(s.workflowFn)
s.Worker().RegisterWorkflow(s.childWorkflowFn)
s.Worker().RegisterActivity(s.activityFn)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
Expand Down Expand Up @@ -335,6 +381,27 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecutionAlreadyPaused()
s.NotNil(failedPreconditionErr)
s.Contains(failedPreconditionErr.Error(), "workflow is already paused.")

unpauseRequest := &workflowservice.UnpauseWorkflowExecutionRequest{
Namespace: s.Namespace().String(),
WorkflowId: workflowID,
RunId: runID,
Identity: s.pauseIdentity,
Reason: "cleanup after paused workflow test",
RequestId: uuid.New(),
}
unpauseResp, err := s.FrontendClient().UnpauseWorkflowExecution(ctx, unpauseRequest)
s.NoError(err)
s.NotNil(unpauseResp)

s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus())
require.Nil(t, desc.GetWorkflowExtendedInfo().GetPauseInfo())
}, 5*time.Second, 200*time.Millisecond)

// For now sending this signal will complete the workflow and finish the test.
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal")
s.NoError(err)
Expand Down
Loading