Skip to content

Commit c0a4ecd

Browse files
yux0wxing1292
authored andcommitted
Flush buffer event with single cluster namespace (#4031)
* Flush buffer event with single cluster namespace * update test comment
1 parent 4d5b279 commit c0a4ecd

File tree

2 files changed

+153
-2
lines changed

2 files changed

+153
-2
lines changed

service/history/workflow/mutable_state_impl.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4388,8 +4388,7 @@ func (ms *MutableStateImpl) startTransactionHandleNamespaceMigration(
43884388

43894389
func (ms *MutableStateImpl) startTransactionHandleWorkflowTaskFailover() (bool, error) {
43904390

4391-
if !ms.IsWorkflowExecutionRunning() ||
4392-
!ms.canReplicateEvents() {
4391+
if !ms.IsWorkflowExecutionRunning() {
43934392
return false, nil
43944393
}
43954394

tests/xdc/integration_failover_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,6 +1569,158 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {
15691569
}
15701570
}
15711571

1572+
func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose_WithClusterReconnect() {
1573+
namespace := "test-force-workflow-task-close-" + common.GenerateRandomString(5)
1574+
client1 := s.cluster1.GetFrontendClient() // active
1575+
regReq := &workflowservice.RegisterNamespaceRequest{
1576+
Namespace: namespace,
1577+
IsGlobalNamespace: true,
1578+
Clusters: clusterReplicationConfig,
1579+
ActiveClusterName: clusterName[0],
1580+
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
1581+
}
1582+
_, err := client1.RegisterNamespace(tests.NewContext(), regReq)
1583+
s.NoError(err)
1584+
// Wait for namespace cache to pick the change
1585+
time.Sleep(cacheRefreshInterval)
1586+
1587+
descReq := &workflowservice.DescribeNamespaceRequest{
1588+
Namespace: namespace,
1589+
}
1590+
resp, err := client1.DescribeNamespace(tests.NewContext(), descReq)
1591+
s.NoError(err)
1592+
s.NotNil(resp)
1593+
1594+
client2 := s.cluster2.GetFrontendClient() // standby
1595+
1596+
// Start a workflow
1597+
id := "test-force-workflow-task-close-test"
1598+
wt := "test-force-workflow-task-close-test-type"
1599+
tl := "test-force-workflow-task-close-test-taskqueue"
1600+
identity := "worker1"
1601+
workflowType := &commonpb.WorkflowType{Name: wt}
1602+
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
1603+
startReq := &workflowservice.StartWorkflowExecutionRequest{
1604+
RequestId: uuid.New(),
1605+
Namespace: namespace,
1606+
WorkflowId: id,
1607+
WorkflowType: workflowType,
1608+
TaskQueue: taskQueue,
1609+
Input: nil,
1610+
WorkflowRunTimeout: timestamp.DurationPtr(300 * time.Second),
1611+
WorkflowTaskTimeout: timestamp.DurationPtr(60 * time.Second),
1612+
Identity: identity,
1613+
}
1614+
var we *workflowservice.StartWorkflowExecutionResponse
1615+
for i := 0; i < 10; i++ {
1616+
we, err = client1.StartWorkflowExecution(tests.NewContext(), startReq)
1617+
if err == nil {
1618+
break
1619+
}
1620+
time.Sleep(1 * time.Second)
1621+
}
1622+
s.NoError(err)
1623+
s.NotNil(we.GetRunId())
1624+
1625+
s.logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.GetRunId()))
1626+
1627+
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
1628+
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
1629+
1630+
return []*commandpb.Command{{
1631+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
1632+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
1633+
Result: payloads.EncodeString("Done"),
1634+
}},
1635+
}}, nil
1636+
}
1637+
1638+
poller1 := &tests.TaskPoller{
1639+
Engine: client1,
1640+
Namespace: namespace,
1641+
TaskQueue: taskQueue,
1642+
Identity: identity,
1643+
WorkflowTaskHandler: wtHandler,
1644+
Logger: s.logger,
1645+
T: s.T(),
1646+
}
1647+
1648+
// this will fail the workflow task
1649+
_, err = poller1.PollAndProcessWorkflowTask(false, true)
1650+
s.NoError(err)
1651+
1652+
s.failover(namespace, clusterName[1], int64(2), client1)
1653+
// Wait for namespace cache to pick the change
1654+
time.Sleep(cacheRefreshInterval)
1655+
1656+
// Update the namespace in cluster 2 to be a single cluster namespace
1657+
upReq := &workflowservice.UpdateNamespaceRequest{
1658+
Namespace: namespace,
1659+
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
1660+
Clusters: []*replicationpb.ClusterReplicationConfig{
1661+
{
1662+
ClusterName: clusterName[1],
1663+
},
1664+
},
1665+
},
1666+
}
1667+
_, err = client2.UpdateNamespace(tests.NewContext(), upReq)
1668+
s.NoError(err)
1669+
// Wait for namespace cache to pick the change
1670+
time.Sleep(cacheRefreshInterval)
1671+
1672+
// Send a signal to cluster 2, namespace contains one cluster
1673+
signalName := "my signal"
1674+
signalInput := payloads.EncodeString("my signal input")
1675+
_, err = client2.SignalWorkflowExecution(tests.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
1676+
Namespace: namespace,
1677+
WorkflowExecution: &commonpb.WorkflowExecution{
1678+
WorkflowId: id,
1679+
RunId: we.GetRunId(),
1680+
},
1681+
SignalName: signalName,
1682+
Input: signalInput,
1683+
})
1684+
s.NoError(err)
1685+
1686+
// No error is expected with single cluster namespace.
1687+
_, err = client2.DescribeWorkflowExecution(tests.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
1688+
Namespace: namespace,
1689+
Execution: &commonpb.WorkflowExecution{
1690+
WorkflowId: id,
1691+
},
1692+
})
1693+
s.NoError(err)
1694+
1695+
// Update the namespace in cluster 2 to be a multi cluster namespace
1696+
upReq2 := &workflowservice.UpdateNamespaceRequest{
1697+
Namespace: namespace,
1698+
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
1699+
Clusters: []*replicationpb.ClusterReplicationConfig{
1700+
{
1701+
ClusterName: clusterName[1],
1702+
},
1703+
{
1704+
ClusterName: clusterName[0],
1705+
},
1706+
},
1707+
},
1708+
}
1709+
_, err = client2.UpdateNamespace(tests.NewContext(), upReq2)
1710+
s.NoError(err)
1711+
// Wait for namespace cache to pick the change
1712+
time.Sleep(cacheRefreshInterval)
1713+
1714+
// No error is expected with multi cluster namespace.
1715+
_, err = client2.DescribeWorkflowExecution(tests.NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
1716+
Namespace: namespace,
1717+
Execution: &commonpb.WorkflowExecution{
1718+
WorkflowId: id,
1719+
},
1720+
})
1721+
s.NoError(err)
1722+
}
1723+
15721724
func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
15731725
namespace := "test-transient-workflow-task-workflow-failover-" + common.GenerateRandomString(5)
15741726
client1 := s.cluster1.GetFrontendClient() // active

0 commit comments

Comments
 (0)