Skip to content

Commit ec90b97

Browse files
samanbarghiyycptt
authored andcommitted
Fail QueryWorkflow if last workflow task failed (#4099)
* Fail QueryWorkflow if last WorkflowTask failed * fix linter issues * pr comments --------- Co-authored-by: Saman Barghi <[email protected]> Co-authored-by: Yichao Yang <[email protected]>
1 parent 480282a commit ec90b97

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

service/history/api/queryworkflow/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func Invoke(
115115
return nil, consts.ErrWorkflowTaskNotScheduled
116116
}
117117

118+
if mutableState.GetExecutionInfo().WorkflowTaskAttempt > 1 {
119+
// while workflow task is failing, the query to that workflow will also fail. Failing fast here to prevent wasting
120+
// resources to load history for a query that will fail.
121+
return nil, serviceerror.NewFailedPrecondition("Cannot query workflow due to Workflow Task in failed state.")
122+
}
123+
118124
// There are two ways in which queries get dispatched to workflow worker. First, queries can be dispatched on workflow tasks.
119125
// These workflow tasks potentially contain new events and queries. The events are treated as coming before the query in time.
120126
// The second way in which queries are dispatched to workflow worker is directly through matching; in this approach queries can be

tests/query_workflow_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838

3939
"go.temporal.io/server/service/history/consts"
4040

41+
"go.temporal.io/api/serviceerror"
4142
"go.temporal.io/server/common/log/tag"
4243
)
4344

@@ -257,3 +258,41 @@ func (s *clientIntegrationSuite) TestQueryWorkflow_QueryBeforeStart() {
257258
// wait query
258259
wg.Wait()
259260
}
261+
262+
func (s *clientIntegrationSuite) TestQueryWorkflow_QueryFailedWorkflowTask() {
263+
264+
workflowFn := func(ctx workflow.Context) (string, error) {
265+
err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
266+
return "", nil
267+
})
268+
269+
if err != nil {
270+
s.Logger.Fatal("SetQueryHandler failed: " + err.Error())
271+
}
272+
// force workflow task to fail
273+
panic("Workflow failed")
274+
}
275+
276+
s.worker.RegisterWorkflow(workflowFn)
277+
278+
id := "test-query-failed-workflow-task"
279+
workflowOptions := sdkclient.StartWorkflowOptions{
280+
ID: id,
281+
TaskQueue: s.taskQueue,
282+
WorkflowRunTimeout: 20 * time.Second,
283+
}
284+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
285+
defer cancel()
286+
workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
287+
if err != nil {
288+
s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
289+
}
290+
291+
s.NotNil(workflowRun)
292+
s.True(workflowRun.GetRunID() != "")
293+
294+
// wait for workflow to fail
295+
time.Sleep(time.Second * 5)
296+
_, err = s.sdkClient.QueryWorkflow(ctx, id, "", "test")
297+
s.IsType(&serviceerror.FailedPrecondition{}, err)
298+
}

0 commit comments

Comments
 (0)