Skip to content

Commit 4375f94

Browse files
authored
Handle delete history branch (#2591)
* Add branch token to workflow cleanup task
1 parent 6fa5b4a commit 4375f94

16 files changed

+401
-306
lines changed

api/persistence/v1/executions.pb.go

Lines changed: 266 additions & 208 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/persistence-tests/executionManagerTest.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2392,10 +2392,10 @@ func (s *ExecutionManagerSuite) TestTimerTasksComplete() {
23922392
updatedState := copyWorkflowExecutionState(state0.ExecutionState)
23932393
updatedInfo.LastWorkflowTaskStartId = int64(2)
23942394
taskSlice := []tasks.Task{
2395-
&tasks.WorkflowTimeoutTask{workflowKey, now.Add(2 * time.Second), 2, 12},
2396-
&tasks.DeleteHistoryEventTask{workflowKey, now.Add(2 * time.Second), 3, 13},
2397-
&tasks.ActivityTimeoutTask{workflowKey, now.Add(3 * time.Second), 4, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, 7, 0, 14},
2398-
&tasks.UserTimerTask{workflowKey, now.Add(3 * time.Second), 5, 7, 15},
2395+
&tasks.WorkflowTimeoutTask{WorkflowKey: workflowKey, VisibilityTimestamp: now.Add(2 * time.Second), TaskID: 2, Version: 12},
2396+
&tasks.DeleteHistoryEventTask{WorkflowKey: workflowKey, VisibilityTimestamp: now.Add(2 * time.Second), TaskID: 3, Version: 13},
2397+
&tasks.ActivityTimeoutTask{WorkflowKey: workflowKey, VisibilityTimestamp: now.Add(3 * time.Second), TaskID: 4, TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, EventID: 7, Version: 14},
2398+
&tasks.UserTimerTask{WorkflowKey: workflowKey, VisibilityTimestamp: now.Add(3 * time.Second), TaskID: 5, EventID: 7, Version: 15},
23992399
}
24002400
err2 := s.UpdateWorkflowExecution(updatedInfo, updatedState, int64(5), []int64{int64(4)}, nil, int64(3), taskSlice, nil, nil, nil, nil)
24012401
s.NoError(err2)
@@ -2459,11 +2459,11 @@ func (s *ExecutionManagerSuite) TestTimerTasksRangeComplete() {
24592459
updatedState := copyWorkflowExecutionState(state0.ExecutionState)
24602460
updatedInfo.LastWorkflowTaskStartId = int64(2)
24612461
taskSlice := []tasks.Task{
2462-
&tasks.WorkflowTaskTimeoutTask{workflowKey, time.Now().UTC(), 1, 2, 3, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, 11},
2463-
&tasks.WorkflowTimeoutTask{workflowKey, time.Now().UTC(), 2, 12},
2464-
&tasks.DeleteHistoryEventTask{workflowKey, time.Now().UTC(), 3, 13},
2465-
&tasks.ActivityTimeoutTask{workflowKey, time.Now().UTC(), 4, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, 7, 0, 14},
2466-
&tasks.UserTimerTask{workflowKey, time.Now().UTC(), 5, 7, 15},
2462+
&tasks.WorkflowTaskTimeoutTask{WorkflowKey: workflowKey, VisibilityTimestamp: time.Now().UTC(), TaskID: 1, EventID: 2, ScheduleAttempt: 3, TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, Version: 11},
2463+
&tasks.WorkflowTimeoutTask{WorkflowKey: workflowKey, VisibilityTimestamp: time.Now().UTC(), TaskID: 2, Version: 12},
2464+
&tasks.DeleteHistoryEventTask{WorkflowKey: workflowKey, VisibilityTimestamp: time.Now().UTC(), TaskID: 3, Version: 13},
2465+
&tasks.ActivityTimeoutTask{WorkflowKey: workflowKey, VisibilityTimestamp: time.Now().UTC(), TaskID: 4, TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, EventID: 7, Version: 14},
2466+
&tasks.UserTimerTask{WorkflowKey: workflowKey, VisibilityTimestamp: time.Now().UTC(), TaskID: 5, EventID: 7, Version: 15},
24672467
}
24682468
err2 := s.UpdateWorkflowExecution(updatedInfo, updatedState, int64(5), []int64{int64(4)}, nil, int64(3), taskSlice, nil, nil, nil, nil)
24692469
s.NoError(err2)

common/persistence/serialization/task_serializer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,7 @@ func (s *TaskSerializer) timerWorkflowCleanupTaskToProto(
811811
EventId: 0,
812812
TaskId: workflowCleanupTimer.TaskID,
813813
VisibilityTime: &workflowCleanupTimer.VisibilityTimestamp,
814+
BranchToken: workflowCleanupTimer.BranchToken,
814815
}
815816
}
816817

@@ -826,6 +827,7 @@ func (s *TaskSerializer) timerWorkflowCleanupTaskFromProto(
826827
VisibilityTimestamp: *workflowCleanupTimer.VisibilityTime,
827828
TaskID: workflowCleanupTimer.TaskId,
828829
Version: workflowCleanupTimer.Version,
830+
BranchToken: workflowCleanupTimer.BranchToken,
829831
}
830832
}
831833

common/persistence/serialization/task_serializer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ func (s *taskSerializerSuite) TestTimerWorkflowCleanupTask() {
253253
VisibilityTimestamp: time.Unix(0, rand.Int63()).UTC(),
254254
TaskID: rand.Int63(),
255255
Version: rand.Int63(),
256+
BranchToken: []byte{123},
256257
}
257258

258259
s.assertEqualTasks(workflowCleanupTimer)

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ message TimerTaskInfo {
206206
int64 event_id = 9;
207207
int64 task_id = 10;
208208
google.protobuf.Timestamp visibility_time = 11 [(gogoproto.stdtime) = true];
209+
bytes branch_token = 12;
209210
}
210211

211212
// activity_map column

service/history/nDCTaskUtil.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,30 @@ const (
4242
refreshTaskTimeout = 30 * time.Second
4343
)
4444

45-
// verifyTaskVersion, will return true if failover version check is successful
46-
func verifyTaskVersion(
45+
// VerifyTaskVersion, will return true if failover version check is successful
46+
func VerifyTaskVersion(
4747
shard shard.Context,
4848
logger log.Logger,
49-
namespaceID namespace.ID,
49+
namespace *namespace.Namespace,
5050
version int64,
5151
taskVersion int64,
5252
task interface{},
53-
) (bool, error) {
53+
) bool {
5454

5555
if !shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
56-
return true, nil
56+
return true
5757
}
5858

5959
// the first return value is whether this task is valid for further processing
60-
namespaceEntry, err := shard.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
61-
if err != nil {
62-
logger.Debug("Cannot find namespaceID", tag.WorkflowNamespaceID(namespaceID.String()), tag.Error(err))
63-
return false, err
64-
}
65-
if !namespaceEntry.IsGlobalNamespace() {
66-
logger.Debug("NamespaceID is not active, task version check pass", tag.WorkflowNamespaceID(namespaceID.String()), tag.Task(task))
67-
return true, nil
60+
if !namespace.IsGlobalNamespace() {
61+
logger.Debug("NamespaceID is not global, task version check pass", tag.WorkflowNamespaceID(namespace.ID().String()), tag.Task(task))
62+
return true
6863
} else if version != taskVersion {
69-
logger.Debug("NamespaceID is active, task version != target version", tag.WorkflowNamespaceID(namespaceID.String()), tag.Task(task), tag.TaskVersion(version))
70-
return false, nil
64+
logger.Debug("NamespaceID is global, task version != target version", tag.WorkflowNamespaceID(namespace.ID().String()), tag.Task(task), tag.TaskVersion(version))
65+
return false
7166
}
72-
logger.Debug("NamespaceID is active, task version == target version", tag.WorkflowNamespaceID(namespaceID.String()), tag.Task(task), tag.TaskVersion(version))
73-
return true, nil
67+
logger.Debug("NamespaceID is global, task version == target version", tag.WorkflowNamespaceID(namespace.ID().String()), tag.Task(task), tag.TaskVersion(version))
68+
return true
7469
}
7570

7671
// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
@@ -81,7 +76,7 @@ func loadMutableStateForTransferTask(
8176
metricsClient metrics.Client,
8277
logger log.Logger,
8378
) (workflow.MutableState, error) {
84-
return loadMutableStateForTask(
79+
return LoadMutableStateForTask(
8580
context,
8681
transferTask,
8782
getTransferTaskEventIDAndRetryable,
@@ -98,7 +93,7 @@ func loadMutableStateForTimerTask(
9893
metricsClient metrics.Client,
9994
logger log.Logger,
10095
) (workflow.MutableState, error) {
101-
return loadMutableStateForTask(
96+
return LoadMutableStateForTask(
10297
context,
10398
timerTask,
10499
getTimerTaskEventIDAndRetryable,
@@ -107,7 +102,7 @@ func loadMutableStateForTimerTask(
107102
)
108103
}
109104

110-
func loadMutableStateForTask(
105+
func LoadMutableStateForTask(
111106
context workflow.Context,
112107
task tasks.Task,
113108
taskEventIDAndRetryable func(task tasks.Task, executionInfo *persistencespb.WorkflowExecutionInfo) (int64, bool),

service/history/tasks/workflow_cleanup_timer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type (
3939
VisibilityTimestamp time.Time
4040
TaskID int64
4141
Version int64
42+
BranchToken []byte
4243
}
4344
)
4445

service/history/timerQueueActiveTaskExecutor.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -314,9 +314,9 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask(
314314
if !ok {
315315
return nil
316316
}
317-
ok, err = verifyTaskVersion(t.shard, t.logger, namespace.ID(task.NamespaceID), workflowTask.Version, task.Version, task)
318-
if err != nil || !ok {
319-
return err
317+
ok = VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, task.Version, task)
318+
if !ok {
319+
return nil
320320
}
321321

322322
if workflowTask.Attempt != task.ScheduleAttempt {
@@ -447,9 +447,9 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
447447
}
448448
return nil
449449
}
450-
ok, err = verifyTaskVersion(t.shard, t.logger, namespace.ID(task.NamespaceID), activityInfo.Version, task.Version, task)
451-
if err != nil || !ok {
452-
return err
450+
ok = VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), activityInfo.Version, task.Version, task)
451+
if !ok {
452+
return nil
453453
}
454454

455455
targetNamespaceID := activityInfo.NamespaceId
@@ -509,9 +509,9 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTimeoutTask(
509509
if err != nil {
510510
return err
511511
}
512-
ok, err := verifyTaskVersion(t.shard, t.logger, namespace.ID(task.NamespaceID), startVersion, task.Version, task)
513-
if err != nil || !ok {
514-
return err
512+
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), startVersion, task.Version, task)
513+
if !ok {
514+
return nil
515515
}
516516

517517
eventBatchFirstEventID := mutableState.GetNextEventID()

service/history/timerQueueStandbyTaskExecutor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,9 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask(
291291
return nil, nil
292292
}
293293

294-
ok, err := verifyTaskVersion(t.shard, t.logger, namespace.ID(timerTask.NamespaceID), workflowTask.Version, timerTask.Version, timerTask)
295-
if err != nil || !ok {
296-
return nil, err
294+
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), workflowTask.Version, timerTask.Version, timerTask)
295+
if !ok {
296+
return nil, nil
297297
}
298298

299299
return getHistoryResendInfo(mutableState)
@@ -369,9 +369,9 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTimeoutTask(
369369
if err != nil {
370370
return nil, err
371371
}
372-
ok, err := verifyTaskVersion(t.shard, t.logger, namespace.ID(timerTask.NamespaceID), startVersion, timerTask.Version, timerTask)
373-
if err != nil || !ok {
374-
return nil, err
372+
ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), startVersion, timerTask.Version, timerTask)
373+
if !ok {
374+
return nil, nil
375375
}
376376

377377
return getHistoryResendInfo(mutableState)

service/history/timerQueueTaskExecutorBase.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
package history
2626

2727
import (
28+
"bytes"
2829
"context"
2930

3031
commonpb "go.temporal.io/api/common/v1"
31-
32+
"go.temporal.io/api/serviceerror"
3233
"go.temporal.io/server/common/log"
34+
"go.temporal.io/server/common/log/tag"
3335
"go.temporal.io/server/common/metrics"
3436
"go.temporal.io/server/common/namespace"
37+
"go.temporal.io/server/common/persistence"
3538
"go.temporal.io/server/service/history/configs"
3639
"go.temporal.io/server/service/history/shard"
3740
"go.temporal.io/server/service/history/tasks"
@@ -91,17 +94,35 @@ func (t *timerQueueTaskExecutorBase) executeDeleteHistoryEventTask(
9194
defer func() { release(retError) }()
9295

9396
mutableState, err := loadMutableStateForTimerTask(weContext, task, t.metricsClient, t.logger)
94-
if err != nil {
97+
switch err.(type) {
98+
case nil:
99+
if mutableState == nil {
100+
return nil
101+
}
102+
case *serviceerror.NotFound:
103+
// the mutable state is deleted and delete history branch operation failed.
104+
// use task branch token to delete the leftover history branch
105+
return t.deleteHistoryBranch(task.BranchToken)
106+
default:
95107
return err
96108
}
97109

98110
lastWriteVersion, err := mutableState.GetLastWriteVersion()
99111
if err != nil {
100112
return err
101113
}
102-
ok, err := verifyTaskVersion(t.shard, t.logger, namespace.ID(task.NamespaceID), lastWriteVersion, task.Version, task)
103-
if err != nil || !ok {
104-
return err
114+
if ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.Version, task); !ok {
115+
currentBranchToken, err := mutableState.GetCurrentBranchToken()
116+
if err != nil {
117+
return err
118+
}
119+
// the mutable state has a newer version and the branch token is updated
120+
// use task branch token to delete the original branch
121+
if !bytes.Equal(task.BranchToken, currentBranchToken) {
122+
return t.deleteHistoryBranch(task.BranchToken)
123+
}
124+
t.logger.Error("Different mutable state versions have the same branch token", tag.TaskVersion(task.Version), tag.LastEventVersion(lastWriteVersion))
125+
return serviceerror.NewInternal("Mutable state has different version but same branch token")
105126
}
106127

107128
return t.deleteManager.DeleteWorkflowExecutionByRetention(
@@ -122,3 +143,13 @@ func (t *timerQueueTaskExecutorBase) getNamespaceIDAndWorkflowExecution(
122143
RunId: task.GetRunID(),
123144
}
124145
}
146+
147+
func (t *timerQueueTaskExecutorBase) deleteHistoryBranch(branchToken []byte) error {
148+
if len(branchToken) > 0 {
149+
return t.shard.GetExecutionManager().DeleteHistoryBranch(&persistence.DeleteHistoryBranchRequest{
150+
ShardID: t.shard.GetShardID(),
151+
BranchToken: branchToken,
152+
})
153+
}
154+
return nil
155+
}

0 commit comments

Comments
 (0)