Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ func Invoke(
}, nil
}
}
// If workflow is paused, return query rejected with PAUSED status.
if mutableStateStatus == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
return &historyservice.QueryWorkflowResponse{
Response: &workflowservice.QueryWorkflowResponse{
QueryRejected: &querypb.QueryRejected{
Status: enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED,
},
},
}, nil
}

mutableState := workflowLease.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() && !mutableState.HasCompletedAnyWorkflowTask() {
Expand Down
82 changes: 82 additions & 0 deletions tests/pause_workflow_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -113,6 +115,86 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecution() {
}, 5*time.Second, 200*time.Millisecond)
}

func (s *PauseWorkflowExecutionSuite) TestQueryWorkflowWhenPaused() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

s.Worker().RegisterWorkflow(s.workflowFn)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()),
TaskQueue: s.TaskQueue(),
}

workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowFn)
s.NoError(err)
workflowID := workflowRun.GetID()
runID := workflowRun.GetRunID()

s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus())
}, 5*time.Second, 100*time.Millisecond)

// Pause the workflow.
pauseRequest := &workflowservice.PauseWorkflowExecutionRequest{
Namespace: s.Namespace().String(),
WorkflowId: workflowID,
RunId: runID,
Identity: s.pauseIdentity,
Reason: s.pauseReason,
RequestId: uuid.New(),
}
pauseResp, err := s.FrontendClient().PauseWorkflowExecution(ctx, pauseRequest)
s.NoError(err)
s.NotNil(pauseResp)

// Wait until paused.
s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus())
if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil {
require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity())
require.Equal(t, s.pauseReason, pauseInfo.GetReason())
}
}, 5*time.Second, 200*time.Millisecond)

// Issue a query to the paused workflow. It should return QueryRejected with WORKFLOW_EXECUTION_STATUS_PAUSED status.
queryReq := &workflowservice.QueryWorkflowRequest{
Namespace: s.Namespace().String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Query: &querypb.WorkflowQuery{
QueryType: "__stack_trace",
},
}
queryResp, err := s.FrontendClient().QueryWorkflow(ctx, queryReq)
s.NoError(err)
s.NotNil(queryResp)
s.NotNil(queryResp.GetQueryRejected())
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, queryResp.GetQueryRejected().GetStatus())

// Complete the workflow to finish the test.
err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal")
s.NoError(err)

s.EventuallyWithT(func(t *assert.CollectT) {
desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID)
require.NoError(t, err)
info := desc.GetWorkflowExecutionInfo()
require.NotNil(t, info)
require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, info.GetStatus())
}, 5*time.Second, 200*time.Millisecond)
}

// TestPauseWorkflowExecutionRequestValidation tests that pause workflow execution request validation. We don't really need a valid workflow to test this.
// - fails when the identity is too long.
// - fails when the reason is too long.
Expand Down
Loading