diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index 42d352c9644..5679bc295f6 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -96,6 +96,16 @@ func Invoke( }, nil } } + // If workflow is paused, return query rejected with PAUSED status. + if mutableStateStatus == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED { + return &historyservice.QueryWorkflowResponse{ + Response: &workflowservice.QueryWorkflowResponse{ + QueryRejected: &querypb.QueryRejected{ + Status: enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, + }, + }, + }, nil + } mutableState := workflowLease.GetMutableState() if !mutableState.IsWorkflowExecutionRunning() && !mutableState.HasCompletedAnyWorkflowTask() { diff --git a/tests/pause_workflow_execution_test.go b/tests/pause_workflow_execution_test.go index f3b969788e3..91cef3a50bd 100644 --- a/tests/pause_workflow_execution_test.go +++ b/tests/pause_workflow_execution_test.go @@ -10,7 +10,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + querypb "go.temporal.io/api/query/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" @@ -113,6 +115,86 @@ func (s *PauseWorkflowExecutionSuite) TestPauseWorkflowExecution() { }, 5*time.Second, 200*time.Millisecond) } +func (s *PauseWorkflowExecutionSuite) TestQueryWorkflowWhenPaused() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.Worker().RegisterWorkflow(s.workflowFn) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("pause-wf-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowFn) + s.NoError(err) + workflowID := workflowRun.GetID() + runID := workflowRun.GetRunID() + + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) + require.NoError(t, err) + info := desc.GetWorkflowExecutionInfo() + require.NotNil(t, info) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, info.GetStatus()) + }, 5*time.Second, 100*time.Millisecond) + + // Pause the workflow. + pauseRequest := &workflowservice.PauseWorkflowExecutionRequest{ + Namespace: s.Namespace().String(), + WorkflowId: workflowID, + RunId: runID, + Identity: s.pauseIdentity, + Reason: s.pauseReason, + RequestId: uuid.New(), + } + pauseResp, err := s.FrontendClient().PauseWorkflowExecution(ctx, pauseRequest) + s.NoError(err) + s.NotNil(pauseResp) + + // Wait until paused. + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) + require.NoError(t, err) + info := desc.GetWorkflowExecutionInfo() + require.NotNil(t, info) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, info.GetStatus()) + if pauseInfo := desc.GetWorkflowExtendedInfo().GetPauseInfo(); pauseInfo != nil { + require.Equal(t, s.pauseIdentity, pauseInfo.GetIdentity()) + require.Equal(t, s.pauseReason, pauseInfo.GetReason()) + } + }, 5*time.Second, 200*time.Millisecond) + + // Issue a query to the paused workflow. It should return QueryRejected with WORKFLOW_EXECUTION_STATUS_PAUSED status. + queryReq := &workflowservice.QueryWorkflowRequest{ + Namespace: s.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "__stack_trace", + }, + } + queryResp, err := s.FrontendClient().QueryWorkflow(ctx, queryReq) + s.NoError(err) + s.NotNil(queryResp) + s.NotNil(queryResp.GetQueryRejected()) + s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, queryResp.GetQueryRejected().GetStatus()) + + // Complete the workflow to finish the test. + err = s.SdkClient().SignalWorkflow(ctx, workflowID, runID, s.testEndSignal, "test end signal") + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowID, runID) + require.NoError(t, err) + info := desc.GetWorkflowExecutionInfo() + require.NotNil(t, info) + require.Equal(t, enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, info.GetStatus()) + }, 5*time.Second, 200*time.Millisecond) +} + // TestPauseWorkflowExecutionRequestValidation tests that pause workflow execution request validation. We don't really need a valid workflow to test this. // - fails when the identity is too long. // - fails when the reason is too long.