Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *engine2Suite) SetupTest() {
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata

s.mockEventsCache = s.mockShard.MockEventsCache
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.LocalNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.ParentNamespaceID).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespace(tests.ChildNamespace).Return(tests.GlobalChildNamespaceEntry, nil).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
Expand Down
111 changes: 58 additions & 53 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4225,43 +4225,44 @@ func (ms *MutableStateImpl) eventsToReplicationTask(
transactionPolicy TransactionPolicy,
events []*historypb.HistoryEvent,
) error {

if transactionPolicy == TransactionPolicyPassive ||
!ms.canReplicateEvents() ||
len(events) == 0 {
switch transactionPolicy {
case TransactionPolicyActive:
if ms.generateReplicationTask() {
return ms.taskGenerator.GenerateHistoryReplicationTasks(events)
}
return nil
case TransactionPolicyPassive:
return nil
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
}

currentBranchToken, err := ms.GetCurrentBranchToken()
if err != nil {
return err
}
return ms.taskGenerator.GenerateHistoryReplicationTasks(
currentBranchToken,
events,
)
}

func (ms *MutableStateImpl) syncActivityToReplicationTask(
now time.Time,
transactionPolicy TransactionPolicy,
) []tasks.Task {

if transactionPolicy == TransactionPolicyPassive ||
!ms.canReplicateEvents() {
switch transactionPolicy {
case TransactionPolicyActive:
if ms.generateReplicationTask() {
return convertSyncActivityInfos(
now,
definition.NewWorkflowKey(
ms.executionInfo.NamespaceId,
ms.executionInfo.WorkflowId,
ms.executionState.RunId,
),
ms.pendingActivityInfoIDs,
ms.syncActivityTasks,
)
}
return nil
case TransactionPolicyPassive:
return emptyTasks
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
}

return convertSyncActivityInfos(
now,
definition.NewWorkflowKey(
ms.executionInfo.NamespaceId,
ms.executionInfo.WorkflowId,
ms.executionState.RunId,
),
ms.pendingActivityInfoIDs,
ms.syncActivityTasks,
)
}

func (ms *MutableStateImpl) updatePendingEventIDs(
Expand Down Expand Up @@ -4305,10 +4306,6 @@ func (ms *MutableStateImpl) updateWithLastWriteEvent(
return nil
}

func (ms *MutableStateImpl) canReplicateEvents() bool {
return ms.namespaceEntry.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assumption is broken

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just reuse this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name canReplicateEvents makes sense, as it returns a boolean. But it is up to you.

}

// validateNoEventsAfterWorkflowFinish perform check on history event batch
// NOTE: do not apply this check on every batch, since transient
// workflow task && workflow finish will be broken (the first batch)
Expand Down Expand Up @@ -4464,23 +4461,24 @@ func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool,
func (ms *MutableStateImpl) closeTransactionWithPolicyCheck(
transactionPolicy TransactionPolicy,
) error {

if transactionPolicy == TransactionPolicyPassive ||
!ms.canReplicateEvents() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line is the real change

switch transactionPolicy {
case TransactionPolicyActive:
// Cannot use ms.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in
// passive cluster. For example: if passive cluster sees conflict and decided to terminate this workflow. The
// currentVersion on mutable state would be updated to point to last write version which is current (passive) cluster.
activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion(ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion())
currentCluster := ms.clusterMetadata.GetCurrentClusterName()

if activeCluster != currentCluster {
namespaceID := ms.GetExecutionInfo().NamespaceId
return serviceerror.NewNamespaceNotActive(namespaceID, currentCluster, activeCluster)
}
return nil
case TransactionPolicyPassive:
return nil
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
}

// Cannot use ms.namespaceEntry.ActiveClusterName() because currentVersion may be updated during this transaction in
// passive cluster. For example: if passive cluster sees conflict and decided to terminate this workflow. The
// currentVersion on mutable state would be updated to point to last write version which is current (passive) cluster.
activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion(ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion())
currentCluster := ms.clusterMetadata.GetCurrentClusterName()

if activeCluster != currentCluster {
namespaceID := ms.GetExecutionInfo().NamespaceId
return serviceerror.NewNamespaceNotActive(namespaceID, currentCluster, activeCluster)
}
return nil
}

func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
Expand Down Expand Up @@ -4559,17 +4557,24 @@ func (ms *MutableStateImpl) closeTransactionHandleWorkflowReset(
func (ms *MutableStateImpl) closeTransactionHandleActivityUserTimerTasks(
transactionPolicy TransactionPolicy,
) error {

if transactionPolicy == TransactionPolicyPassive ||
!ms.IsWorkflowExecutionRunning() {
switch transactionPolicy {
case TransactionPolicyActive:
if !ms.IsWorkflowExecutionRunning() {
return nil
}
if err := ms.taskGenerator.GenerateActivityTimerTasks(); err != nil {
return err
}
return ms.taskGenerator.GenerateUserTimerTasks()
case TransactionPolicyPassive:
return nil
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return an internal error instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to fail fast here, e.g. with a panic.

I would consider returning an error when:

  • input validation fails
  • modifying a complicated component and we do not want to break production

There are only 2 policies here, so there is no safety concern.

}
}

if err := ms.taskGenerator.GenerateActivityTimerTasks(); err != nil {
return err
}

return ms.taskGenerator.GenerateUserTimerTasks()
func (ms *MutableStateImpl) generateReplicationTask() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as the canReplicateEvents method.

return len(ms.namespaceEntry.ClusterNames()) > 1
}

func (ms *MutableStateImpl) checkMutability(
Expand Down
6 changes: 4 additions & 2 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ type (

// replication tasks
GenerateHistoryReplicationTasks(
branchToken []byte,
events []*historypb.HistoryEvent,
) error
GenerateMigrationTasks() (tasks.Task, error)
Expand Down Expand Up @@ -569,9 +568,12 @@ func (r *TaskGeneratorImpl) GenerateUserTimerTasks() error {
}

func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
branchToken []byte,
events []*historypb.HistoryEvent,
) error {
if len(events) == 0 {
return nil
}

firstEvent := events[0]
lastEvent := events[len(events)-1]
if firstEvent.GetVersion() != lastEvent.GetVersion() {
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.