Skip to content

Commit 5affd30

Browse files
authored
Merge branch 'main' into shahab/version-delim
2 parents f19170a + c224229 commit 5affd30

File tree

6 files changed

+173
-14
lines changed

6 files changed

+173
-14
lines changed

service/matching/matching_engine.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,12 +1858,18 @@ func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
18581858

18591859
// take last writer for V2 rules and V3 data
18601860
if req.GetUserData().GetClock() == nil || current.GetClock() != nil && hlc.Greater(current.GetClock(), req.GetUserData().GetClock()) {
1861-
mergedData.AssignmentRules = currentVersioningData.GetAssignmentRules()
1862-
mergedData.RedirectRules = currentVersioningData.GetRedirectRules()
1861+
if mergedData != nil {
1862+
// v2 rules
1863+
mergedData.AssignmentRules = currentVersioningData.GetAssignmentRules()
1864+
mergedData.RedirectRules = currentVersioningData.GetRedirectRules()
1865+
}
18631866
mergedUserData.PerType = current.GetPerType()
18641867
} else {
1865-
mergedData.AssignmentRules = newVersioningData.GetAssignmentRules()
1866-
mergedData.RedirectRules = newVersioningData.GetRedirectRules()
1868+
if mergedData != nil {
1869+
// v2 rules
1870+
mergedData.AssignmentRules = newVersioningData.GetAssignmentRules()
1871+
mergedData.RedirectRules = newVersioningData.GetRedirectRules()
1872+
}
18671873
mergedUserData.PerType = req.GetUserData().GetPerType()
18681874
}
18691875

@@ -1886,8 +1892,10 @@ func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
18861892
}
18871893
}
18881894

1889-
// No need to keep the tombstones around after replication.
1890-
mergedUserData.VersioningData = ClearTombstones(mergedData)
1895+
if mergedData != nil {
1896+
// No need to keep the v1 tombstones around after replication.
1897+
mergedUserData.VersioningData = ClearTombstones(mergedData)
1898+
}
18911899
return mergedUserData, len(buildIdsToRevive) > 0, nil
18921900
})
18931901
return &matchingservice.ApplyTaskQueueUserDataReplicationEventResponse{}, err

service/worker/migration/activities.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ func (a *activities) UpdateActiveCluster(ctx context.Context, req updateActiveCl
424424

425425
func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) {
426426
ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(request.Namespace, headers.CallerTypePreemptable, ""))
427+
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(interceptor.DCRedirectionContextHeaderName, "false"))
427428

