Skip to content

Commit 77f11f3

Browse files
committed
new producer-centric decision making
1 parent 9ce98f4 commit 77f11f3

File tree

16 files changed

+584
-597
lines changed

16 files changed

+584
-597
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 467 additions & 454 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/workflow/v1/message.pb.go

Lines changed: 10 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/worker_versioning/worker_versioning.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,12 @@ func MakeDirectiveForWorkflowTask(
240240
return nil
241241
}
242242

243+
func IsTaskQueueInVersion(tq string, version *deploymentpb.WorkerDeploymentVersion) bool {
244+
// TODO(carlydf): implement this, which might require more input parameters
245+
// if there is an error talking to matching, I think we should just not return false because we can't confirm
246+
return false
247+
}
248+
243249
// DeploymentVersionFromDeployment Temporary helper function to convert Deployment to
244250
// WorkerDeploymentVersion proto until we update code to use the new proto in all places.
245251
func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentspb.WorkerDeploymentVersion {
@@ -375,6 +381,18 @@ func OverrideIsPinned(override *workflowpb.VersioningOverride) bool {
375381
override.GetPinned().GetBehavior() == workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED
376382
}
377383

384+
func GetOverridePinnedVersion(override *workflowpb.VersioningOverride) *deploymentpb.WorkerDeploymentVersion {
385+
if OverrideIsPinned(override) {
386+
if v := override.GetPinned().GetVersion(); v != nil {
387+
return v
388+
} else if v := override.GetPinnedVersion(); v != "" { //nolint:staticcheck // SA1019: worker versioning v0.31
389+
return ExternalWorkerDeploymentVersionFromString(v)
390+
}
391+
return ExternalWorkerDeploymentVersionFromDeployment(override.GetDeployment()) //nolint:staticcheck // SA1019: worker versioning v0.30
392+
}
393+
return nil
394+
}
395+
378396
func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
379397
if override == nil {
380398
return nil

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ require (
5959
go.opentelemetry.io/otel/sdk v1.34.0
6060
go.opentelemetry.io/otel/sdk/metric v1.34.0
6161
go.opentelemetry.io/otel/trace v1.34.0
62-
go.temporal.io/api v1.49.2-0.20250528190503-671bdd75fbcb
62+
go.temporal.io/api v1.49.2-0.20250528205930-44769cadc279
6363
go.temporal.io/sdk v1.34.0
6464
go.temporal.io/version v0.3.0
6565
go.uber.org/automaxprocs v1.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
385385
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
386386
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
387387
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
388-
go.temporal.io/api v1.49.2-0.20250528190503-671bdd75fbcb h1:CHH6Xs1yaK3dyUQh4Ix7SgKtcOhu6d5NmxGvgVQlTfw=
389-
go.temporal.io/api v1.49.2-0.20250528190503-671bdd75fbcb/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
388+
go.temporal.io/api v1.49.2-0.20250528205930-44769cadc279 h1:1v5WEMcukv+vTKQ/gja1NQZTyy2EwBUfZDx7V+l+To8=
389+
go.temporal.io/api v1.49.2-0.20250528205930-44769cadc279/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
390390
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
391391
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
392392
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ message StartWorkflowExecutionRequest {
8989
temporal.api.workflow.v1.VersioningOverride versioning_override = 13;
9090
// If set, we verify the parent-child relationship before applying ID conflict policy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
9191
bool child_workflow_only = 14;
92+
// If present, the new workflow should start on this version with pinned base behavior.
93+
temporal.api.deployment.v1.WorkerDeploymentVersion inherited_pinned_version = 15;
9294
}
9395

9496
message StartWorkflowExecutionResponse {

proto/internal/temporal/server/api/workflow/v1/message.proto

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package temporal.server.api.workflow.v1;
55
option go_package = "go.temporal.io/server/api/workflow/v1;workflow";
66

77
import "temporal/api/common/v1/message.proto";
8-
import "temporal/api/history/v1/message.proto";
98

109
import "temporal/server/api/clock/v1/message.proto";
1110

@@ -23,9 +22,6 @@ message ParentExecutionInfo {
2322
// Not set in the subsequent execution if the child workflow continues-as-new.
2423
// Deprecated. Replaced with `parent_versioning_info` in WorkflowExecutionStartedEventAttributes.
2524
string pinned_worker_deployment_version = 7;
26-
27-
// Present if parent is versioned. Consumer decides whether to inherit.
28-
temporal.api.history.v1.WorkflowExecutionStartedEventAttributes.SourceWorkflowVersioningInfo versioning_info = 9;
2925
}
3026

3127
message RootExecutionInfo {

service/history/api/startworkflow/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (s *Starter) lockCurrentWorkflowExecution(
235235
// It returns the creationContext which can later be used to insert into the executions table.
236236
func (s *Starter) prepareNewWorkflow(workflowID string) (*creationParams, error) {
237237
runID := primitives.NewUUID().String()
238-
mutableState, err := api.NewWorkflowWithSignal( // findme carly
238+
mutableState, err := api.NewWorkflowWithSignal(
239239
s.shardContext,
240240
s.namespace,
241241
workflowID,

service/history/historybuilder/event_factory.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func (b *EventFactory) CreateWorkflowExecutionStartedEvent(
3434
prevRunID string,
3535
firstRunID string,
3636
originalRunID string,
37-
previousRunVersioningInfo *historypb.WorkflowExecutionStartedEventAttributes_SourceWorkflowVersioningInfo,
3837
) *historypb.HistoryEvent {
3938
event := b.createHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, startTime)
4039
req := request.StartRequest
@@ -68,7 +67,7 @@ func (b *EventFactory) CreateWorkflowExecutionStartedEvent(
6867
InheritedBuildId: request.InheritedBuildId,
6968
VersioningOverride: worker_versioning.ConvertOverrideToV32(request.VersioningOverride),
7069
Priority: req.GetPriority(),
71-
PreviousRunVersioningInfo: previousRunVersioningInfo,
70+
InheritedPinnedVersion: request.InheritedPinnedVersion,
7271
}
7372

7473
parentInfo := request.ParentExecutionInfo
@@ -78,7 +77,6 @@ func (b *EventFactory) CreateWorkflowExecutionStartedEvent(
7877
attributes.ParentWorkflowExecution = parentInfo.Execution
7978
attributes.ParentInitiatedEventId = parentInfo.InitiatedId
8079
attributes.ParentInitiatedEventVersion = parentInfo.InitiatedVersion
81-
attributes.ParentVersioningInfo = parentInfo.VersioningInfo
8280
}
8381

8482
event.Attributes = &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{

service/history/historybuilder/history_builder.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionStartedEvent(
158158
prevRunID string,
159159
firstInChainRunID string,
160160
originalRunID string,
161-
previousRunVersioningInfo *historypb.WorkflowExecutionStartedEventAttributes_SourceWorkflowVersioningInfo,
162161
) *historypb.HistoryEvent {
163162
event := b.EventFactory.CreateWorkflowExecutionStartedEvent(
164163
startTime,
@@ -167,7 +166,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionStartedEvent(
167166
prevRunID,
168167
firstInChainRunID,
169168
originalRunID,
170-
previousRunVersioningInfo,
171169
)
172170
if request.StartRequest.GetUserMetadata() != nil {
173171
event.UserMetadata = request.StartRequest.GetUserMetadata()

0 commit comments

Comments
 (0)