Skip to content

Commit dbafe8a

Browse files
authored
Reset for workflows without completed tasks (#665)
Allow resetting workflows without completed tasks
1 parent c551f21 commit dbafe8a

File tree

4 files changed

+199
-28
lines changed

4 files changed

+199
-28
lines changed

service/history/workflowResetor.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (w *workflowResetorImpl) validateResetWorkflowAfterReplay(newMutableState m
187187
if retError := newMutableState.CheckResettable(); retError != nil {
188188
return retError
189189
}
190-
if !newMutableState.HasInFlightWorkflowTask() {
190+
if !newMutableState.HasPendingWorkflowTask() {
191191
return serviceerror.NewInternal(fmt.Sprintf("can't find the last started workflow task"))
192192
}
193193
if newMutableState.HasBufferedEvents() {
@@ -290,8 +290,7 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(
290290

291291
// Fail the pending workflow task.
292292
// Note that we need to ensure WorkflowTaskFailed event is appended right after WorkflowTaskStarted event
293-
workflowTask, _ := newMutableState.GetInFlightWorkflowTask()
294-
293+
workflowTask, _ := newMutableState.GetPendingWorkflowTask()
295294
_, err := newMutableState.AddWorkflowTaskFailedEvent(workflowTask.ScheduleID, workflowTask.StartedID, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, nil,
296295
identityHistoryService, resetReason, baseRunID, newRunID, forkEventVersion)
297296
if err != nil {
@@ -751,8 +750,10 @@ func validateLastBatchOfReset(lastBatch []*historypb.HistoryEvent, workflowTaskF
751750
return serviceerror.NewInvalidArgument(fmt.Sprintf("wrong WorkflowTaskFinishEventId, it must be WorkflowTaskStarted + 1: %v", lastEvent.GetEventId()))
752751
}
753752

754-
if lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
755-
return serviceerror.NewInvalidArgument(fmt.Sprintf("wrong WorkflowTaskFinishEventId, previous batch doesn't include WorkflowTaskStarted, lastFirstEventId: %v", firstEvent.GetEventId()))
753+
if lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED &&
754+
lastEvent.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
755+
return serviceerror.NewInvalidArgument(fmt.Sprintf("unable to use provided event id %v as a reset point as previous batch [%v-%v] should end with WorkflowTaskStarted or WorkflowTaskScheduled event",
756+
workflowTaskFinishEventID, firstEvent.GetEventId(), lastEvent.GetEventId()))
756757
}
757758

758759
return nil

service/history/workflowResetor_test.go

Lines changed: 169 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_NoReplication() {
709709
for _, e := range be.Events {
710710
eid++
711711
if e.GetEventId() != eid {
712-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
712+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
713713
}
714714
e.EventTime = &eventTime
715715
}
@@ -1403,7 +1403,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_NoReplication_WithRequestCance
14031403
for _, e := range be.Events {
14041404
eid++
14051405
if e.GetEventId() != eid {
1406-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
1406+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
14071407
}
14081408
e.EventTime = &eventTime
14091409
}
@@ -1998,7 +1998,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_WithTerminatingCur
19981998
for _, e := range be.Events {
19991999
eid++
20002000
if e.GetEventId() != eid {
2001-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
2001+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
20022002
}
20032003
e.EventTime = &eventTime
20042004
}
@@ -2703,7 +2703,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NotActive() {
27032703
for _, e := range be.Events {
27042704
eid++
27052705
if e.GetEventId() != eid {
2706-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
2706+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
27072707
}
27082708
e.EventTime = &eventTime
27092709
}
@@ -3304,7 +3304,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_Replication_NoTerminatingCurre
33043304
for _, e := range be.Events {
33053305
eid++
33063306
if e.GetEventId() != eid {
3307-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
3307+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
33083308
}
33093309
e.EventTime = &eventTime
33103310
}
@@ -3923,7 +3923,7 @@ func (s *resetorSuite) TestApplyReset() {
39233923
for _, e := range be.Events {
39243924
eid++
39253925
if e.GetEventId() != eid {
3926-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
3926+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
39273927
}
39283928
e.EventTime = &eventTime
39293929
}
@@ -4319,7 +4319,7 @@ func (s *resetorSuite) TestResetWorkflowExecution_WithoutRunID() {
43194319
for _, e := range be.Events {
43204320
eid++
43214321
if e.GetEventId() != eid {
4322-
s.Fail(fmt.Sprintf("inconintous eventID: %v, %v", eid, e.GetEventId()))
4322+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
43234323
}
43244324
e.EventTime = &eventTime
43254325
}
@@ -4371,3 +4371,165 @@ func (s *resetorSuite) TestResetWorkflowExecution_WithoutRunID() {
43714371
s.Nil(err)
43724372
s.NotNil(response.RunId)
43734373
}
4374+
4375+
func (s *resetorSuite) TestResetWorkflowExecution_NoCompletedTasks() {
4376+
testNamespaceEntry := cache.NewLocalNamespaceCacheEntryForTest(
4377+
&persistenceblobs.NamespaceInfo{Id: testNamespaceID}, &persistenceblobs.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", nil,
4378+
)
4379+
s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes()
4380+
s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes()
4381+
4382+
namespaceID := testNamespaceID
4383+
4384+
request := &historyservice.ResetWorkflowExecutionRequest{NamespaceId: namespaceID, ResetRequest: &workflowservice.ResetWorkflowExecutionRequest{}}
4385+
4386+
wid := "wId"
4387+
wfType := "wfType"
4388+
taskQueueName := "taskQueue"
4389+
forkRunID := uuid.New().String()
4390+
currRunID := uuid.New().String()
4391+
4392+
we := commonpb.WorkflowExecution{
4393+
WorkflowId: wid,
4394+
RunId: forkRunID,
4395+
}
4396+
4397+
request.ResetRequest = &workflowservice.ResetWorkflowExecutionRequest{
4398+
Namespace: "testNamespace",
4399+
WorkflowExecution: &we,
4400+
Reason: "test reset",
4401+
WorkflowTaskFinishEventId: 3,
4402+
RequestId: uuid.New().String(),
4403+
}
4404+
4405+
forkBranchToken := []byte("forkBranchToken")
4406+
taskQueue := &taskqueuepb.TaskQueue{
4407+
Name: taskQueueName,
4408+
}
4409+
// Prepare history event sequence.
4410+
readHistoryResponse := &persistence.ReadHistoryBranchByBatchResponse{
4411+
NextPageToken: nil,
4412+
Size: 1000,
4413+
LastFirstEventID: int64(3),
4414+
History: []*historypb.History{
4415+
{
4416+
Events: []*historypb.HistoryEvent{
4417+
{
4418+
EventId: 1,
4419+
Version: common.EmptyVersion,
4420+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
4421+
Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{
4422+
WorkflowType: &commonpb.WorkflowType{
4423+
Name: wfType,
4424+
},
4425+
TaskQueue: taskQueue,
4426+
Input: payloads.EncodeString("testInput"),
4427+
WorkflowExecutionTimeout: timestamp.DurationPtr(100 * time.Second),
4428+
WorkflowRunTimeout: timestamp.DurationPtr(50 * time.Second),
4429+
WorkflowTaskTimeout: timestamp.DurationPtr(200 * time.Second),
4430+
}},
4431+
},
4432+
{
4433+
EventId: 2,
4434+
Version: common.EmptyVersion,
4435+
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
4436+
Attributes: &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
4437+
TaskQueue: taskQueue,
4438+
StartToCloseTimeout: timestamp.DurationPtr(100 * time.Second),
4439+
}},
4440+
},
4441+
},
4442+
},
4443+
{
4444+
Events: []*historypb.HistoryEvent{
4445+
{
4446+
EventId: 3,
4447+
Version: common.EmptyVersion,
4448+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
4449+
Attributes: &historypb.HistoryEvent_WorkflowTaskTimedOutEventAttributes{WorkflowTaskTimedOutEventAttributes: &historypb.WorkflowTaskTimedOutEventAttributes{}},
4450+
},
4451+
},
4452+
},
4453+
},
4454+
}
4455+
4456+
eid := int64(0)
4457+
eventTime := time.Unix(0, 1000).UTC()
4458+
for _, be := range readHistoryResponse.History {
4459+
for _, e := range be.Events {
4460+
eid++
4461+
if e.GetEventId() != eid {
4462+
s.Fail(fmt.Sprintf("non-continuous eventID: %v, %v", eid, e.GetEventId()))
4463+
}
4464+
e.EventTime = &eventTime
4465+
}
4466+
}
4467+
4468+
// Mock calls.
4469+
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
4470+
NamespaceID: namespaceID,
4471+
Execution: commonpb.WorkflowExecution{
4472+
WorkflowId: wid,
4473+
RunId: forkRunID,
4474+
},
4475+
}).Return(&persistence.GetWorkflowExecutionResponse{State: &persistence.WorkflowMutableState{
4476+
ExecutionInfo: &persistence.WorkflowExecutionInfo{
4477+
NamespaceID: namespaceID,
4478+
WorkflowID: wid,
4479+
WorkflowTypeName: wfType,
4480+
TaskQueue: taskQueueName,
4481+
RunID: forkRunID,
4482+
BranchToken: forkBranchToken,
4483+
NextEventID: 4,
4484+
WorkflowTaskVersion: common.EmptyVersion,
4485+
WorkflowTaskScheduleID: common.EmptyEventID,
4486+
WorkflowTaskStartedID: common.EmptyEventID,
4487+
State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
4488+
},
4489+
ExecutionStats: &persistenceblobs.ExecutionStats{},
4490+
}}, nil).Once()
4491+
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{
4492+
RunID: currRunID,
4493+
}, nil).Once()
4494+
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
4495+
NamespaceID: namespaceID,
4496+
Execution: commonpb.WorkflowExecution{
4497+
WorkflowId: wid,
4498+
RunId: currRunID,
4499+
},
4500+
}).Return(&persistence.GetWorkflowExecutionResponse{State: &persistence.WorkflowMutableState{
4501+
ExecutionInfo: &persistence.WorkflowExecutionInfo{
4502+
NamespaceID: namespaceID,
4503+
WorkflowID: wid,
4504+
WorkflowTypeName: wfType,
4505+
TaskQueue: taskQueueName,
4506+
RunID: currRunID,
4507+
NextEventID: common.FirstEventID,
4508+
WorkflowTaskVersion: common.EmptyVersion,
4509+
WorkflowTaskScheduleID: common.EmptyEventID,
4510+
WorkflowTaskStartedID: common.EmptyEventID,
4511+
State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
4512+
},
4513+
ExecutionStats: &persistenceblobs.ExecutionStats{},
4514+
}}, nil).Once()
4515+
s.mockHistoryV2Mgr.On("ReadHistoryBranchByBatch", &persistence.ReadHistoryBranchRequest{
4516+
BranchToken: forkBranchToken,
4517+
MinEventID: common.FirstEventID,
4518+
MaxEventID: int64(4),
4519+
PageSize: defaultHistoryPageSize,
4520+
NextPageToken: nil,
4521+
ShardID: &s.shardID,
4522+
}).Return(readHistoryResponse, nil).Once()
4523+
s.mockHistoryV2Mgr.On("ForkHistoryBranch", mock.Anything).Return(&persistence.ForkHistoryBranchResponse{
4524+
NewBranchToken: []byte("newBranch"),
4525+
}, nil).Once()
4526+
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&persistence.AppendHistoryNodesResponse{
4527+
Size: 200,
4528+
}, nil).Times(2)
4529+
s.mockExecutionMgr.On("ResetWorkflowExecution", mock.Anything).Return(nil).Once()
4530+
4531+
// Perform a reset and make sure there is no error.
4532+
response, err := s.historyEngine.ResetWorkflowExecution(context.Background(), request)
4533+
s.Nil(err)
4534+
s.NotNil(response.RunId)
4535+
}

tools/cli/defs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ var envKeysForUserName = []string{
8181
}
8282

8383
var resetTypesMap = map[string]string{
84-
"FirstWorkflowTaskCompleted": "",
85-
"LastWorkflowTaskCompleted": "",
84+
"FirstWorkflowTask": "",
85+
"LastWorkflowTask": "",
8686
"LastContinuedAsNew": "",
8787
"BadBinary": FlagResetBadBinaryChecksum,
8888
}

tools/cli/workflowCommands.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,8 +1725,8 @@ func isLastEventWorkflowTaskFailedWithNonDeterminism(ctx context.Context, namesp
17251725
func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskFinishID int64, err error) {
17261726
fmt.Println("resetType:", resetType)
17271727
switch resetType {
1728-
case "LastWorkflowTaskCompleted":
1729-
resetBaseRunID, workflowTaskFinishID, err = getLastWorkflowTaskCompletedID(ctx, namespace, wid, rid, frontendClient)
1728+
case "LastWorkflowTask":
1729+
resetBaseRunID, workflowTaskFinishID, err = getLastWorkflowTaskEventID(ctx, namespace, wid, rid, frontendClient)
17301730
if err != nil {
17311731
return
17321732
}
@@ -1735,8 +1735,8 @@ func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, names
17351735
if err != nil {
17361736
return
17371737
}
1738-
case "FirstWorkflowTaskCompleted":
1739-
resetBaseRunID, workflowTaskFinishID, err = getFirstWorkflowTaskCompletedID(ctx, namespace, wid, rid, frontendClient)
1738+
case "FirstWorkflowTask":
1739+
resetBaseRunID, workflowTaskFinishID, err = getFirstWorkflowTaskEventID(ctx, namespace, wid, rid, frontendClient)
17401740
if err != nil {
17411741
return
17421742
}
@@ -1752,7 +1752,8 @@ func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, names
17521752
return
17531753
}
17541754

1755-
func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskCompletedID int64, err error) {
1755+
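// getLastWorkflowTaskEventID scans the workflow history and returns a reset point based on the latest workflow task event found: the ID of a WorkflowTaskCompleted event, or the ID following a WorkflowTaskScheduled event, whichever appears last in the history.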
1756+
func getLastWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
17561757
resetBaseRunID = rid
17571758
req := &workflowservice.GetWorkflowExecutionHistoryRequest{
17581759
Namespace: namespace,
@@ -1771,7 +1772,9 @@ func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid str
17711772
}
17721773
for _, e := range resp.GetHistory().GetEvents() {
17731774
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
1774-
workflowTaskCompletedID = e.GetEventId()
1775+
workflowTaskEventID = e.GetEventId()
1776+
} else if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
1777+
workflowTaskEventID = e.GetEventId() + 1
17751778
}
17761779
}
17771780
if len(resp.NextPageToken) != 0 {
@@ -1780,8 +1783,8 @@ func getLastWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid str
17801783
break
17811784
}
17821785
}
1783-
if workflowTaskCompletedID == 0 {
1784-
return "", 0, printErrorAndReturn("Get LastWorkflowTaskCompletedID failed", fmt.Errorf("no WorkflowTaskCompletedID"))
1786+
if workflowTaskEventID == 0 {
1787+
return "", 0, printErrorAndReturn("Get LastWorkflowTaskID failed", fmt.Errorf("unable to find any scheduled or completed task"))
17851788
}
17861789
return
17871790
}
@@ -1814,7 +1817,8 @@ func getBadWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid, bin
18141817
return
18151818
}
18161819

1817-
func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskCompletedID int64, err error) {
1820+
// getFirstWorkflowTaskEventID returns the ID of the first WorkflowTaskCompleted event or, if the history has none, the ID of the event following the first WorkflowTaskScheduled event.
1821+
func getFirstWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
18181822
resetBaseRunID = rid
18191823
req := &workflowservice.GetWorkflowExecutionHistoryRequest{
18201824
Namespace: namespace,
@@ -1825,16 +1829,20 @@ func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid st
18251829
MaximumPageSize: 1000,
18261830
NextPageToken: nil,
18271831
}
1828-
18291832
for {
18301833
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
18311834
if err != nil {
18321835
return "", 0, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
18331836
}
18341837
for _, e := range resp.GetHistory().GetEvents() {
18351838
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
1836-
workflowTaskCompletedID = e.GetEventId()
1837-
return resetBaseRunID, workflowTaskCompletedID, nil
1839+
workflowTaskEventID = e.GetEventId()
1840+
return resetBaseRunID, workflowTaskEventID, nil
1841+
}
1842+
if e.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
1843+
if workflowTaskEventID == 0 {
1844+
workflowTaskEventID = e.GetEventId() + 1
1845+
}
18381846
}
18391847
}
18401848
if len(resp.NextPageToken) != 0 {
@@ -1843,8 +1851,8 @@ func getFirstWorkflowTaskCompletedID(ctx context.Context, namespace, wid, rid st
18431851
break
18441852
}
18451853
}
1846-
if workflowTaskCompletedID == 0 {
1847-
return "", 0, printErrorAndReturn("Get FirstWorkflowTaskCompletedID failed", fmt.Errorf("no WorkflowTaskCompletedID"))
1854+
if workflowTaskEventID == 0 {
1855+
return "", 0, printErrorAndReturn("Get FirstWorkflowTaskID failed", fmt.Errorf("unable to find any scheduled or completed task"))
18481856
}
18491857
return
18501858
}

0 commit comments

Comments
 (0)