Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 196 additions & 12 deletions tests/versioning_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3584,8 +3584,176 @@ func (s *Versioning3Suite) verifyVersioningSAs(
}, 5*time.Second, 50*time.Millisecond)
}

func (s *Versioning3Suite) TestAutoUpgradeWorkflow_NoBouncingBetweenVersions() {
if !s.useNewDeploymentData {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bool is always false, right? you should use useRevisionNumbers because that's the flag that controls this logic. in that case you don't need to set UseRevisionNumberForWorkerVersioning again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - actually, now that I think about this and read all your latter comments, it's best if we get this PR in after my other open PR goes in.

The reason why I say this is because I have done some refactoring there anyways, which shall reduce the clutter here.

s.T().Skip("This test is only supported on new deployment data")
}
/*
Test plan:
- Use only one read and write partition.
- Update the userData by setting current version to v1. MS reads v1 for this workflow.
- Update the userData by setting the current version to v2. MS reads v2 for this workflow now.
- *Rollback* userData to v1 on that single partition (call lower level API)
- See if the workflow task goes back to original v1 poller (should not)
*/

s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
s.OverrideDynamicConfig(dynamicconfig.UseRevisionNumberForWorkerVersioning, true)

tv1 := testvars.New(s).WithBuildIDNumber(1)
tv2 := tv1.WithBuildIDNumber(2)

// Update the userData by setting the current version to v1
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
RevisionNumber: 1,
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv1.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
}}, []string{}, tqTypeWf, tqTypeAct)

// Wait until all task queue partitions know that v1 is current.
s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, later when we clean up all old tests we should maybe change this function to wait for the particular revision number propagation rather than status of the version.


// Start the workflow on version v1.
s.startWorkflow(tv1, nil)

// Poll for workflows on v1.
channel := make(chan struct{})
s.pollWftAndHandle(tv1, false, channel,
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
resp := respondEmptyWft(tv1, false, vbUnpinned)
resp.ForceCreateNewWorkflowTask = true
return resp, nil
})
<-channel

// Verify that all workflows are running on v1.
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)

// Update the userData by setting the current version to v2.
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv2, &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv2.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now()),
RevisionNumber: 2,
}, map[string]*deploymentspb.WorkerDeploymentVersionData{tv2.DeploymentVersion().GetBuildId(): &deploymentspb.WorkerDeploymentVersionData{
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
}}, []string{}, tqTypeWf, tqTypeAct)

// Wait until all task queue partitions know that v2 is current.
s.waitForDeploymentDataPropagation(tv2, versionStatusCurrent, false, tqTypeWf, tqTypeAct)

// Poll for workflows but this time the workflow task should be acted upon by a v2 worker.
channel = make(chan struct{})
s.pollWftAndHandle(tv2, false, channel,
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
resp := respondEmptyWft(tv2, false, vbUnpinned)
resp.ForceCreateNewWorkflowTask = true
return resp, nil
})
<-channel
// Verify that all workflows are running on v2.
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(tv2, vbUnpinned, tv2.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)

// Rollback the userData to v1. Using the lower API here.
currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv1.TaskQueue().GetName(),
TaskQueueType: tqTypeWf,
})
s.NoError(err)

_, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine you use this in other similar tests, no? seems like a good candidate to extract as a helper

NamespaceId: s.NamespaceID().String(),
TaskQueue: tv1.TaskQueue().GetName(),
UserData: &persistencespb.VersionedTaskQueueUserData{
Data: &persistencespb.TaskQueueUserData{
PerType: map[int32]*persistencespb.TaskQueueTypeUserData{
int32(tqTypeWf): {
DeploymentData: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
tv1.DeploymentVersion().GetDeploymentName(): {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
RevisionNumber: 1,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
tv1.DeploymentVersion().GetBuildId(): {
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
},
},
},
},
int32(tqTypeAct): {
DeploymentData: &persistencespb.DeploymentData{
DeploymentsData: map[string]*persistencespb.WorkerDeploymentData{
tv1.DeploymentVersion().GetDeploymentName(): {
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
CurrentVersionChangedTime: timestamp.TimePtr(time.Now().Add(-time.Second)),
RevisionNumber: 1,
},
Versions: map[string]*deploymentspb.WorkerDeploymentVersionData{
tv1.DeploymentVersion().GetBuildId(): {
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
},
},
},
},
},
},
Version: currentData.GetUserData().GetVersion(),
},
})
s.NoError(err)

