Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
int64, // ramping revision number
time.Time, // ramping update time
) {
var isTaskQueuePartOfSomeCurrentVersion bool
var isTaskQueuePartOfSomeRampingVersion bool

if deployments == nil {
return nil, 0, time.Time{}, nil, false, 0, 0, time.Time{}
}
Expand All @@ -680,6 +683,7 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
Version: DeploymentVersionFromDeployment(d.Deployment),
RoutingUpdateTime: d.Data.LastBecameCurrentTime,
}
isTaskQueuePartOfSomeCurrentVersion = true
}
}
}
Expand All @@ -690,11 +694,13 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
if v.RoutingUpdateTime != nil && v.GetCurrentSinceTime() != nil {
if t := v.RoutingUpdateTime.AsTime(); t.After(current.GetRoutingUpdateTime().AsTime()) {
current = v
isTaskQueuePartOfSomeCurrentVersion = true
}
}
if v.RoutingUpdateTime != nil && v.GetRampingSinceTime() != nil {
if t := v.RoutingUpdateTime.AsTime(); t.After(ramping.GetRoutingUpdateTime().AsTime()) {
ramping = v
isTaskQueuePartOfSomeRampingVersion = true
}
}
}
Expand All @@ -704,9 +710,6 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
var routingConfigLatestCurrentVersion *deploymentpb.RoutingConfig
var routingConfigLatestRampingVersion *deploymentpb.RoutingConfig

isPartOfSomeCurrentVersion := false
isPartOfSomeRampingVersion := false

if deployments.GetDeploymentsData() != nil {

for _, deploymentInfo := range deployments.GetDeploymentsData() {
Expand All @@ -725,33 +728,27 @@ func CalculateTaskQueueVersioningInfo(deployments *persistencespb.DeploymentData
if t := routingConfig.GetCurrentVersionChangedTime().AsTime(); t.After(routingConfigLatestCurrentVersion.GetCurrentVersionChangedTime().AsTime()) {
if HasDeploymentVersion(deployments, DeploymentVersionFromDeployment(DeploymentFromExternalDeploymentVersion(routingConfig.GetCurrentDeploymentVersion()))) {
routingConfigLatestCurrentVersion = routingConfig
isPartOfSomeCurrentVersion = true
} else if !isPartOfSomeCurrentVersion && routingConfig.GetCurrentDeploymentVersion() == nil {
isTaskQueuePartOfSomeCurrentVersion = true
} else if !isTaskQueuePartOfSomeCurrentVersion && routingConfig.GetCurrentDeploymentVersion() == nil {
// Current version can only be unversioned if the task queue is not part of any other version.
// The flag isTaskQueuePartOfSomeCurrentVersion serves the purpose of this check.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unversioned Deployments Blocked by Versioning Mixup

The refactored logic incorrectly blocks unversioned deployments from the new routing config format when the old format contains any current version (including unversioned ones). The removed validation specifically checked if current.GetVersion() != nil to only block when old format had a versioned deployment, but the new flag isTaskQueuePartOfSomeCurrentVersion is set even for unversioned deployments in the old format. This causes the function to ignore newer unversioned routing configs when an older unversioned deployment exists in the old format, violating the timestamp-based selection logic.

Fix in Cursor Fix in Web

routingConfigLatestCurrentVersion = routingConfig
}
}

if t := routingConfig.GetRampingVersionPercentageChangedTime().AsTime(); t.After(routingConfigLatestRampingVersion.GetRampingVersionPercentageChangedTime().AsTime()) {
if HasDeploymentVersion(deployments, DeploymentVersionFromDeployment(DeploymentFromExternalDeploymentVersion(routingConfig.GetRampingDeploymentVersion()))) {
routingConfigLatestRampingVersion = routingConfig
isPartOfSomeRampingVersion = true
} else if !isPartOfSomeRampingVersion && routingConfig.GetRampingDeploymentVersion() == nil {
isTaskQueuePartOfSomeRampingVersion = true
} else if !isTaskQueuePartOfSomeRampingVersion && routingConfig.GetRampingDeploymentVersion() == nil {
// Ramping version can only be unversioned if the task queue is not part of any other version that is ramping up.
// The flag isTaskQueuePartOfSomeRampingVersion serves the purpose of this check.
routingConfigLatestRampingVersion = routingConfig
}
}
}
}

if routingConfigLatestCurrentVersion.GetCurrentDeploymentVersion() == nil && current.GetVersion() != nil {
// The new current version is not unversioned but belongs to a versioned deployment which synced to the task-queue using the old deployment data format.
routingConfigLatestCurrentVersion = nil
}

if routingConfigLatestRampingVersion.GetRampingDeploymentVersion() == nil && ramping.GetVersion() != nil {
// The new ramping version is not unversioned but belongs to a versioned deployment which synced to the task-queue using the old deployment data format.
routingConfigLatestRampingVersion = nil
}

// Pick the final current and ramping version amongst the old and new deployment data formats.
return PickFinalCurrentAndRamping(
current,
Expand Down
Loading