428429
resp, err := a.frontendClient.ListWorkflowExecutions(ctx, request)
429430
if err != nil {

service/worker/scheduler/spec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (cs *CompiledSpec) GetNextTime(jitterSeed string, after time.Time) GetNextT
276276
after = util.MaxTime(after, cs.spec.StartTime.AsTime().Add(-time.Second))
277277

278278
pastEndTime := func(t time.Time) bool {
279-
return cs.spec.EndTime != nil && t.After(cs.spec.EndTime.AsTime())
279+
return cs.spec.EndTime != nil && t.After(cs.spec.EndTime.AsTime()) || t.Year() > maxCalendarYear
280280
}
281281
var nominal time.Time
282282
for nominal.IsZero() || cs.excluded(nominal) {

service/worker/scheduler/spec_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,19 @@ func (s *specSuite) TestSpecExclude() {
366366
)
367367
}
368368

369+
func (s *specSuite) TestExcludeAll() {
370+
cs, err := s.specBuilder.NewCompiledSpec(&schedulepb.ScheduleSpec{
371+
Interval: []*schedulepb.IntervalSpec{
372+
{Interval: durationpb.New(7 * 24 * time.Hour)},
373+
},
374+
ExcludeCalendar: []*schedulepb.CalendarSpec{
375+
&schedulepb.CalendarSpec{Second: "*", Minute: "*", Hour: "*"},
376+
},
377+
})
378+
s.NoError(err)
379+
s.Zero(cs.GetNextTime("", time.Date(2022, 3, 23, 12, 53, 2, 9, time.UTC)))
380+
}
381+
369382
func (s *specSuite) TestSpecStartTime() {
370383
s.checkSequenceFull(
371384
"",

tests/update_workflow_test.go

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5317,7 +5317,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
53175317
s.Equal("Operation was aborted.", errs[1].Error())
53185318
})
53195319

5320-
s.Run("dedupes retry", func() {
5320+
s.Run("receive completed update result", func() {
53215321
for _, p := range []enumspb.WorkflowIdConflictPolicy{
53225322
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
53235323
enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
@@ -5327,8 +5327,6 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
53275327
tv := testvars.New(s.T())
53285328

53295329
startReq := startWorkflowReq(tv)
5330-
startReq.RequestId = "request_id"
5331-
startReq.WorkflowIdConflictPolicy = p
53325330
updReq := s.updateWorkflowRequest(tv,
53335331
&updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
53345332

@@ -5344,12 +5342,12 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
53445342
uwsRes1 := <-uwsCh1
53455343
s.NoError(uwsRes1.err)
53465344

5347-
// 2nd update-with-start: using *same* RequestID and UpdateID
5345+
// 2nd update-with-start: using *same* UpdateID - but *different* RequestID
53485346
uwsRes2 := <-sendUpdateWithStart(testcore.NewContext(), startReq, updReq)
53495347
s.NoError(uwsRes2.err)
53505348

5351-
s.Equal(uwsRes1.response.Responses[0].GetStartWorkflow().RunId, uwsRes1.response.Responses[0].GetStartWorkflow().RunId)
5352-
s.Equal(uwsRes1.response.Responses[1].GetUpdateWorkflow().Outcome.String(), uwsRes1.response.Responses[1].GetUpdateWorkflow().Outcome.String())
5349+
s.Equal(uwsRes1.response.Responses[0].GetStartWorkflow().RunId, uwsRes2.response.Responses[0].GetStartWorkflow().RunId)
5350+
s.Equal(uwsRes1.response.Responses[1].GetUpdateWorkflow().Outcome.String(), uwsRes2.response.Responses[1].GetUpdateWorkflow().Outcome.String())
53535351

53545352
// poll update to ensure same outcome is returned
53555353
pollRes, err := s.pollUpdate(tv,
@@ -5359,6 +5357,52 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
53595357
})
53605358
}
53615359
})
5360+
5361+
s.Run("dedupes start", func() {
5362+
for _, p := range []enumspb.WorkflowIdConflictPolicy{
5363+
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
5364+
enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,
5365+
enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
5366+
} {
5367+
s.Run(fmt.Sprintf("for workflow id conflict policy %v", p), func() {
5368+
tv := testvars.New(s.T())
5369+
5370+
startReq := startWorkflowReq(tv)
5371+
startReq.RequestId = "request_id"
5372+
startReq.WorkflowIdConflictPolicy = p
5373+
updReq1 := s.updateWorkflowRequest(tv.WithUpdateIDNumber(1),
5374+
&updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
5375+
5376+
// 1st update-with-start
5377+
uwsCh1 := sendUpdateWithStart(testcore.NewContext(), startReq, updReq1)
5378+
_, err := s.TaskPoller().PollAndHandleWorkflowTask(tv,
5379+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
5380+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
5381+
Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0]),
5382+
}, nil
5383+
})
5384+
s.NoError(err)
5385+
uwsRes1 := <-uwsCh1
5386+
s.NoError(uwsRes1.err)
5387+
5388+
// 2nd update-with-start: using *same* RequestID - but *different* UpdateID
5389+
updReq2 := s.updateWorkflowRequest(tv.WithUpdateIDNumber(2),
5390+
&updatepb.WaitPolicy{LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED})
5391+
uwsCh2 := sendUpdateWithStart(testcore.NewContext(), startReq, updReq2)
5392+
_, err = s.TaskPoller().PollAndHandleWorkflowTask(tv,
5393+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
5394+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
5395+
Messages: s.UpdateAcceptCompleteMessages(tv, task.Messages[0]),
5396+
}, nil
5397+
})
5398+
s.NoError(err)
5399+
uwsRes2 := <-uwsCh2
5400+
s.NoError(uwsRes1.err)
5401+
5402+
s.Equal(uwsRes1.response.Responses[0].GetStartWorkflow().RunId, uwsRes2.response.Responses[0].GetStartWorkflow().RunId)
5403+
})
5404+
}
5405+
})
53625406
})
53635407

53645408
s.Run("workflow is closed", func() {
@@ -5453,7 +5497,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
54535497
s.Equal("Operation was aborted.", errs[1].Error())
54545498
})
54555499

