Skip to content

Commit de84092

Browse files
wxing1292yycptt
authored andcommitted
Use more reliable workflow mutation check (#4076)
* Make workflow mutation check more reliable Namespace migration breaks one of the assumption of XDC XDC assumes there will be no case which a namespace has only one cluster & the cluster is not itself, apparently namespace migration can break this assumption
1 parent df2d8eb commit de84092

File tree

4 files changed

+67
-60
lines changed

4 files changed

+67
-60
lines changed

service/history/historyEngine2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (s *engine2Suite) SetupTest() {
149149
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
150150

151151
s.mockEventsCache = s.mockShard.MockEventsCache
152-
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.LocalNamespaceEntry, nil).AnyTimes()
152+
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()
153153
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ParentNamespaceID).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes()
154154
s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes()
155155
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

service/history/workflow/mutable_state_impl.go

Lines changed: 58 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4239,43 +4239,44 @@ func (ms *MutableStateImpl) eventsToReplicationTask(
42394239
transactionPolicy TransactionPolicy,
42404240
events []*historypb.HistoryEvent,
42414241
) error {
4242-
4243-
if transactionPolicy == TransactionPolicyPassive ||
4244-
!ms.canReplicateEvents() ||
4245-
len(events) == 0 {
4242+
switch transactionPolicy {
4243+
case TransactionPolicyActive:
4244+
if ms.generateReplicationTask() {
4245+
return ms.taskGenerator.GenerateHistoryReplicationTasks(events)
4246+
}
42464247
return nil
4248+
case TransactionPolicyPassive:
4249+
return nil
4250+
default:
4251+
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
42474252
}
4248-
4249-
currentBranchToken, err := ms.GetCurrentBranchToken()
4250-
if err != nil {
4251-
return err
4252-
}
4253-
return ms.taskGenerator.GenerateHistoryReplicationTasks(
4254-
currentBranchToken,
4255-
events,
4256-
)
42574253
}
42584254

42594255
func (ms *MutableStateImpl) syncActivityToReplicationTask(
42604256
now time.Time,
42614257
transactionPolicy TransactionPolicy,
42624258
) []tasks.Task {
42634259

4264-
if transactionPolicy == TransactionPolicyPassive ||
4265-
!ms.canReplicateEvents() {
4260+
switch transactionPolicy {
4261+
case TransactionPolicyActive:
4262+
if ms.generateReplicationTask() {
4263+
return convertSyncActivityInfos(
4264+
now,
4265+
definition.NewWorkflowKey(
4266+
ms.executionInfo.NamespaceId,
4267+
ms.executionInfo.WorkflowId,
4268+
ms.executionState.RunId,
4269+
),
4270+
ms.pendingActivityInfoIDs,
4271+
ms.syncActivityTasks,
4272+
)
4273+
}
4274+
return nil
4275+
case TransactionPolicyPassive:
42664276
return emptyTasks
4277+
default:
4278+
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
42674279
}
4268-
4269-
return convertSyncActivityInfos(
4270-
now,
4271-
definition.NewWorkflowKey(
4272-
ms.executionInfo.NamespaceId,
4273-
ms.executionInfo.WorkflowId,
4274-
ms.executionState.RunId,
4275-
),
4276-
ms.pendingActivityInfoIDs,
4277-
ms.syncActivityTasks,
4278-
)
42794280
}
42804281

42814282
func (ms *MutableStateImpl) updatePendingEventIDs(
@@ -4319,10 +4320,6 @@ func (ms *MutableStateImpl) updateWithLastWriteEvent(
43194320
return nil
43204321
}
43214322

4322-
func (ms *MutableStateImpl) canReplicateEvents() bool {
4323-
return ms.namespaceEntry.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster
4324-
}
4325-
43264323
// validateNoEventsAfterWorkflowFinish perform check on history event batch
43274324
// NOTE: do not apply this check on every batch, since transient
43284325
// workflow task && workflow finish will be broken (the first batch)
@@ -4478,23 +4475,24 @@ func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool,
44784475
func (ms *MutableStateImpl) closeTransactionWithPolicyCheck(
44794476
transactionPolicy TransactionPolicy,
44804477
) error {
4481-
4482-
if transactionPolicy == TransactionPolicyPassive ||
4483-
!ms.canReplicateEvents() {
4478+
switch transactionPolicy {
4479+
case TransactionPolicyActive:
4480+
// Cannot use ms.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in
4481+
// passive cluster. For example: if passive cluster sees conflict and decided to terminate this workflow. The
4482+
// currentVersion on mutable state would be updated to point to last write version which is current (passive) cluster.
4483+
activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion(ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion())
4484+
currentCluster := ms.clusterMetadata.GetCurrentClusterName()
4485+
4486+
if activeCluster != currentCluster {
4487+
namespaceID := ms.GetExecutionInfo().NamespaceId
4488+
return serviceerror.NewNamespaceNotActive(namespaceID, currentCluster, activeCluster)
4489+
}
44844490
return nil
4491+
case TransactionPolicyPassive:
4492+
return nil
4493+
default:
4494+
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
44854495
}
4486-
4487-
// Cannot use ms.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in
4488-
// passive cluster. For example: if passive cluster sees conflict and decided to terminate this workflow. The
4489-
// currentVersion on mutable state would be updated to point to last write version which is current (passive) cluster.
4490-
activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion(ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion())
4491-
currentCluster := ms.clusterMetadata.GetCurrentClusterName()
4492-
4493-
if activeCluster != currentCluster {
4494-
namespaceID := ms.GetExecutionInfo().NamespaceId
4495-
return serviceerror.NewNamespaceNotActive(namespaceID, currentCluster, activeCluster)
4496-
}
4497-
return nil
44984496
}
44994497

45004498
func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
@@ -4573,17 +4571,24 @@ func (ms *MutableStateImpl) closeTransactionHandleWorkflowReset(
45734571
func (ms *MutableStateImpl) closeTransactionHandleActivityUserTimerTasks(
45744572
transactionPolicy TransactionPolicy,
45754573
) error {
4576-
4577-
if transactionPolicy == TransactionPolicyPassive ||
4578-
!ms.IsWorkflowExecutionRunning() {
4574+
switch transactionPolicy {
4575+
case TransactionPolicyActive:
4576+
if !ms.IsWorkflowExecutionRunning() {
4577+
return nil
4578+
}
4579+
if err := ms.taskGenerator.GenerateActivityTimerTasks(); err != nil {
4580+
return err
4581+
}
4582+
return ms.taskGenerator.GenerateUserTimerTasks()
4583+
case TransactionPolicyPassive:
45794584
return nil
4585+
default:
4586+
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
45804587
}
4588+
}
45814589

4582-
if err := ms.taskGenerator.GenerateActivityTimerTasks(); err != nil {
4583-
return err
4584-
}
4585-
4586-
return ms.taskGenerator.GenerateUserTimerTasks()
4590+
func (ms *MutableStateImpl) generateReplicationTask() bool {
4591+
return len(ms.namespaceEntry.ClusterNames()) > 1
45874592
}
45884593

45894594
func (ms *MutableStateImpl) checkMutability(

service/history/workflow/task_generator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ type (
9999

100100
// replication tasks
101101
GenerateHistoryReplicationTasks(
102-
branchToken []byte,
103102
events []*historypb.HistoryEvent,
104103
) error
105104
GenerateMigrationTasks() (tasks.Task, error)
@@ -569,9 +568,12 @@ func (r *TaskGeneratorImpl) GenerateUserTimerTasks() error {
569568
}
570569

571570
func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
572-
branchToken []byte,
573571
events []*historypb.HistoryEvent,
574572
) error {
573+
if len(events) == 0 {
574+
return nil
575+
}
576+
575577
firstEvent := events[0]
576578
lastEvent := events[len(events)-1]
577579
if firstEvent.GetVersion() != lastEvent.GetVersion() {

service/history/workflow/task_generator_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)