Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4387,8 +4387,7 @@ func (ms *MutableStateImpl) startTransactionHandleNamespaceMigration(

func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool, error) {

if !ms.IsWorkflowExecutionRunning() ||
!ms.canReplicateEvents() {
if !ms.IsWorkflowExecutionRunning() {
return false, nil
}

Expand Down
71 changes: 55 additions & 16 deletions tests/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {
}
}

func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() {
func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose_WithClusterReconnect() {
namespace := "test-force-workflow-task-close-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Expand Down Expand Up @@ -1624,11 +1624,9 @@ func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() {

s.logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.GetRunId()))

workflowFinished := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

workflowFinished = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Expand All @@ -1647,23 +1645,31 @@ func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() {
T: s.T(),
}

poller2 := &tests.TaskPoller{
Engine: client2,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.logger,
T: s.T(),
}

// this will fail the workflow task
_, err = poller1.PollAndProcessWorkflowTask(false, true)
s.NoError(err)

s.failover(namespace, clusterName[1], int64(2), client1)
// Wait for namespace cache to pick the change
time.Sleep(cacheRefreshInterval)

// Send a signal in cluster
// Update the namespace in cluster 2 to be a single cluster namespace
upReq := &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: []*replicationpb.ClusterReplicationConfig{
{
ClusterName: clusterName[1],
},
},
},
}
_, err = client2.UpdateNamespace(tests.NewContext(), upReq)
s.NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(cacheRefreshInterval)

// Send a signal to cluster 2, namespace contains one cluster
signalName := "my signal"
signalInput := payloads.EncodeString("my signal input")
_, err = client2.SignalWorkflowExecution(tests.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Expand All @@ -1677,9 +1683,42 @@ func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() {
})
s.NoError(err)

_, err = poller2.PollAndProcessWorkflowTaskWithAttempt(false, false, false, false, 1)
// No error is expected with single cluster namespace.
_, err = client2.DescribeWorkflowExecution(tests.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
})
s.NoError(err)

// Update the namespace in cluster 2 to be a multi cluster namespace
upReq2 := &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: []*replicationpb.ClusterReplicationConfig{
{
ClusterName: clusterName[1],
},
{
ClusterName: clusterName[0],
},
},
},
}
_, err = client2.UpdateNamespace(tests.NewContext(), upReq2)
s.NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(cacheRefreshInterval)

// No error is expected with multi cluster namespace.
_, err = client2.DescribeWorkflowExecution(tests.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
})
s.NoError(err)
s.True(workflowFinished)
}

func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
Expand Down