Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.52.1-0.20250827195718-be47585d2457
go.temporal.io/api v1.52.1-0.20250904205541-d426aad53c4d
go.temporal.io/sdk v1.35.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/fx v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.52.1-0.20250827195718-be47585d2457 h1:Zj4wdx4kNN/9TeFxWNiWaqdMOVExh4C3KBu0WcDnI4s=
go.temporal.io/api v1.52.1-0.20250827195718-be47585d2457/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.52.1-0.20250904205541-d426aad53c4d h1:bBX8TbNXEVAS50OhN29yRydfVEr83AQ+vQDjKQkB2Fk=
go.temporal.io/api v1.52.1-0.20250904205541-d426aad53c4d/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ temporal operator namespace create default

# Run the worker which shall start the deployment entity workflows....
echo "Running the Go program..."
go run "$(dirname "$0")/worker/worker.go"

if ! go run "$(dirname "$0")/worker/worker.go"; then
echo "Go program exited with an error. Exiting bash script." >&2
exit 1
fi

echo "Go program completed successfully."

echo "Waiting 5 seconds for all workflows to show up in visibility..."
sleep 5
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Expected workflow counts for replay testing
# Generated by generate_history.sh on Thu Sep 4 13:46:54 PDT 2025
EXPECTED_DEPLOYMENT_WORKFLOWS=12
EXPECTED_VERSION_WORKFLOWS=14
ACTUAL_DEPLOYMENT_WORKFLOWS=12
ACTUAL_VERSION_WORKFLOWS=14
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,15 @@ func main() {
}
verifyDeployment(dHandle, "", build1, 1, client.WorkerDeploymentVersionDrainageStatusUnspecified)

// Set the ramp percent to 0
// TODO(carlydf): Once we allow it, test setting ramping version to nil while current version is also nil
// Unset the ramping version
_, err = dHandle.SetRampingVersion(context.Background(), client.WorkerDeploymentSetRampingVersionOptions{
BuildID: build1,
BuildID: "",
Percentage: 0,
})
if err != nil {
log.Fatalln("Unable to set ramping version to zero", err)
}
verifyDeployment(dHandle, "", build1, 0, client.WorkerDeploymentVersionDrainageStatusUnspecified)
verifyDeployment(dHandle, "", "", 0, client.WorkerDeploymentVersionDrainageStatusDraining)