5456-
s.Run("still receive update result regardless of conflict policy", func() {
5500+
s.Run("receive completed update result", func() {
54575501
for _, p := range []enumspb.WorkflowIdConflictPolicy{
54585502
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
54595503
enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING,

tests/xdc/user_data_replication_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ import (
1616
"go.temporal.io/api/workflowservice/v1"
1717
sdkclient "go.temporal.io/sdk/client"
1818
"go.temporal.io/server/api/adminservice/v1"
19+
deploymentspb "go.temporal.io/server/api/deployment/v1"
1920
enumsspb "go.temporal.io/server/api/enums/v1"
2021
"go.temporal.io/server/api/matchingservice/v1"
2122
persistencespb "go.temporal.io/server/api/persistence/v1"
2223
"go.temporal.io/server/common"
2324
"go.temporal.io/server/common/clock/hybrid_logical_clock"
2425
"go.temporal.io/server/common/dynamicconfig"
2526
"go.temporal.io/server/common/primitives"
27+
"go.temporal.io/server/common/primitives/timestamp"
2628
"go.temporal.io/server/common/worker_versioning"
2729
"go.temporal.io/server/service/worker/migration"
2830
"go.temporal.io/server/service/worker/scanner/build_ids"
@@ -250,6 +252,97 @@ func (s *UserDataReplicationTestSuite) TestUserDataIsReplicatedFromActiveToPassi
250252
}, replicationWaitTime, replicationCheckInterval)
251253
}
252254

255+
func (s *UserDataReplicationTestSuite) TestUserDataIsReplicatedFromActiveToPassiveV3() {
256+
namespace := s.T().Name() + "-" + common.GenerateRandomString(5)
257+
taskQueue := "versioned"
258+
ctx := testcore.NewContext()
259+
activeFrontendClient := s.clusters[0].FrontendClient()
260+
activeMatchingClient := s.clusters[0].MatchingClient()
261+
standbyMatchingClient := s.clusters[1].MatchingClient()
262+
regReq := &workflowservice.RegisterNamespaceRequest{
263+
Namespace: namespace,
264+
IsGlobalNamespace: true,
265+
Clusters: s.clusterReplicationConfig(),
266+
ActiveClusterName: s.clusters[0].ClusterName(),
267+
WorkflowExecutionRetentionPeriod: durationpb.New(7 * time.Hour * 24),
268+
}
269+
_, err := activeFrontendClient.RegisterNamespace(ctx, regReq)
270+
s.NoError(err)
271+
272+
description, err := activeFrontendClient.DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{Namespace: namespace})
273+
s.Require().NoError(err)
274+
275+
expectedVersionData := &deploymentspb.DeploymentVersionData{
276+
Version: &deploymentspb.WorkerDeploymentVersion{
277+
BuildId: "v1",
278+
DeploymentName: "d1",
279+
},
280+
RampingSinceTime: timestamp.TimePtr(time.Now()),
281+
RampPercentage: 10,
282+
}
283+
284+
_, err = activeMatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
285+
NamespaceId: description.GetNamespaceInfo().GetId(),
286+
TaskQueue: taskQueue,
287+
TaskQueueTypes: []enumspb.TaskQueueType{enumspb.TASK_QUEUE_TYPE_WORKFLOW, enumspb.TASK_QUEUE_TYPE_ACTIVITY, enumspb.TASK_QUEUE_TYPE_NEXUS},
288+
Operation: &matchingservice.SyncDeploymentUserDataRequest_UpdateVersionData{
289+
UpdateVersionData: expectedVersionData,
290+
},
291+
})
292+
s.NoError(err)
293+
294+
s.EventuallyWithT(func(t *assert.CollectT) {
295+
// Call matching directly in case frontend is configured to redirect API calls to the active cluster
296+
response, err := standbyMatchingClient.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{
297+
NamespaceId: description.GetNamespaceInfo().Id,
298+
TaskQueue: taskQueue,
299+
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
300+
})
301+
a := assert.New(t)
302+
a.NoError(err)
303+
if perType := response.GetUserData().GetData().GetPerType(); a.NotNil(perType) {
304+
for tqType := 1; tqType <= 3; tqType++ {
305+
data := perType[int32(tqType)].GetDeploymentData()
306+
if a.Equal(1, len(data.GetVersions())) {
307+
a.True(data.GetVersions()[0].Equal(expectedVersionData))
308+
}
309+
}
310+
}
311+
}, replicationWaitTime, replicationCheckInterval)
312+
313+
// make another change to test that merging works
314+
315+
expectedVersionData.RampPercentage = 20
316+
_, err = activeMatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
317+
NamespaceId: description.GetNamespaceInfo().GetId(),
318+
TaskQueue: taskQueue,
319+
TaskQueueTypes: []enumspb.TaskQueueType{enumspb.TASK_QUEUE_TYPE_WORKFLOW, enumspb.TASK_QUEUE_TYPE_ACTIVITY, enumspb.TASK_QUEUE_TYPE_NEXUS},
320+
Operation: &matchingservice.SyncDeploymentUserDataRequest_UpdateVersionData{
321+
UpdateVersionData: expectedVersionData,
322+
},
323+
})
324+
s.NoError(err)
325+
326+
s.EventuallyWithT(func(t *assert.CollectT) {
327+
// Call matching directly in case frontend is configured to redirect API calls to the active cluster
328+
response, err := standbyMatchingClient.GetTaskQueueUserData(ctx, &matchingservice.GetTaskQueueUserDataRequest{
329+
NamespaceId: description.GetNamespaceInfo().Id,
330+
TaskQueue: taskQueue,
331+
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
332+
})
333+
a := assert.New(t)
334+
a.NoError(err)
335+
if perType := response.GetUserData().GetData().GetPerType(); a.NotNil(perType) {
336+
for tqType := 1; tqType <= 3; tqType++ {
337+
data := perType[int32(tqType)].GetDeploymentData()
338+
if a.Equal(1, len(data.GetVersions())) {
339+
a.True(data.GetVersions()[0].Equal(expectedVersionData))
340+
}
341+
}
342+
}
343+
}, replicationWaitTime, replicationCheckInterval)
344+
}
345+
253346
func (s *UserDataReplicationTestSuite) TestUserDataIsReplicatedFromPassiveToActive() {
254347
namespace := s.createGlobalNamespace()
255348
taskQueue := "versioned"

0 commit comments

Comments
 (0)