Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (d *VersionWorkflowRunner) handleUpdateVersionMetadata(ctx workflow.Context
}, nil
}

func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context, isCan bool) {
func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context) {
if d.VersionState.GetDrainageInfo().GetStatus() == enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED {
now := timestamppb.New(workflow.Now(ctx))
d.VersionState.DrainageInfo = &deploymentpb.VersionDrainageInfo{
Expand Down Expand Up @@ -543,7 +543,7 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
if newStatus == enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING {
// Version deactivated from current/ramping
d.VersionState.LastDeactivationTime = args.RoutingUpdateTime
d.startDrainage(ctx, false)
d.startDrainage(ctx)
}

// started accepting new workflows
Expand All @@ -552,6 +552,15 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
// First time this version is activated to current/ramping
d.VersionState.FirstActivationTime = args.RoutingUpdateTime
}

// Clear drainage information, if present, when a version gets activated.
// This handles the rollback scenario where a previously draining/drained version
// is reactivated and should have it's drainage information cleared.

v := workflow.GetVersion(ctx, "Step1", workflow.DefaultVersion, 0)
if v != workflow.DefaultVersion {
d.VersionState.DrainageInfo = nil
}
}

// status of the version is updated after a successful sync to all its task-queues
Expand Down
357 changes: 357 additions & 0 deletions tests/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,363 @@ func (s *WorkerDeploymentSuite) verifyTaskQueueVersioningInfo(ctx context.Contex
}, time.Second*10, time.Millisecond*1000)
}

// This test shall first rollback a drained version to a current version. After that, it shall deploy a new version
// which shall further drain this current version.
func (s *WorkerDeploymentSuite) TestDrainRollbackedVersion() {
s.OverrideDynamicConfig(dynamicconfig.PollerHistoryTTL, 500*time.Millisecond)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
tv1 := testvars.New(s).WithBuildIDNumber(1)
tv2 := testvars.New(s).WithBuildIDNumber(2)
tv3 := testvars.New(s).WithBuildIDNumber(3)

// Start deployment workflow 1 and wait for the deployment version to exist
v1CreateTime := timestamppb.New(time.Now())
s.startVersionWorkflow(ctx, tv1)

// Set v1 as current version
setCurrentV1UpdateTime := timestamppb.Now()
s.setCurrentVersion(ctx, tv1, worker_versioning.UnversionedVersionId, true, "")
resp, err := s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv1.DeploymentSeries(),
})
s.NoError(err)
s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv1.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentVersion: tv1.DeploymentVersionString(),
CurrentVersionChangedTime: setCurrentV1UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: setCurrentV1UpdateTime,
CurrentSinceTime: setCurrentV1UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV1UpdateTime,
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
LastModifierIdentity: tv1.ClientIdentity(),
},
})

// Start deployment workflow 2 and set v2 to current so that v1 can start draining
v2CreateTime := timestamppb.New(time.Now())
s.startVersionWorkflow(ctx, tv2)

setCurrentV2UpdateTime := timestamppb.New(time.Now())
s.setCurrentVersion(ctx, tv2, tv1.DeploymentVersionString(), true, "")
resp, err = s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv2.DeploymentSeries(),
})
s.NoError(err)

s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv2.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
RampingVersion: "",
RampingVersionPercentage: 0,
RampingVersionChangedTime: nil,
RampingVersionPercentageChangedTime: nil,
CurrentVersion: tv2.DeploymentVersionString(),
CurrentVersionChangedTime: setCurrentV2UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
DrainageInfo: &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINING},
RoutingUpdateTime: setCurrentV1UpdateTime,
CurrentSinceTime: nil,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV1UpdateTime,
LastDeactivationTime: setCurrentV2UpdateTime,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING,
},
{
Version: tv2.DeploymentVersionString(),
CreateTime: v2CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: setCurrentV2UpdateTime,
CurrentSinceTime: setCurrentV2UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV2UpdateTime,
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
LastModifierIdentity: tv2.ClientIdentity(),
},
})

