Skip to content

Commit 0fb6385

Browse files
committed
multiple autoupgrade workflows should not bounce
1 parent 8fc77bb commit 0fb6385

File tree

1 file changed

+212
-0
lines changed

1 file changed

+212
-0
lines changed

tests/versioning_3_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3734,6 +3734,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflow_NoBouncingBetweenVersions() {
37343734
}, 10*time.Second, 100*time.Millisecond)
37353735

37363736
// Ensure that the workflow does not bounce back to v1.
3737+
//nolint:testifylint
37373738
go s.idlePollWorkflow(tv1, true, ver3MinPollTime, "workflow should not bounce back to v1")
37383739

37393740
// Poll and complete the workflow on v2.
@@ -3749,6 +3750,217 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflow_NoBouncingBetweenVersions() {
37493750
}, 10*time.Second, 100*time.Millisecond)
37503751
}
37513752

3753+
func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() {
3754+
if !s.useNewDeploymentData {
3755+
s.T().Skip("This test is only supported on new deployment data")
3756+
}
3757+
/*
3758+
3759+
Test plan:
3760+
- Use only one read and write partition.
3761+
- Update the userData by setting current version to v0.
3762+
- Start 10 workflows on v0. MS reads v0 for these workflows.
3763+
- Update the userData by setting the current version to v1.
3764+
- Complete workflow task on v1. MS reads v1 for these workflows now.
3765+
- *Rollback* userData to v0 on that single partition (call lower level API)
3766+
- See if any workflow task goes back to original v0 poller (should not)
3767+
3768+
*/
3769+
3770+
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
3771+
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
3772+
s.OverrideDynamicConfig(dynamicconfig.UseRevisionNumberForWorkerVersioning, true)
3773+
3774+
tv0 := testvars.New(s).WithBuildIDNumber(0)
3775+
tv1 := tv0.WithBuildIDNumber(1)
3776+
3777+
// Update the userData by setting the current version to v0
3778+
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv0, &deploymentpb.RoutingConfig{
3779+
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
3780+
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
3781+
RevisionNumber: 1,
3782+
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv0.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
3783+
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
3784+
}}, []string{}, tqTypeWf, tqTypeAct)
3785+
3786+
// Wait until all task queue partitions know that v0 is current.
3787+
s.waitForDeploymentDataPropagation(tv0, versionStatusCurrent, false, tqTypeWf, tqTypeAct)
3788+
3789+
// Make numWorkflows different workflow ID's so that we can verify that all workflows are running on v0.
3790+
numWorkflows := 10
3791+
wfVarsV0 := make([]*testvars.TestVars, numWorkflows)
3792+
wfVarsV1 := make([]*testvars.TestVars, numWorkflows)
3793+
3794+
for i := 0; i < numWorkflows; i++ {
3795+
wfVarsV0[i] = tv0.WithWorkflowIDNumber(i)
3796+
wfVarsV1[i] = tv1.WithWorkflowIDNumber(i)
3797+
}
3798+
3799+
// Start all different workflows on version v0.
3800+
for i := 0; i < numWorkflows; i++ {
3801+
s.startWorkflow(wfVarsV0[i], nil)
3802+
}
3803+
3804+
// Poll for workflows on v0.
3805+
channels := make([]chan struct{}, numWorkflows)
3806+
for i := 0; i < numWorkflows; i++ {
3807+
channels[i] = make(chan struct{})
3808+
s.pollWftAndHandle(wfVarsV0[i], false, channels[i],
3809+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
3810+
s.NotNil(task)
3811+
resp := respondEmptyWft(wfVarsV0[i], false, vbUnpinned)
3812+
resp.ForceCreateNewWorkflowTask = true
3813+
return resp, nil
3814+
})
3815+
}
3816+
// Wait for channels to be closed
3817+
for i := 0; i < numWorkflows; i++ {
3818+
<-channels[i]
3819+
}
3820+
3821+
// Verify that all workflows are running on v0.
3822+
for i := 0; i < numWorkflows; i++ {
3823+
s.EventuallyWithT(func(t *assert.CollectT) {
3824+
s.verifyWorkflowVersioning(wfVarsV0[i], vbUnpinned, tv0.Deployment(), nil, nil)
3825+
}, 10*time.Second, 100*time.Millisecond)
3826+
}
3827+
3828+
// Update the userData by setting the current version to v1.
3829+
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{
3830+
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
3831+
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
3832+
RevisionNumber: 2,
3833+
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv1.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
3834+
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
3835+
}}, []string{}, tqTypeWf, tqTypeAct)
3836+
3837+
// Wait until all task queue partitions know that v1 is current.
3838+
// s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct)
3839+
3840+
// Poll for workflows but this time the workflow task should be acted upon by a v1 worker.
3841+
channels = make([]chan struct{}, numWorkflows)
3842+
for i := 0; i < numWorkflows; i++ {
3843+
channels[i] = make(chan struct{})
3844+
s.pollWftAndHandle(wfVarsV1[i], false, channels[i],
3845+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
3846+
s.NotNil(task)
3847+
resp := respondEmptyWft(wfVarsV1[i], false, vbUnpinned)
3848+
resp.ForceCreateNewWorkflowTask = true
3849+
return resp, nil
3850+
})
3851+
}
3852+
// Wait for channels to be closed
3853+
for i := 0; i < numWorkflows; i++ {
3854+
<-channels[i]
3855+
}
3856+
// Verify that all workflows are running on v1.
3857+
for i := 0; i < numWorkflows; i++ {
3858+
s.EventuallyWithT(func(t *assert.CollectT) {
3859+
s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil)
3860+
}, 10*time.Second, 100*time.Millisecond)
3861+
}
3862+
3863+
// Rollback the userData to v0. Using the lower API here.
3864+
3865+
currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
3866+
NamespaceId: s.NamespaceID().String(),
3867+
TaskQueue: tv0.TaskQueue().GetName(),
3868+
TaskQueueType: tqTypeWf,
3869+
})
3870+
s.NoError(err)
3871+
3872+
_, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{
3873+
NamespaceId: s.NamespaceID().String(),
3874+
TaskQueue: tv0.TaskQueue().GetName(),
3875+
UserData: &persistencespb.VersionedTaskQueueUserData{
3876+
Data: &persistencespb.TaskQueueUserData{
3877+
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
3878+
int32(tqTypeWf): {
3879+
DeploymentData: &persistencespb.DeploymentData{
3880+
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
3881+
tv0.DeploymentVersion().GetDeploymentName(): {
3882+
RoutingConfig: &deploymentpb.RoutingConfig{
3883+
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
3884+
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
3885+
RevisionNumber: 0,
3886+
},
3887+
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
3888+
tv0.DeploymentVersion().GetBuildId(): {
3889+
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
3890+
},
3891+
},
3892+
},
3893+
},
3894+
},
3895+
},
3896+
int32(tqTypeAct): {
3897+
DeploymentData: &persistencespb.DeploymentData{
3898+
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
3899+
tv0.DeploymentVersion().GetDeploymentName(): {
3900+
RoutingConfig: &deploymentpb.RoutingConfig{
3901+
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv0.DeploymentVersionString()),
3902+
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
3903+
RevisionNumber: 0,
3904+
},
3905+
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
3906+
tv0.DeploymentVersion().GetBuildId(): {
3907+
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
3908+
},
3909+
},
3910+
},
3911+
},
3912+
},
3913+
},
3914+
},
3915+
},
3916+
Version: currentData.GetUserData().GetVersion(),
3917+
},
3918+
})
3919+
s.NoError(err)
3920+
3921+
// Verify that the userData is rolled back to v0.
3922+
s.Eventually(func() bool {
3923+
ms, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
3924+
NamespaceId: s.NamespaceID().String(),
3925+
TaskQueue: tv0.TaskQueue().GetName(),
3926+
TaskQueueType: tqTypeWf,
3927+
})
3928+
s.NoError(err)
3929+
3930+
// Find the current version for this task-queue specifically. It should be set to v0.
3931+
current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData())
3932+
return current.GetBuildId() == tv0.DeploymentVersion().GetBuildId() && currentRevisionNumber == 0
3933+
}, 10*time.Second, 100*time.Millisecond)
3934+
3935+
// Even though the userData is rolled back to v0, the workflows should only continue to run on v1.
3936+
// Start idle pollers on v0 and ensure they never receive a task.
3937+
for i := 0; i < numWorkflows; i++ {
3938+
//nolint:testifylint
3939+
go s.idlePollWorkflow(wfVarsV0[i], true, ver3MinPollTime, "workflows should not go to the old deployment")
3940+
}
3941+
3942+
// Complete all workflows on v1.
3943+
channels = make([]chan struct{}, numWorkflows)
3944+
for i := 0; i < numWorkflows; i++ {
3945+
channels[i] = make(chan struct{})
3946+
s.pollWftAndHandle(wfVarsV1[i], false, channels[i],
3947+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
3948+
s.NotNil(task)
3949+
return respondCompleteWorkflow(wfVarsV1[i], vbUnpinned), nil
3950+
})
3951+
}
3952+
// Wait for channels to be closed
3953+
for i := 0; i < numWorkflows; i++ {
3954+
<-channels[i]
3955+
}
3956+
3957+
// Verify that all workflows are completed on v1.
3958+
for i := 0; i < numWorkflows; i++ {
3959+
s.EventuallyWithT(func(t *assert.CollectT) {
3960+
s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv1.Deployment(), nil, nil)
3961+
}, 60*time.Second, 100*time.Millisecond)
3962+
}
3963+
}
37523964
func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition() {
37533965
if !s.useNewDeploymentData {
37543966
s.T().Skip("This test is only supported on new deployment data")

0 commit comments

Comments
 (0)