// Set current version to 1.0
_, err = dHandle.SetCurrentVersion(context.Background(), client.WorkerDeploymentSetCurrentVersionOptions{
Expand Down
30 changes: 20 additions & 10 deletions service/worker/workerdeployment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ func (d *WorkflowRunner) handleDeleteDeployment(ctx workflow.Context) error {
return nil
}

func (d *WorkflowRunner) rampingVersionStringUnversioned(s string) bool {
return s == worker_versioning.UnversionedVersionId || s == ""
}

func (d *WorkflowRunner) validateStateBeforeAcceptingRampingUpdate(args *deploymentspb.SetRampingVersionArgs) error {
//nolint:staticcheck // SA1019: worker versioning v0.31
if args.Version == d.State.GetRoutingConfig().GetRampingVersion() &&
Expand All @@ -409,7 +413,8 @@ func (d *WorkflowRunner) validateStateBeforeAcceptingRampingUpdate(args *deploym
return temporal.NewApplicationError("conflict token mismatch", errFailedPrecondition)
}
//nolint:staticcheck // SA1019: worker versioning v0.31
if args.Version == d.State.GetRoutingConfig().GetCurrentVersion() {
if args.Version == d.State.GetRoutingConfig().GetCurrentVersion() &&
!(args.Version == worker_versioning.UnversionedVersionId && args.Percentage == 0) {
d.logger.Info("version can't be set to ramping since it is already current")
return temporal.NewApplicationError(fmt.Sprintf("requested ramping version %s is already current", args.Version), errFailedPrecondition)
}
Expand Down Expand Up @@ -492,8 +497,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
RampPercentage: 0, // remove ramp
}

// TODO (Shivam): remove the empty string check once canary stops flaking out
if prevRampingVersion != worker_versioning.UnversionedVersionId && prevRampingVersion != "" {
if !d.rampingVersionStringUnversioned(prevRampingVersion) {
if _, err := d.syncVersion(ctx, prevRampingVersion, unsetRampUpdateArgs, false); err != nil {
return nil, err
}
Expand Down Expand Up @@ -542,8 +546,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
RampingSinceTime: rampingSinceTime,
RampPercentage: args.Percentage,
}
// TODO (Shivam): remove the empty string check once canary stops flaking out
if newRampingVersion != worker_versioning.UnversionedVersionId && newRampingVersion != "" {
if !d.rampingVersionStringUnversioned(newRampingVersion) {
if _, err := d.syncVersion(ctx, newRampingVersion, setRampUpdateArgs, true); err != nil {
return nil, err
}
Expand All @@ -560,8 +563,7 @@ func (d *WorkflowRunner) handleSetRampingVersion(ctx workflow.Context, args *dep
RampingSinceTime: nil, // remove ramp
RampPercentage: 0, // remove ramp
}
// TODO (Shivam): remove the empty string check once canary stops flaking out
if prevRampingVersion != worker_versioning.UnversionedVersionId && prevRampingVersion != "" {
if !d.rampingVersionStringUnversioned(prevRampingVersion) {
if _, err := d.syncVersion(ctx, prevRampingVersion, unsetRampUpdateArgs, false); err != nil {
return nil, err
}
Expand Down Expand Up @@ -954,14 +956,22 @@ func (d *WorkflowRunner) syncVersion(ctx workflow.Context, targetVersion string,
func (d *WorkflowRunner) syncUnversionedRamp(ctx workflow.Context, versionUpdateArgs *deploymentspb.SyncVersionStateUpdateArgs) error {
activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)

// DescribeVersion activity to get all the task queues in the current version
// DescribeVersion activity to get all the task queues in the current version, or the ramping version if current is nil
version := d.State.RoutingConfig.CurrentVersion //nolint:staticcheck // SA1019: worker versioning v0.31
if version == worker_versioning.UnversionedVersionId {
version = d.State.RoutingConfig.RampingVersion //nolint:staticcheck // SA1019: worker versioning v0.31
}

if d.rampingVersionStringUnversioned(version) {
return nil
}

var res deploymentspb.DescribeVersionFromWorkerDeploymentActivityResult
err := workflow.ExecuteActivity(
activityCtx,
d.a.DescribeVersionFromWorkerDeployment,
&deploymentspb.DescribeVersionFromWorkerDeploymentActivityArgs{
Version: d.State.RoutingConfig.CurrentVersion, //nolint:staticcheck // SA1019: worker versioning v0.31

Version: version,
}).Get(ctx, &res)
if err != nil {
return err
Expand Down
27 changes: 25 additions & 2 deletions tests/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,29 @@ func (s *WorkerDeploymentSuite) TestSetWorkerDeploymentRampingVersion_Invalid_Se
})
}

func (s *WorkerDeploymentSuite) TestSetWorkerDeploymentRampingVersion_Valid_SetNilCurrent_To_Ramping() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

tv := testvars.New(s).WithBuildIDNumber(1)
s.startVersionWorkflow(ctx, tv)

// set ramping version to unversioned will change the modifier identity, so it's not a no-op
s.setAndVerifyRampingVersion(ctx, tv, true, 0, false, "", &workflowservice.SetWorkerDeploymentRampingVersionResponse{})

// set a non-nil ramping version so that we can unset it in the next step
s.setAndVerifyRampingVersion(ctx, tv, false, 5, false, "", &workflowservice.SetWorkerDeploymentRampingVersionResponse{
PreviousVersion: worker_versioning.UnversionedVersionId,
})

// should be able to unset ramping version while current version is nil with no error
s.setAndVerifyRampingVersion(ctx, tv, true, 0, true, "", &workflowservice.SetWorkerDeploymentRampingVersionResponse{
PreviousVersion: tv.DeploymentVersionString(), //nolint:staticcheck // SA1019: worker versioning v0.31
PreviousDeploymentVersion: tv.ExternalDeploymentVersion(),
PreviousPercentage: 5,
})
}

func (s *WorkerDeploymentSuite) TestSetWorkerDeploymentRampingVersion_ModifyExistingRampVersionPercentage() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down Expand Up @@ -1140,7 +1163,7 @@ func (s *WorkerDeploymentSuite) TestSetWorkerDeploymentRampingVersion_WithCurren
Name: tv.DeploymentSeries(),
CreateTime: versionCreateTime,
RoutingConfig: &deploymentpb.RoutingConfig{
RampingVersion: "",
RampingVersion: worker_versioning.UnversionedVersionId, //nolint:staticcheck // SA1019: worker versioning v0.31
RampingVersionPercentage: 0,
RampingVersionChangedTime: unsetRampingUpdateTime,
RampingVersionPercentageChangedTime: unsetRampingUpdateTime,
Expand Down Expand Up @@ -2970,7 +2993,7 @@ func (s *WorkerDeploymentSuite) setAndVerifyRampingVersionUnversionedOption(
version = worker_versioning.UnversionedVersionId
}
if unset {
version = ""
version = worker_versioning.UnversionedVersionId
percentage = 0
}
if !allowNoPollers && ensureSystemWorkflowsExist {
Expand Down
Loading