diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 4a9be5048cd..c0f2a2566b3 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -3584,8 +3584,176 @@ func (s *Versioning3Suite) verifyVersioningSAs( }, 5*time.Second, 50*time.Millisecond) } +func (s *Versioning3Suite) TestAutoUpgradeWorkflow_NoBouncingBetweenVersions() { + if !s.useNewDeploymentData { + s.T().Skip("This test is only supported on new deployment data") + } + /* + Test plan: + - Use only one read and write partition. + - Update the userData by setting current version to v1. MS reads v1 for this workflow. + - Update the userData by setting the current version to v2. MS reads v2 for this workflow now. + - *Rollback* userData to v1 on that single partition (call lower level API) + - See if the workflow task goes back to original v1 poller (should not) + */ + + s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1) + s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1) + s.OverrideDynamicConfig(dynamicconfig.UseRevisionNumberForWorkerVersioning, true) + + tv1 := testvars.New(s).WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + // Update the userData by setting the current version to v1 + s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{ + CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()), + CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), + RevisionNumber: 1, + }, map[string]*deploymentspb.WorkerDeploymentVersionData{tv1.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{ + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }}, []string{}, tqTypeWf, tqTypeAct) + + // Wait until all task queue partitions know that v1 is current. + s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + + // Start the workflow on version v1. + s.startWorkflow(tv1, nil) + + // Poll for workflows on v1. + channel := make(chan struct{}) + s.pollWftAndHandle(tv1, false, channel, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + resp := respondEmptyWft(tv1, false, vbUnpinned) + resp.ForceCreateNewWorkflowTask = true + return resp, nil + }) + <-channel + + // Verify that all workflows are running on v1. + s.EventuallyWithT(func(t *assert.CollectT) { + s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil) + }, 10*time.Second, 100*time.Millisecond) + + // Update the userData by setting the current version to v2. + s.updateTaskQueueDeploymentDataWithRoutingConfig(tv2, &deploymentpb.RoutingConfig{ + CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv2.DeploymentVersionString()), + CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), + RevisionNumber: 2, + }, map[string]*deploymentspb.WorkerDeploymentVersionData{tv2.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{ + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }}, []string{}, tqTypeWf, tqTypeAct) + + // Wait until all task queue partitions know that v2 is current. + s.waitForDeploymentDataPropagation(tv2, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + + // Poll for workflows but this time the workflow task should be acted upon by a v2 worker. + channel = make(chan struct{}) + s.pollWftAndHandle(tv2, false, channel, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + resp := respondEmptyWft(tv2, false, vbUnpinned) + resp.ForceCreateNewWorkflowTask = true + return resp, nil + }) + <-channel + // Verify that all workflows are running on v2. + s.EventuallyWithT(func(t *assert.CollectT) { + s.verifyWorkflowVersioning(tv2, vbUnpinned, tv2.Deployment(), nil, nil) + }, 10*time.Second, 100*time.Millisecond) + + // Rollback the userData to v1. Using the lower API here. + currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: s.NamespaceID().String(), + TaskQueue: tv1.TaskQueue().GetName(), + TaskQueueType: tqTypeWf, + }) + s.NoError(err) + + _, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{ + NamespaceId: s.NamespaceID().String(), + TaskQueue: tv1.TaskQueue().GetName(), + UserData: &persistencespb.VersionedTaskQueueUserData{ + Data: &persistencespb.TaskQueueUserData{ + PerType: map[int32]*persistencespb.TaskQueueTypeUserData{ + int32(tqTypeWf): { + DeploymentData: &persistencespb.DeploymentData{ + DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{ + tv1.DeploymentVersion().GetDeploymentName(): { + RoutingConfig: &deploymentpb.RoutingConfig{ + CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()), + CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)), + RevisionNumber: 1, + }, + Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{ + tv1.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }, + }, + }, + }, + }, + }, + int32(tqTypeAct): { + DeploymentData: &persistencespb.DeploymentData{ + DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{ + tv1.DeploymentVersion().GetDeploymentName(): { + RoutingConfig: &deploymentpb.RoutingConfig{ + CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()), + CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)), + RevisionNumber: 1, + }, + Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{ + tv1.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }, + }, + }, + }, + }, + }, + }, + }, + Version: currentData.GetUserData().GetVersion(), + }, + }) + s.NoError(err) + + // Verify that the userData is rolled back to v1. + s.Eventually(func() bool { + ms, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: s.NamespaceID().String(), + TaskQueue: tv1.TaskQueue().GetName(), + TaskQueueType: tqTypeWf, + }) + s.NoError(err) + + // Find the current version for this task-queue specifically. It should be set to v1. + current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData()) + return current.GetBuildId() == tv1.DeploymentVersion().GetBuildId() && currentRevisionNumber == 1 + }, 10*time.Second, 100*time.Millisecond) + + // Ensure that the workflow does not bounce back to v1. + //nolint:testifylint + go s.idlePollWorkflow(tv1, true, ver3MinPollTime, "workflow should not bounce back to v1") + + // Poll and complete the workflow on v2. + s.pollWftAndHandle(tv2, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tv2, vbUnpinned), nil + }) + + // Verify that the workflow is running on v2. + s.EventuallyWithT(func(t *assert.CollectT) { + s.verifyWorkflowVersioning(tv2, vbUnpinned, tv2.Deployment(), nil, nil) + }, 10*time.Second, 100*time.Millisecond) +} + func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() { - s.T().Skip("This test is flaky right now and shall be fixed in a future PR.") // TODO (Shivam) + if !s.useNewDeploymentData { + s.T().Skip("This test is only supported on new deployment data") + } /* Test plan: @@ -3601,6 +3769,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1) s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1) + s.OverrideDynamicConfig(dynamicconfig.UseRevisionNumberForWorkerVersioning, true) tv0 := testvars.New(s).WithBuildIDNumber(0) tv1 := tv0.WithBuildIDNumber(1) @@ -3656,13 +3825,6 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() }, 10*time.Second, 100*time.Millisecond) } - // Verify that all workflows are running on v0 by using v1 vars - for i := 0; i < numWorkflows; i++ { - s.EventuallyWithT(func(t *assert.CollectT) { - s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv0.Deployment(), nil, nil) - }, 10*time.Second, 100*time.Millisecond) - } - // Update the userData by setting the current version to v1. s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{ CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()), @@ -3673,7 +3835,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() }}, []string{}, tqTypeWf, tqTypeAct) // Wait until all task queue partitions know that v1 is current. - s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + // s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct) // Poll for workflows but this time the workflow task should be acted upon by a v1 worker. channels = make([]chan struct{}, numWorkflows) @@ -3699,7 +3861,15 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() } // Rollback the userData to v0. Using the lower API here. - _, err := s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{ + + currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: s.NamespaceID().String(), + TaskQueue: tv0.TaskQueue().GetName(), + TaskQueueType: tqTypeWf, + }) + s.NoError(err) + + _, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{ NamespaceId: s.NamespaceID().String(), TaskQueue: tv0.TaskQueue().GetName(), UserData: &persistencespb.VersionedTaskQueueUserData{ @@ -3743,10 +3913,25 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() }, }, }, - Version: 0, + Version: currentData.GetUserData().GetVersion(), }, }) s.NoError(err) + + // Verify that the userData is rolled back to v0. + s.Eventually(func() bool { + ms, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: s.NamespaceID().String(), + TaskQueue: tv0.TaskQueue().GetName(), + TaskQueueType: tqTypeWf, + }) + s.NoError(err) + + // Find the current version for this task-queue specifically. It should be set to v0. + current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData()) + return current.GetBuildId() == tv0.DeploymentVersion().GetBuildId() && currentRevisionNumber == 0 + }, 10*time.Second, 100*time.Millisecond) + // Even though the userData is rolled back to v0, the workflows should only continue to run on v1. // Start idle pollers on v0 and ensure they never receive a task. for i := 0; i < numWorkflows; i++ { @@ -3776,7 +3961,6 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() }, 60*time.Second, 100*time.Millisecond) } } - func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition() { if !s.useNewDeploymentData { s.T().Skip("This test is only supported on new deployment data")