// wait for v1 to be drained
s.EventuallyWithT(func(t *assert.CollectT) {
a := require.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
Namespace: s.Namespace().String(),
Version: tv1.DeploymentVersionString(),
})
a.NoError(err)
a.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus())
}, time.Second*10, time.Millisecond*1000)

// Verify that the drainageStatus of v1 has been updated in the VersionSummaries
s.EventuallyWithT(func(t *assert.CollectT) {
resp, err = s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv2.DeploymentSeries(),
})
s.NoError(err)
s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv2.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
RampingVersion: "",
RampingVersionPercentage: 0,
RampingVersionChangedTime: nil,
RampingVersionPercentageChangedTime: nil,
CurrentVersion: tv2.DeploymentVersionString(),
CurrentVersionChangedTime: setCurrentV2UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
DrainageInfo: &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED},
RoutingUpdateTime: setCurrentV1UpdateTime,
CurrentSinceTime: nil,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV1UpdateTime,
LastDeactivationTime: setCurrentV2UpdateTime,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED,
},
{
Version: tv2.DeploymentVersionString(),
CreateTime: v2CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: setCurrentV2UpdateTime,
CurrentSinceTime: setCurrentV2UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV2UpdateTime,
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
LastModifierIdentity: tv2.ClientIdentity(),
},
})
}, time.Second*10, time.Millisecond*1000)

// start ramping traffic back to v1
setRampingUpdateTime := timestamppb.Now()
s.setAndVerifyRampingVersion(ctx, tv1, false, 10, false, "", &workflowservice.SetWorkerDeploymentRampingVersionResponse{
ConflictToken: nil,
PreviousVersion: "",
PreviousPercentage: 0,
})

// verify if the right information is set in the DescribeWorkerDeployment response
s.EventuallyWithT(func(t *assert.CollectT) {
resp, err = s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv2.DeploymentSeries(),
})
s.NoError(err)
s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv2.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
RampingVersion: tv1.DeploymentVersionString(),
RampingVersionPercentage: 10,
RampingVersionChangedTime: setRampingUpdateTime,
RampingVersionPercentageChangedTime: setRampingUpdateTime,
CurrentVersion: tv2.DeploymentVersionString(),
CurrentVersionChangedTime: setCurrentV2UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: setRampingUpdateTime,
CurrentSinceTime: nil,
RampingSinceTime: setRampingUpdateTime,
FirstActivationTime: setCurrentV1UpdateTime, // note: this is setCurrentV1UpdateTime since the version was initially activated when it was current.
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_RAMPING,
},
{
Version: tv2.DeploymentVersionString(),
CreateTime: v2CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: setCurrentV2UpdateTime,
CurrentSinceTime: setCurrentV2UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV2UpdateTime,
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
LastModifierIdentity: tv2.ClientIdentity(),
},
})
}, time.Second*10, time.Millisecond*1000)

// Set version v1 as the current version; this shall drain out v2
newCurrentV1UpdateTime := timestamppb.Now()
s.setCurrentVersion(ctx, tv1, tv2.DeploymentVersionString(), true, "")

// Verify that v2 is drained
s.EventuallyWithT(func(t *assert.CollectT) {
a := require.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
Namespace: s.Namespace().String(),
Version: tv2.DeploymentVersionString(),
})
a.NoError(err)
a.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus())
}, time.Second*10, time.Millisecond*1000)