// Verify that the userData is rolled back to v1.
s.Eventually(func() bool {
ms, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv1.TaskQueue().GetName(),
TaskQueueType: tqTypeWf,
})
s.NoError(err)

// Find the current version for this task-queue specifically. It should be set to v1.
current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData())
return current.GetBuildId() == tv1.DeploymentVersion().GetBuildId() && currentRevisionNumber == 1
}, 10*time.Second, 100*time.Millisecond)

// Ensure that the workflow does not bounce back to v1.
//nolint:testifylint
go s.idlePollWorkflow(tv1, true, ver3MinPollTime, "workflow should not bounce back to v1")

// Poll and complete the workflow on v2.
s.pollWftAndHandle(tv2, false, nil,
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
s.NotNil(task)
return respondCompleteWorkflow(tv2, vbUnpinned), nil
})

// Verify that the workflow is running on v2.
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(tv2, vbUnpinned, tv2.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)
}

func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is names same as above, is it testing the same scenario?

s.T().Skip("This test is flaky right now and shall be fixed in a future PR.") // TODO (Shivam)
if !s.useNewDeploymentData {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

s.T().Skip("This test is only supported on new deployment data")
}
/*

Test plan:
Expand All @@ -3601,6 +3769,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()

s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
s.OverrideDynamicConfig(dynamicconfig.UseRevisionNumberForWorkerVersioning, true)

tv0 := testvars.New(s).WithBuildIDNumber(0)
tv1 := tv0.WithBuildIDNumber(1)
Expand Down Expand Up @@ -3656,13 +3825,6 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
}, 10*time.Second, 100*time.Millisecond)
}

// Verify that all workflows are running on v0 by using v1 vars
for i := 0; i < numWorkflows; i++ {
s.EventuallyWithT(func(t *assert.CollectT) {
s.verifyWorkflowVersioning(wfVarsV1[i], vbUnpinned, tv0.Deployment(), nil, nil)
}, 10*time.Second, 100*time.Millisecond)
}

// Update the userData by setting the current version to v1.
s.updateTaskQueueDeploymentDataWithRoutingConfig(tv1, &deploymentpb.RoutingConfig{
CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv1.DeploymentVersionString()),
Expand All @@ -3673,7 +3835,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
}}, []string{}, tqTypeWf, tqTypeAct)

// Wait until all task queue partitions know that v1 is current.
s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct)
// s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct)

// Poll for workflows but this time the workflow task should be acted upon by a v1 worker.
channels = make([]chan struct{}, numWorkflows)
Expand All @@ -3699,7 +3861,15 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
}

// Rollback the userData to v0. Using the lower API here.
_, err := s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{

currentData, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv0.TaskQueue().GetName(),
TaskQueueType: tqTypeWf,
})
s.NoError(err)

_, err = s.GetTestCluster().MatchingClient().UpdateTaskQueueUserData(context.Background(), &matchingservice.UpdateTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv0.TaskQueue().GetName(),
UserData: &persistencespb.VersionedTaskQueueUserData{
Expand Down Expand Up @@ -3743,10 +3913,25 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
},
},
},
Version: 0,
Version: currentData.GetUserData().GetVersion(),
},
})
s.NoError(err)

// Verify that the userData is rolled back to v0.
s.Eventually(func() bool {
ms, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData(context.Background(), &matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: s.NamespaceID().String(),
TaskQueue: tv0.TaskQueue().GetName(),
TaskQueueType: tqTypeWf,
})
s.NoError(err)

// Find the current version for this task-queue specifically. It should be set to v0.
current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData())
return current.GetBuildId() == tv0.DeploymentVersion().GetBuildId() && currentRevisionNumber == 0
}, 10*time.Second, 100*time.Millisecond)

// Even though the userData is rolled back to v0, the workflows should only continue to run on v1.
// Start idle pollers on v0 and ensure they never receive a task.
for i := 0; i < numWorkflows; i++ {
Expand Down Expand Up @@ -3776,7 +3961,6 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions()
}, 60*time.Second, 100*time.Millisecond)
}
}

func (s *Versioning3Suite) TestWorkflowTQLags_DependentActivityStartsTransition() {
if !s.useNewDeploymentData {
s.T().Skip("This test is only supported on new deployment data")
Expand Down
Loading