Skip to content

Commit ee13c32

Browse files
authored
Merge branch 'main' into ss/enable-versioning-032-configs
2 parents 2b3bd4f + c7ee12d commit ee13c32

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1102
-990
lines changed

api/deployment/v1/message.go-helpers.pb.go

Lines changed: 0 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/deployment/v1/message.pb.go

Lines changed: 377 additions & 444 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/worker_versioning/worker_versioning.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -533,16 +533,6 @@ func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deployme
533533
return directive.GetDeployment()
534534
}
535535

536-
// The worker deployment manager workflows still use the v0.31 format, so call this before returning the object to readers
537-
// to mutatively populate the missing fields.
538-
func AddV32RoutingConfigToV31(r *deploymentpb.RoutingConfig) *deploymentpb.RoutingConfig {
539-
//nolint:staticcheck // SA1019: worker versioning v0.31
540-
r.CurrentDeploymentVersion = ExternalWorkerDeploymentVersionFromString(r.CurrentVersion)
541-
//nolint:staticcheck // SA1019: worker versioning v0.31
542-
r.RampingDeploymentVersion = ExternalWorkerDeploymentVersionFromString(r.RampingVersion)
543-
return r
544-
}
545-
546536
// We store versioning info in the modern v0.32 format, so call this before returning the object to readers
547537
// to mutatively populate the missing fields.
548538
func AddV31VersioningInfoToV32(info *workflowpb.WorkflowExecutionVersioningInfo) *workflowpb.WorkflowExecutionVersioningInfo {

proto/internal/temporal/server/api/deployment/v1/message.proto

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,6 @@ message WorkerDeploymentWorkflowArgs {
122122
WorkerDeploymentLocalState state = 4;
123123
}
124124

125-
// used as drainage workflow input:
126-
message DrainageWorkflowArgs {
127-
string namespace_name = 1;
128-
WorkerDeploymentVersion version = 2;
129-
bool is_can = 3;
130-
}
131-
132125
// Local state for Worker Deployment
133126
message WorkerDeploymentLocalState {
134127
google.protobuf.Timestamp create_time = 1;

service/frontend/workflow_handler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3522,7 +3522,7 @@ func (wh *WorkflowHandler) ListWorkerDeployments(ctx context.Context, request *w
35223522
workerDeployments[i] = &workflowservice.ListWorkerDeploymentsResponse_WorkerDeploymentSummary{
35233523
Name: d.Name,
35243524
CreateTime: d.CreateTime,
3525-
RoutingConfig: worker_versioning.AddV32RoutingConfigToV31(d.RoutingConfig),
3525+
RoutingConfig: d.RoutingConfig,
35263526
}
35273527
}
35283528

@@ -3553,7 +3553,6 @@ func (wh *WorkflowHandler) DescribeWorkerDeployment(ctx context.Context, request
35533553
//nolint:staticcheck // SA1019: worker versioning v0.31
35543554
vs.DeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromString(vs.Version)
35553555
}
3556-
workerDeploymentInfo.RoutingConfig = worker_versioning.AddV32RoutingConfigToV31(workerDeploymentInfo.RoutingConfig)
35573556
return &workflowservice.DescribeWorkerDeploymentResponse{
35583557
WorkerDeploymentInfo: workerDeploymentInfo,
35593558
ConflictToken: cT,

service/history/ndc/workflow_resetter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ func reapplyEvents(
967967
enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED,
968968
enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT,
969969
enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
970-
if !isReset {
970+
if isDuplicate(event) {
971971
continue
972972
}
973973
err := reapplyChildEvents(mutableState, event)
@@ -1023,7 +1023,7 @@ func reapplyChildEvents(mutableState historyi.MutableState, event *historypb.His
10231023
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
10241024
childEventAttributes := event.GetChildWorkflowExecutionStartedEventAttributes()
10251025
ci, childExists := mutableState.GetChildExecutionInfo(childEventAttributes.GetInitiatedEventId())
1026-
if !childExists {
1026+
if !childExists || ci.StartedEventId != common.EmptyEventID {
10271027
return nil
10281028
}
10291029
childClock := ci.Clock

service/history/ndc/workflow_resetter_test.go

Lines changed: 71 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -895,46 +895,57 @@ func (s *workflowResetterSuite) TestReapplyEvents_WithPendingChildren() {
895895
{name: "apply child timeout event", events: []*historypb.HistoryEvent{childExecutionTimeoutEvent}},
896896
{name: "apply child terminated event", events: []*historypb.HistoryEvent{childExecutionTerminatedEvent}},
897897
}
898-
mutableState := historyi.NewMockMutableState(s.controller)
899-
mutableState.EXPECT().GetChildExecutionInfo(testInitiatedEventID).
900-
Times(len(testcases)). // GetChildExecutionInfo should be called exactly once for each test case.
901-
Return(&persistencespb.ChildExecutionInfo{Clock: testChildClock}, true)
902-
903-
// Each of the events must be picked with the correct args exactly once.
904-
mutableState.EXPECT().AddChildWorkflowExecutionStartedEvent(testChildWFExecution, testChildWFType, testInitiatedEventID, testStartEventHeader, testChildClock).Return(nil, nil).Times(1)
905-
mutableState.EXPECT().AddChildWorkflowExecutionCompletedEvent(
906-
testInitiatedEventID,
907-
testChildWFExecution,
908-
&historypb.WorkflowExecutionCompletedEventAttributes{Result: testCompletedEventResult},
909-
).Return(nil, nil).Times(1)
910-
mutableState.EXPECT().AddStartChildWorkflowExecutionFailedEvent(
911-
testInitiatedEventID,
912-
enumspb.START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_NAMESPACE_NOT_FOUND,
913-
&historypb.StartChildWorkflowExecutionInitiatedEventAttributes{WorkflowId: testChildWFExecution.WorkflowId},
914-
).Return(nil, nil).Times(1)
915-
mutableState.EXPECT().AddChildWorkflowExecutionFailedEvent(
916-
testInitiatedEventID,
917-
testChildWFExecution,
918-
&historypb.WorkflowExecutionFailedEventAttributes{Failure: &failurepb.Failure{}, RetryState: enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE},
919-
).Return(nil, nil).Times(1)
920-
mutableState.EXPECT().AddChildWorkflowExecutionCanceledEvent(
921-
testInitiatedEventID,
922-
testChildWFExecution,
923-
&historypb.WorkflowExecutionCanceledEventAttributes{Details: &commonpb.Payloads{}},
924-
).Return(nil, nil).Times(1)
925-
mutableState.EXPECT().AddChildWorkflowExecutionTimedOutEvent(
926-
testInitiatedEventID,
927-
testChildWFExecution,
928-
&historypb.WorkflowExecutionTimedOutEventAttributes{RetryState: enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE},
929-
).Return(nil, nil).Times(1)
930-
mutableState.EXPECT().AddChildWorkflowExecutionTerminatedEvent(
931-
testInitiatedEventID,
932-
testChildWFExecution,
933-
).Return(nil, nil).Times(1)
934-
935-
for _, tc := range testcases {
936-
_, err := reapplyEvents(context.Background(), mutableState, nil, nil, tc.events, nil, "", true)
937-
s.NoError(err)
898+
resetcases := []struct {
899+
name string
900+
isReset bool
901+
}{
902+
{name: "reset", isReset: true},
903+
{name: "no reset", isReset: false},
904+
}
905+
for _, tcReset := range resetcases {
906+
mutableState := historyi.NewMockMutableState(s.controller)
907+
mutableState.EXPECT().GetChildExecutionInfo(testInitiatedEventID).
908+
Times(len(testcases)). // GetChildExecutionInfo should be called exactly once for each test case.
909+
Return(&persistencespb.ChildExecutionInfo{Clock: testChildClock}, true)
910+
911+
// Each of the events must be picked with the correct args exactly once.
912+
mutableState.EXPECT().AddChildWorkflowExecutionStartedEvent(testChildWFExecution, testChildWFType, testInitiatedEventID, testStartEventHeader, testChildClock).Return(nil, nil).Times(1)
913+
mutableState.EXPECT().AddChildWorkflowExecutionCompletedEvent(
914+
testInitiatedEventID,
915+
testChildWFExecution,
916+
&historypb.WorkflowExecutionCompletedEventAttributes{Result: testCompletedEventResult},
917+
).Return(nil, nil).Times(1)
918+
mutableState.EXPECT().AddStartChildWorkflowExecutionFailedEvent(
919+
testInitiatedEventID,
920+
enumspb.START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_NAMESPACE_NOT_FOUND,
921+
&historypb.StartChildWorkflowExecutionInitiatedEventAttributes{WorkflowId: testChildWFExecution.WorkflowId},
922+
).Return(nil, nil).Times(1)
923+
mutableState.EXPECT().AddChildWorkflowExecutionFailedEvent(
924+
testInitiatedEventID,
925+
testChildWFExecution,
926+
&historypb.WorkflowExecutionFailedEventAttributes{Failure: &failurepb.Failure{}, RetryState: enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE},
927+
).Return(nil, nil).Times(1)
928+
mutableState.EXPECT().AddChildWorkflowExecutionCanceledEvent(
929+
testInitiatedEventID,
930+
testChildWFExecution,
931+
&historypb.WorkflowExecutionCanceledEventAttributes{Details: &commonpb.Payloads{}},
932+
).Return(nil, nil).Times(1)
933+
mutableState.EXPECT().AddChildWorkflowExecutionTimedOutEvent(
934+
testInitiatedEventID,
935+
testChildWFExecution,
936+
&historypb.WorkflowExecutionTimedOutEventAttributes{RetryState: enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE},
937+
).Return(nil, nil).Times(1)
938+
mutableState.EXPECT().AddChildWorkflowExecutionTerminatedEvent(
939+
testInitiatedEventID,
940+
testChildWFExecution,
941+
).Return(nil, nil).Times(1)
942+
943+
for _, tc := range testcases {
944+
s.Run(tc.name+" "+tcReset.name, func() {
945+
_, err := reapplyEvents(context.Background(), mutableState, nil, nil, tc.events, nil, "", tcReset.isReset)
946+
s.NoError(err)
947+
})
948+
}
938949
}
939950
}
940951

@@ -982,15 +993,26 @@ func (s *workflowResetterSuite) TestReapplyEvents_WithNoPendingChildren() {
982993
{name: "apply child timeout event", events: []*historypb.HistoryEvent{childExecutionTimeoutEvent}},
983994
{name: "apply child terminated event", events: []*historypb.HistoryEvent{childExecutionTerminatedEvent}},
984995
}
985-
mutableState := historyi.NewMockMutableState(s.controller)
986-
// GetChildExecutionInfo should be called exactly once for each test case and none of the Add event methods must be called.
987-
mutableState.EXPECT().GetChildExecutionInfo(testInitiatedEventID).
988-
Times(len(testCases)).
989-
Return(nil, false)
990-
991-
for _, tc := range testCases {
992-
_, err := reapplyEvents(context.Background(), mutableState, nil, nil, tc.events, nil, "", true)
993-
s.NoError(err)
996+
resetcases := []struct {
997+
name string
998+
isReset bool
999+
}{
1000+
{name: "reset", isReset: true},
1001+
{name: "no reset", isReset: false},
1002+
}
1003+
for _, tcReset := range resetcases {
1004+
mutableState := historyi.NewMockMutableState(s.controller)
1005+
// GetChildExecutionInfo should be called exactly once for each test case and none of the Add event methods must be called.
1006+
mutableState.EXPECT().GetChildExecutionInfo(testInitiatedEventID).
1007+
Times(len(testCases)).
1008+
Return(nil, false)
1009+
1010+
for _, tc := range testCases {
1011+
s.Run(tc.name+" "+tcReset.name, func() {
1012+
_, err := reapplyEvents(context.Background(), mutableState, nil, nil, tc.events, nil, "", tcReset.isReset)
1013+
s.NoError(err)
1014+
})
1015+
}
9941016
}
9951017
}
9961018

service/history/workflow/workflow_task_state_machine.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,13 +1189,18 @@ func (m *workflowTaskStateMachine) afterAddWorkflowTaskCompletedEvent(
11891189
}
11901190
}
11911191

1192-
// TODO(carlydf): create reset point based on attrs.Deployment instead of the build ID.
1192+
//nolint:staticcheck // SA1019: worker versioning v2
1193+
buildId := attrs.GetWorkerVersion().GetBuildId()
1194+
if wftDeployment != nil {
1195+
buildId = wftDeployment.GetBuildId()
1196+
}
11931197
addedResetPoint := m.ms.addResetPointFromCompletion(
11941198
attrs.GetBinaryChecksum(),
1195-
attrs.GetWorkerVersion().GetBuildId(),
1199+
buildId,
11961200
event.GetEventId(),
11971201
limits.MaxResetPoints,
11981202
)
1203+
11991204
// For v3 versioned workflows (ms.GetEffectiveVersioningBehavior() != UNSPECIFIED), this will update the reachability
12001205
// search attribute based on the execution_info.deployment and/or override deployment if one exists. We must update the
12011206
// search attribute here because the reachability deployment may have just been changed by CompleteDeploymentTransition.

service/matching/deployment_util.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ var (
1414
// [cleanup-wv-pre-release]
1515
errMissingDeployment = serviceerror.NewInvalidArgument("missing deployment")
1616

17-
errMissingDeploymentVersion = serviceerror.NewInvalidArgument("missing deployment version")
18-
errDeploymentVersionNotReady = serviceerror.NewUnavailable("task queue is not ready to process polls from this deployment version, try again shortly")
17+
errMissingDeploymentVersion = serviceerror.NewInvalidArgument("missing deployment version")
1918
)
2019

2120
// [cleanup-wv-pre-release]

service/matching/matching_engine.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,10 +1137,28 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
11371137
timeSinceLastFanOut := rootPM.TimeSinceLastFanOut()
11381138
lastFanOutTTL := tqConfig.TaskQueueInfoByBuildIdTTL()
11391139

1140-
// TODO bug fix: We cache the same map regardless of VersionSelection or TaskQueueTypes, so if someone queries the Activity Task Queue type of this task queue name, we cache that result and return it even if the next DescribeTQ call is about the WF TQ
1140+
// TODO bug fix: We cache the last response for each build ID. timeSinceLastFanOut is the last fan out time, that means some enteries in the cache can be more stale if
1141+
// user is calling this API back-to-back but with different version selection.
1142+
cacheIsFresh := timeSinceLastFanOut <= lastFanOutTTL
1143+
missingItemsInCache := false
11411144
physicalInfoByBuildId := make(map[string]map[enumspb.TaskQueueType]*taskqueuespb.PhysicalTaskQueueInfo)
1142-
if timeSinceLastFanOut > lastFanOutTTL {
1143-
// collect internal info
1145+
if cacheIsFresh {
1146+
// fetch info from rootPartition's cache for the cached versions
1147+
cache := rootPM.GetPhysicalTaskQueueInfoFromCache()
1148+
requestedBuildIds, err := e.getBuildIds(req.Versions)
1149+
if err != nil {
1150+
return nil, err
1151+
}
1152+
for b := range requestedBuildIds {
1153+
if c, ok := cache[b]; ok {
1154+
physicalInfoByBuildId[b] = c
1155+
} else {
1156+
missingItemsInCache = true
1157+
}
1158+
}
1159+
}
1160+
if !cacheIsFresh || missingItemsInCache {
1161+
// Fan out to partitions to get the needed info
11441162
numPartitions := max(tqConfig.NumWritePartitions(), tqConfig.NumReadPartitions())
11451163

11461164
for _, taskQueueType := range req.TaskQueueTypes {
@@ -1190,10 +1208,7 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
11901208
}
11911209
}
11921210
// update cache
1193-
rootPM.UpdateTimeSinceLastFanOutAndCache(physicalInfoByBuildId)
1194-
} else {
1195-
// fetch info from rootPartition's cache
1196-
physicalInfoByBuildId = rootPM.GetPhysicalTaskQueueInfoFromCache()
1211+
rootPM.UpdateTimeSinceLastFanOutAndCache(physicalInfoByBuildId, cacheIsFresh)
11971212
}
11981213

11991214
// smush internal info into versions info
@@ -1837,12 +1852,18 @@ func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
18371852

18381853
// take last writer for V2 rules and V3 data
18391854
if req.GetUserData().GetClock() == nil || current.GetClock() != nil && hlc.Greater(current.GetClock(), req.GetUserData().GetClock()) {
1840-
mergedData.AssignmentRules = currentVersioningData.GetAssignmentRules()
1841-
mergedData.RedirectRules = currentVersioningData.GetRedirectRules()
1855+
if mergedData != nil {
1856+
// v2 rules
1857+
mergedData.AssignmentRules = currentVersioningData.GetAssignmentRules()
1858+
mergedData.RedirectRules = currentVersioningData.GetRedirectRules()
1859+
}
18421860
mergedUserData.PerType = current.GetPerType()
18431861
} else {
1844-
mergedData.AssignmentRules = newVersioningData.GetAssignmentRules()
1845-
mergedData.RedirectRules = newVersioningData.GetRedirectRules()
1862+
if mergedData != nil {
1863+
// v2 rules
1864+
mergedData.AssignmentRules = newVersioningData.GetAssignmentRules()
1865+
mergedData.RedirectRules = newVersioningData.GetRedirectRules()
1866+
}
18461867
mergedUserData.PerType = req.GetUserData().GetPerType()
18471868
}
18481869

@@ -1865,8 +1886,10 @@ func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
18651886
}
18661887
}
18671888

1868-
// No need to keep the tombstones around after replication.
1869-
mergedUserData.VersioningData = ClearTombstones(mergedData)
1889+
if mergedData != nil {
1890+
// No need to keep the v1 tombstones around after replication.
1891+
mergedUserData.VersioningData = ClearTombstones(mergedData)
1892+
}
18701893
return mergedUserData, len(buildIdsToRevive) > 0, nil
18711894
})
18721895
return &matchingservice.ApplyTaskQueueUserDataReplicationEventResponse{}, err

0 commit comments

Comments
 (0)