// verify if the right information is set in the DescribeWorkerDeployment response
s.EventuallyWithT(func(t *assert.CollectT) {
resp, err = s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv1.DeploymentSeries(),
})
s.NoError(err)
s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv1.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentVersion: tv1.DeploymentVersionString(),
CurrentVersionChangedTime: newCurrentV1UpdateTime,
RampingVersion: "",
RampingVersionPercentage: 0,
RampingVersionChangedTime: newCurrentV1UpdateTime,
RampingVersionPercentageChangedTime: newCurrentV1UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
DrainageInfo: nil,
RoutingUpdateTime: newCurrentV1UpdateTime,
CurrentSinceTime: newCurrentV1UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV1UpdateTime, // note: this is setCurrentV1UpdateTime since the version was initially activated when it was current.
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
{
Version: tv2.DeploymentVersionString(),
CreateTime: v2CreateTime,
DrainageInfo: &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED},
RoutingUpdateTime: setCurrentV2UpdateTime,
CurrentSinceTime: nil,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV2UpdateTime,
LastDeactivationTime: newCurrentV1UpdateTime,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED,
},
},
LastModifierIdentity: tv2.ClientIdentity(),
},
})
}, time.Second*10, time.Millisecond*1000)

// Roll out a new version v3 and set it to current
v3CreateTime := timestamppb.Now()
s.startVersionWorkflow(ctx, tv3)

newCurrentV3UpdateTime := timestamppb.Now()
s.setCurrentVersion(ctx, tv3, tv1.DeploymentVersionString(), true, "")

// Verify that v1 is drained eventually
s.EventuallyWithT(func(t *assert.CollectT) {
a := require.New(t)
resp, err := s.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
Namespace: s.Namespace().String(),
Version: tv1.DeploymentVersionString(),
})
a.NoError(err)
a.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus())
}, time.Second*10, time.Millisecond*1000)

// Verify that v1, which was rolled back to being current previously, is drained with it's information present
// in the deployment workflow.
s.EventuallyWithT(func(t *assert.CollectT) {
resp, err = s.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: s.Namespace().String(),
DeploymentName: tv1.DeploymentSeries(),
})
s.NoError(err)
s.verifyDescribeWorkerDeployment(resp, &workflowservice.DescribeWorkerDeploymentResponse{
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
Name: tv1.DeploymentSeries(),
CreateTime: v1CreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
CurrentVersion: tv3.DeploymentVersionString(),
CurrentVersionChangedTime: newCurrentV3UpdateTime,
RampingVersion: "",
RampingVersionPercentage: 0,
RampingVersionChangedTime: newCurrentV1UpdateTime,
RampingVersionPercentageChangedTime: newCurrentV1UpdateTime,
},
VersionSummaries: []*deploymentpb.WorkerDeploymentInfo_WorkerDeploymentVersionSummary{
{
Version: tv1.DeploymentVersionString(),
CreateTime: v1CreateTime,
RoutingUpdateTime: newCurrentV1UpdateTime,
DrainageInfo: &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED},
CurrentSinceTime: nil,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV1UpdateTime, // note: this is setCurrentV1UpdateTime since the version was initially activated when it was current.
LastDeactivationTime: newCurrentV3UpdateTime,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED,
},
{
Version: tv2.DeploymentVersionString(),
CreateTime: v2CreateTime,
DrainageInfo: &deploymentpb.VersionDrainageInfo{Status: enumspb.VERSION_DRAINAGE_STATUS_DRAINED},
RoutingUpdateTime: setCurrentV2UpdateTime,
CurrentSinceTime: nil,
RampingSinceTime: nil,
FirstActivationTime: setCurrentV2UpdateTime,
LastDeactivationTime: newCurrentV1UpdateTime,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED,
},
{
Version: tv3.DeploymentVersionString(),
CreateTime: v3CreateTime,
RoutingUpdateTime: newCurrentV3UpdateTime,
DrainageInfo: nil,
CurrentSinceTime: newCurrentV3UpdateTime,
RampingSinceTime: nil,
FirstActivationTime: newCurrentV3UpdateTime,
LastDeactivationTime: nil,
Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT,
},
},
LastModifierIdentity: tv2.ClientIdentity(),
},
})
}, time.Second*10, time.Millisecond*1000)
}

// Test that rolling back to a drained version works
func (s *WorkerDeploymentSuite) TestSetRampingVersion_AfterDrained() {
s.OverrideDynamicConfig(dynamicconfig.PollerHistoryTTL, 500*time.Millisecond)
Expand Down
Loading