Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
# - value: true
# matching.PollerHistoryTTL:
# - value: 1s
# matching.wv.VersionDrainageStatusVisibilityGracePeriod:
# - value: 5s
# matching.wv.VersionDrainageStatusRefreshInterval:
# - value: 5s
limit.maxIDLength:
- value: 255
constraints: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#
# system.enableDeploymentVersions=true
# matching.PollerHistoryTTL=1s
# matching.wv.VersionDrainageStatusVisibilityGracePeriod=5s
# matching.wv.VersionDrainageStatusRefreshInterval=5s
#
# Then run this script.
#
Expand All @@ -18,8 +20,8 @@ version="1.0"
# Expected workflow counts - users can override these if their changes are expected to generate more workflows which will be true when a breaking change to
# these worfklows is introduced.
# These values are used by the replay tester to validate that your workflow changes haven't accidentally created additional executions.
EXPECTED_DEPLOYMENT_WORKFLOWS=${EXPECTED_DEPLOYMENT_WORKFLOWS:-8}
EXPECTED_VERSION_WORKFLOWS=${EXPECTED_VERSION_WORKFLOWS:-10}
EXPECTED_DEPLOYMENT_WORKFLOWS=${EXPECTED_DEPLOYMENT_WORKFLOWS:-12}
EXPECTED_VERSION_WORKFLOWS=${EXPECTED_VERSION_WORKFLOWS:-14}

echo "📋 Expected workflow counts:"
echo " Deployment workflows: $EXPECTED_DEPLOYMENT_WORKFLOWS"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Expected workflow counts for replay testing
# Generated by generate_history.sh on Thu 31 Jul 2025 15:21:59 EDT
EXPECTED_DEPLOYMENT_WORKFLOWS=12
EXPECTED_VERSION_WORKFLOWS=14
ACTUAL_DEPLOYMENT_WORKFLOWS=12
ACTUAL_VERSION_WORKFLOWS=14
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
31 changes: 31 additions & 0 deletions service/worker/workerdeployment/replaytester/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,37 @@ func main() {
}
verifyDeployment(dHandle, "__unversioned__", "", client.WorkerDeploymentVersionDrainageStatusDraining)

// Simulating a scenario when a drained version is reactivated and then re-deactivated.

// Waiting for the version 1.0 to become drained.
time.Sleep(8 * time.Second)
// Make sure 1.0 is drained.
verifyDeployment(dHandle, "__unversioned__", "", client.WorkerDeploymentVersionDrainageStatusDrained)

// Rollback a drained version 1.0, so that it is the current version for this deployment
_, err = dHandle.SetCurrentVersion(context.Background(), client.WorkerDeploymentSetCurrentVersionOptions{
Version: deploymentName + ".1.0",
IgnoreMissingTaskQueues: true,
})
if err != nil {
log.Fatalln("Unable to set current version", err)
}

// Set current version to "__unversioned__" again so that version 1.0 can start draining. This replicates the
// scenario where a rolled back version is now draining.
_, err = dHandle.SetCurrentVersion(context.Background(), client.WorkerDeploymentSetCurrentVersionOptions{
Version: "__unversioned__",
IgnoreMissingTaskQueues: true,
})
if err != nil {
log.Fatalln("Unable to set current version", err)
}

// Waiting for the version 1.0 to become drained.
time.Sleep(8 * time.Second)
// Make sure 1.0 is drained.
verifyDeployment(dHandle, "__unversioned__", "", client.WorkerDeploymentVersionDrainageStatusDrained)

// Stopping both workers
w1.Stop()
w2.Stop()
Expand Down
13 changes: 11 additions & 2 deletions service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (d *VersionWorkflowRunner) handleUpdateVersionMetadata(ctx workflow.Context
}, nil
}

func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context, isCan bool) {
func (d *VersionWorkflowRunner) startDrainage(ctx workflow.Context) {
if d.VersionState.GetDrainageInfo().GetStatus() == enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED {
now := timestamppb.New(workflow.Now(ctx))
d.VersionState.DrainageInfo = &deploymentpb.VersionDrainageInfo{
Expand Down Expand Up @@ -543,7 +543,7 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
if newStatus == enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING {
// Version deactivated from current/ramping
d.VersionState.LastDeactivationTime = args.RoutingUpdateTime
d.startDrainage(ctx, false)
d.startDrainage(ctx)
}

// started accepting new workflows
Expand All @@ -552,6 +552,15 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
// First time this version is activated to current/ramping
d.VersionState.FirstActivationTime = args.RoutingUpdateTime
}

// Clear drainage information, if present, when a version gets activated.
// This handles the rollback scenario where a previously draining/drained version
// is reactivated and should have it's drainage information cleared.

v := workflow.GetVersion(ctx, "clear-drainage-on-activation", workflow.DefaultVersion, 0)
if v != workflow.DefaultVersion {
d.VersionState.DrainageInfo = nil
}
}

// status of the version is updated after a successful sync to all its task-queues
Expand Down
Loading
Loading