Skip to content

Commit fe5a91c

Browse files
authored
Pin workflows across ContinueAsNew and implement Cross-TQ inheritance (#7776)
## What changed? - Updated worker versioning inheritance logic for workflow chains (parent-child and continue-as-new) - Replaced deprecated pinned_deployment_version with new inherited_pinned_version field in workflow execution - Handled versioning inheritance for retry and cron workflows - Allow cross-task-queue CaN and Child pinned version inheritance if new task queue is in the inherited version ## Why? - Improves versioning behavior consistency across workflow chains by properly inheriting versioning info from parent/previous runs - Removes deprecated fields and aligns with new versioning API design - Fixes edge cases in versioning inheritance for retry and cron workflows Basically, we want `PINNED` behavior to carry through parent child chains, and across the CaN boundary, so that we can roll out a new behavior for Trampolining in the future, which will not be inherited across the CaN boundary. ## How did you test it? - [x] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s) ## Potential risks Could break history :P
1 parent f6d6e5d commit fe5a91c

26 files changed

+781
-618
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 468 additions & 454 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/workflow/v1/message.pb.go

Lines changed: 16 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/worker_versioning/worker_versioning.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ import (
1515
"go.temporal.io/api/serviceerror"
1616
workflowpb "go.temporal.io/api/workflow/v1"
1717
deploymentspb "go.temporal.io/server/api/deployment/v1"
18+
"go.temporal.io/server/api/matchingservice/v1"
1819
persistencespb "go.temporal.io/server/api/persistence/v1"
1920
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
2021
"go.temporal.io/server/common/namespace"
2122
"go.temporal.io/server/common/persistence/visibility/manager"
23+
"go.temporal.io/server/common/resource"
2224
"go.temporal.io/server/common/searchattribute"
2325
serviceerrors "go.temporal.io/server/common/serviceerror"
26+
"google.golang.org/protobuf/proto"
2427
"google.golang.org/protobuf/types/known/emptypb"
2528
)
2629

@@ -257,6 +260,64 @@ func MakeDirectiveForWorkflowTask(
257260
return nil
258261
}
259262

263+
type IsWFTaskQueueInVersionDetector = func(ctx context.Context, namespaceID, tq string, version *deploymentpb.WorkerDeploymentVersion) (bool, error)
264+
265+
func GetIsWFTaskQueueInVersionDetector(matchingClient resource.MatchingClient) IsWFTaskQueueInVersionDetector {
266+
return func(ctx context.Context,
267+
namespaceID, tq string,
268+
version *deploymentpb.WorkerDeploymentVersion) (bool, error) {
269+
resp, err := matchingClient.GetTaskQueueUserData(ctx,
270+
&matchingservice.GetTaskQueueUserDataRequest{
271+
NamespaceId: namespaceID,
272+
TaskQueue: tq,
273+
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
274+
})
275+
if err != nil {
276+
return false, err
277+
}
278+
tqData, ok := resp.GetUserData().GetData().GetPerType()[int32(enumspb.TASK_QUEUE_TYPE_WORKFLOW)]
279+
if !ok {
280+
// The TQ is unversioned
281+
return false, nil
282+
}
283+
return HasDeploymentVersion(tqData.GetDeploymentData(), DeploymentVersionFromDeployment(DeploymentFromExternalDeploymentVersion(version))), nil
284+
}
285+
}
286+
287+
// [cleanup-wv-pre-release]
288+
func FindDeployment(deployments *persistencespb.DeploymentData, deployment *deploymentpb.Deployment) int {
289+
for i, d := range deployments.GetDeployments() { //nolint:staticcheck // SA1019: worker versioning v0.30
290+
if d.Deployment.Equal(deployment) {
291+
return i
292+
}
293+
}
294+
return -1
295+
}
296+
297+
func FindDeploymentVersion(deployments *persistencespb.DeploymentData, v *deploymentspb.WorkerDeploymentVersion) int {
298+
for i, vd := range deployments.GetVersions() {
299+
if proto.Equal(v, vd.GetVersion()) {
300+
return i
301+
}
302+
}
303+
return -1
304+
}
305+
306+
//nolint:staticcheck
307+
func HasDeploymentVersion(deployments *persistencespb.DeploymentData, v *deploymentspb.WorkerDeploymentVersion) bool {
308+
for _, d := range deployments.GetDeployments() {
309+
if d.Deployment.Equal(DeploymentFromDeploymentVersion(v)) {
310+
return true
311+
}
312+
}
313+
for _, vd := range deployments.GetVersions() {
314+
if proto.Equal(v, vd.GetVersion()) {
315+
return true
316+
}
317+
}
318+
return false
319+
}
320+
260321
// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
261322
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
262323
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentspb.WorkerDeploymentVersion {
@@ -397,6 +458,17 @@ func OverrideIsPinned(override *workflowpb.VersioningOverride) bool {
397458
override.GetPinned().GetBehavior() == workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED
398459
}
399460

461+
func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
462+
if OverrideIsPinned(override) {
463+
if v := override.GetPinned().GetVersion(); v != nil {
464+
return v
465+
} else if v := override.GetPinnedVersion(); v != "" { //nolint:staticcheck // SA1019: worker versioning v0.31
466+
return ExternalWorkerDeploymentVersionFromStringV31(v)
467+
}
468+
return ExternalWorkerDeploymentVersionFromDeployment(override.GetDeployment()) //nolint:staticcheck // SA1019: worker versioning v0.30
469+
}
470+
return nil
471+
}
400472
func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior {
401473
if override.GetAutoUpgrade() {
402474
return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ require (
5959
go.opentelemetry.io/otel/sdk v1.34.0
6060
go.opentelemetry.io/otel/sdk/metric v1.34.0
6161
go.opentelemetry.io/otel/trace v1.34.0
62-
go.temporal.io/api v1.49.2-0.20250528164413-b08ef1e10ee0
62+
go.temporal.io/api v1.49.2-0.20250530003406-6ac9b437f9db
6363
go.temporal.io/sdk v1.34.0
6464
go.temporal.io/version v0.3.0
6565
go.uber.org/automaxprocs v1.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
385385
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
386386
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
387387
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
388-
go.temporal.io/api v1.49.2-0.20250528164413-b08ef1e10ee0 h1:MNgNcZbq0Zu8XtFCMgNY2jzGVweA4JjoaR81xUbD6DE=
389-
go.temporal.io/api v1.49.2-0.20250528164413-b08ef1e10ee0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
388+
go.temporal.io/api v1.49.2-0.20250530003406-6ac9b437f9db h1:RAkllf0vM3U0UBHBjq9iwi4JD36ATl/5gmQciZsrvho=
389+
go.temporal.io/api v1.49.2-0.20250530003406-6ac9b437f9db/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
390390
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
391391
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
392392
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,15 @@ message StartWorkflowExecutionRequest {
8282
// For top-level workflows (ie., without parent), this field must be nil.
8383
temporal.server.api.workflow.v1.RootExecutionInfo root_execution_info = 11;
8484
// inherited build ID from parent/previous execution
85+
// Deprecated. Use behavior, version, and task queue fields in `parent_execution_info`.
8586
string inherited_build_id = 12;
8687
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
8788
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
8889
temporal.api.workflow.v1.VersioningOverride versioning_override = 13;
8990
// If set, we verify the parent-child relationship before applying ID conflict policy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
9091
bool child_workflow_only = 14;
92+
// If present, the new workflow should start on this version with pinned base behavior.
93+
temporal.api.deployment.v1.WorkerDeploymentVersion inherited_pinned_version = 15;
9194
}
9295

9396
message StartWorkflowExecutionResponse {

proto/internal/temporal/server/api/workflow/v1/message.proto

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package temporal.server.api.workflow.v1;
55
option go_package = "go.temporal.io/server/api/workflow/v1;workflow";
66

77
import "temporal/api/common/v1/message.proto";
8-
import "temporal/api/deployment/v1/message.proto";
98

109
import "temporal/server/api/clock/v1/message.proto";
1110

@@ -21,14 +20,9 @@ message ParentExecutionInfo {
2120
// first starts the child workflow, and the child workflow is starting on a Task Queue belonging
2221
// to the same Worker Deployment Version.
2322
// Not set in the subsequent execution if the child workflow continues-as-new.
24-
// Deprecated. Replaced with `pinned_deployment_version`.
23+
// Deprecated. Replaced with `inherited_pinned_version` in WorkflowExecutionStartedEventAttributes.
2524
string pinned_worker_deployment_version = 7;
26-
// When present, child workflow starts as Pinned to this Worker Deployment Version.
27-
// Set only if the parent execution is effectively Pinned to a Worker Deployment Version when it
28-
// first starts the child workflow, and the child workflow is starting on a Task Queue belonging
29-
// to the same Worker Deployment Version.
30-
// Not set in the subsequent execution if the child workflow continues-as-new.
31-
temporal.api.deployment.v1.WorkerDeploymentVersion pinned_deployment_version = 8;
25+
reserved 8;
3226
}
3327

3428
message RootExecutionInfo {

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type (
5858
searchAttributesValidator *searchattribute.Validator
5959
persistenceVisibilityMgr manager.VisibilityManager
6060
commandHandlerRegistry *workflow.CommandHandlerRegistry
61+
matchingClient matchingservice.MatchingServiceClient
6162
}
6263
)
6364

@@ -69,6 +70,7 @@ func NewWorkflowTaskCompletedHandler(
6970
searchAttributesValidator *searchattribute.Validator,
7071
visibilityManager manager.VisibilityManager,
7172
workflowConsistencyChecker api.WorkflowConsistencyChecker,
73+
matchingClient matchingservice.MatchingServiceClient,
7274
) *WorkflowTaskCompletedHandler {
7375
return &WorkflowTaskCompletedHandler{
7476
config: shardContext.GetConfig(),
@@ -90,6 +92,7 @@ func NewWorkflowTaskCompletedHandler(
9092
searchAttributesValidator: searchAttributesValidator,
9193
persistenceVisibilityMgr: visibilityManager,
9294
commandHandlerRegistry: commandHandlerRegistry,
95+
matchingClient: matchingClient,
9396
}
9497
}
9598

@@ -375,6 +378,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
375378
handler.searchAttributesMapperProvider,
376379
hasBufferedEventsOrMessages,
377380
handler.commandHandlerRegistry,
381+
handler.matchingClient,
378382
)
379383

380384
if responseMutations, err = workflowTaskHandler.handleCommands(

service/history/api/respondworkflowtaskcompleted/api_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ func (s *WorkflowTaskCompletedHandlerSuite) SetupSubTest() {
125125
nil,
126126
nil,
127127
nil,
128-
api.NewWorkflowConsistencyChecker(s.mockShard, s.workflowCache))
128+
api.NewWorkflowConsistencyChecker(s.mockShard, s.workflowCache),
129+
nil)
129130
}
130131

131132
func (s *WorkflowTaskCompletedHandlerSuite) TearDownTest() {

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.temporal.io/api/serviceerror"
1717
"go.temporal.io/api/workflowservice/v1"
1818
"go.temporal.io/server/api/historyservice/v1"
19+
"go.temporal.io/server/api/matchingservice/v1"
1920
"go.temporal.io/server/common"
2021
"go.temporal.io/server/common/backoff"
2122
"go.temporal.io/server/common/collection"
@@ -31,6 +32,7 @@ import (
3132
"go.temporal.io/server/common/protocol"
3233
"go.temporal.io/server/common/searchattribute"
3334
"go.temporal.io/server/common/tasktoken"
35+
"go.temporal.io/server/common/worker_versioning"
3436
"go.temporal.io/server/service/history/api"
3537
"go.temporal.io/server/service/history/configs"
3638
historyi "go.temporal.io/server/service/history/interfaces"
@@ -73,6 +75,7 @@ type (
7375
shard historyi.ShardContext
7476
tokenSerializer *tasktoken.Serializer
7577
commandHandlerRegistry *workflow.CommandHandlerRegistry
78+
matchingClient matchingservice.MatchingServiceClient
7679
}
7780

7881
workflowTaskFailedCause struct {
@@ -111,6 +114,7 @@ func newWorkflowTaskCompletedHandler(
111114
searchAttributesMapperProvider searchattribute.MapperProvider,
112115
hasBufferedEventsOrMessages bool,
113116
commandHandlerRegistry *workflow.CommandHandlerRegistry,
117+
matchingClient matchingservice.MatchingServiceClient,
114118
) *workflowTaskCompletedHandler {
115119
return &workflowTaskCompletedHandler{
116120
identity: identity,
@@ -142,6 +146,7 @@ func newWorkflowTaskCompletedHandler(
142146
shard: shard,
143147
tokenSerializer: tasktoken.NewSerializer(),
144148
commandHandlerRegistry: commandHandlerRegistry,
149+
matchingClient: matchingClient,
145150
}
146151
}
147152

@@ -927,7 +932,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow(
927932
}
928933

929934
if handler.mutableState.GetAssignedBuildId() == "" {
930-
// TODO: this is supported in new versioning [cleanup-old-wv]
935+
// TODO(carlydf): this is supported in new versioning [cleanup-old-wv]
931936
if attr.InheritBuildId && attr.TaskQueue.GetName() != "" && attr.TaskQueue.Name != handler.mutableState.GetExecutionInfo().TaskQueue {
932937
err := serviceerror.NewInvalidArgument("ContinueAsNew with UseCompatibleVersion cannot run on different task queue.")
933938
return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES, err)
@@ -986,6 +991,7 @@ func (handler *workflowTaskCompletedHandler) handleCommandContinueAsNewWorkflow(
986991
handler.workflowTaskCompletedID,
987992
parentNamespace,
988993
attr,
994+
worker_versioning.GetIsWFTaskQueueInVersionDetector(handler.matchingClient),
989995
)
990996
if err != nil {
991997
return nil, err

0 commit comments

Comments
 (0)