Skip to content

Commit 735d9d1

Browse files
authored
functional test to verify update routing with versioning-0.32 (#7768)
## What changed? - added two new functional tests that verify whether updates, which are sent as speculative tasks, are being routed properly - also checks whether the MS has the right versioningInfo while the routing takes place. ## Why? - making this feature more robust ## How did you test it? - [x] built - [x] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s)
1 parent 9524cec commit 735d9d1

File tree

1 file changed

+197
-1
lines changed

1 file changed

+197
-1
lines changed

tests/versioning_3_test.go

Lines changed: 197 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
nexuspb "go.temporal.io/api/nexus/v1"
2020
"go.temporal.io/api/serviceerror"
2121
taskqueuepb "go.temporal.io/api/taskqueue/v1"
22+
updatepb "go.temporal.io/api/update/v1"
2223
workflowpb "go.temporal.io/api/workflow/v1"
2324
"go.temporal.io/api/workflowservice/v1"
2425
"go.temporal.io/sdk/activity"
@@ -34,6 +35,7 @@ import (
3435
"go.temporal.io/server/common/dynamicconfig"
3536
"go.temporal.io/server/common/primitives/timestamp"
3637
"go.temporal.io/server/common/searchattribute"
38+
"go.temporal.io/server/common/testing/protoutils"
3739
"go.temporal.io/server/common/testing/taskpoller"
3840
"go.temporal.io/server/common/testing/testvars"
3941
"go.temporal.io/server/common/tqid"
@@ -63,7 +65,7 @@ const (
6365
)
6466

6567
type Versioning3Suite struct {
66-
testcore.FunctionalTestBase
68+
WorkflowUpdateBaseSuite
6769
}
6870

6971
func TestVersioning3FunctionalSuite(t *testing.T) {
@@ -454,6 +456,200 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) {
454456
s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil)
455457
}
456458

459+
// drainWorkflowTaskAfterSetCurrent is a helper that sets the current deployment version,
460+
// drains the initial workflow task from the execution, and ensures the task is correctly
461+
// routed to the appropriate build.
462+
func (s *Versioning3Suite) drainWorkflowTaskAfterSetCurrent(
463+
tv *testvars.TestVars,
464+
) (*commonpb.WorkflowExecution, string) {
465+
wftCompleted := make(chan struct{})
466+
s.pollWftAndHandle(tv, false, wftCompleted,
467+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
468+
s.NotNil(task)
469+
s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, tv.DeploymentVersionTransition())
470+
return respondEmptyWft(tv, false, vbUnpinned), nil
471+
})
472+
s.waitForDeploymentDataPropagation(tv, versionStatusInactive, false, tqTypeWf)
473+
s.setCurrentDeployment(tv)
474+
475+
runID := s.startWorkflow(tv, nil)
476+
execution := tv.WithRunID(runID).WorkflowExecution()
477+
478+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
479+
defer cancel()
480+
s.WaitForChannel(ctx, wftCompleted)
481+
482+
return execution, runID
483+
}
484+
485+
func (s *Versioning3Suite) TestUnpinnedWorkflow_SuccessfulUpdate_TransitionstoNewDeployment() {
486+
tv1 := testvars.New(s).WithBuildIDNumber(1)
487+
488+
execution, _ := s.drainWorkflowTaskAfterSetCurrent(tv1)
489+
490+
// Register the new version and set it to current
491+
tv2 := tv1.WithBuildIDNumber(2)
492+
s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
493+
s.setCurrentDeployment(tv2)
494+
495+
// Send update
496+
updateResultCh := s.sendUpdateNoError(tv2)
497+
498+
// Process update in workflow
499+
s.pollWftAndHandle(tv2, false, nil,
500+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
501+
s.NotNil(task)
502+
503+
// Verify that events from the speculative task are written to the task history
504+
s.EqualHistory(`
505+
1 WorkflowExecutionStarted
506+
2 WorkflowTaskScheduled
507+
3 WorkflowTaskStarted
508+
4 WorkflowTaskCompleted
509+
5 WorkflowTaskScheduled // Speculative WT events are not written to the history yet.
510+
6 WorkflowTaskStarted
511+
`, task.History)
512+
513+
// Verify that events from the speculative task are *not* written to the workflow history before being processed by the poller
514+
events := s.GetHistory(s.Namespace().String(), execution)
515+
s.EqualHistoryEvents(`
516+
1 WorkflowExecutionStarted
517+
2 WorkflowTaskScheduled
518+
3 WorkflowTaskStarted
519+
4 WorkflowTaskCompleted
520+
`, events)
521+
522+
// VersioningInfo should not have changed before the update has been processed by the poller.
523+
// Deployment version transition should also be nil since this is a speculative task.
524+
s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil)
525+
526+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
527+
Commands: s.UpdateAcceptCompleteCommands(tv2),
528+
Messages: s.UpdateAcceptCompleteMessages(tv2, task.Messages[0]),
529+
VersioningBehavior: vbUnpinned,
530+
DeploymentOptions: &deploymentpb.WorkerDeploymentOptions{
531+
BuildId: tv2.BuildID(),
532+
DeploymentName: tv2.DeploymentSeries(),
533+
WorkerVersioningMode: enumspb.WORKER_VERSIONING_MODE_VERSIONED,
534+
},
535+
}, nil
536+
})
537+
538+
updateResult := <-updateResultCh
539+
s.EqualValues("success-result-of-"+tv2.UpdateID(), testcore.DecodeString(s.T(), updateResult.GetOutcome().GetSuccess()))
540+
541+
// Verify that events from the speculative task are written to the history since the update was accepted
542+
events := s.GetHistory(s.Namespace().String(), execution)
543+
s.EqualHistoryEvents(`
544+
1 WorkflowExecutionStarted
545+
2 WorkflowTaskScheduled
546+
3 WorkflowTaskStarted
547+
4 WorkflowTaskCompleted
548+
5 WorkflowTaskScheduled // Was speculative WT...
549+
6 WorkflowTaskStarted
550+
7 WorkflowTaskCompleted // ...and events were written to the history when WT completes.
551+
8 WorkflowExecutionUpdateAccepted {"AcceptedRequestSequencingEventId": 5} // WTScheduled event which delivered update to the worker.
552+
9 WorkflowExecutionUpdateCompleted {"AcceptedEventId": 8}
553+
`, events)
554+
555+
// Verify that the versioning info is updated correctly.
556+
describeCall, err := s.FrontendClient().DescribeWorkflowExecution(context.Background(), &workflowservice.DescribeWorkflowExecutionRequest{
557+
Namespace: s.Namespace().String(),
558+
Execution: execution,
559+
})
560+
s.Nil(err)
561+
s.NotNil(describeCall)
562+
563+
// Since the poller accepted the update, the Worker Deployment Version that completed the last workflow task
564+
// of this workflow execution should have changed to the new version. However, the version transition should
565+
// still be nil.
566+
s.verifyWorkflowVersioning(tv2, vbUnpinned, tv2.Deployment(), nil, nil)
567+
568+
}
569+
570+
func (s *Versioning3Suite) TestUnpinnedWorkflow_FailedUpdate_DoesNotTransitionToNewDeployment() {
571+
tv1 := testvars.New(s).WithBuildIDNumber(1)
572+
573+
execution, _ := s.drainWorkflowTaskAfterSetCurrent(tv1)
574+
575+
// Register the new version and set it to current
576+
tv2 := tv1.WithBuildIDNumber(2)
577+
s.idlePollWorkflow(tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none")
578+
579+
s.setCurrentDeployment(tv2)
580+
s.waitForDeploymentDataPropagation(tv2, versionStatusInactive, false, tqTypeWf)
581+
582+
// Send update
583+
updateResultCh := s.sendUpdateNoError(tv2)
584+
585+
// Process update in workflow
586+
s.pollWftAndHandle(tv2, false, nil,
587+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
588+
s.NotNil(task)
589+
590+
// Verify that events from the speculative task are written to the task history
591+
s.EqualHistory(`
592+
1 WorkflowExecutionStarted
593+
2 WorkflowTaskScheduled
594+
3 WorkflowTaskStarted
595+
4 WorkflowTaskCompleted
596+
5 WorkflowTaskScheduled // Speculative WT events are not written to the history yet.
597+
6 WorkflowTaskStarted
598+
`, task.History)
599+
600+
// Verify that events from the speculative task are *not* written to the workflow history before being processed by the poller
601+
events := s.GetHistory(s.Namespace().String(), execution)
602+
s.EqualHistoryEvents(`
603+
1 WorkflowExecutionStarted
604+
2 WorkflowTaskScheduled
605+
3 WorkflowTaskStarted
606+
4 WorkflowTaskCompleted
607+
`, events)
608+
609+
// VersioningInfo should not have changed before the update has been processed by the poller.
610+
// Deployment version transition should also be nil since this is a speculative task.
611+
s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil)
612+
613+
updRequestMsg := task.Messages[0]
614+
updRequest := protoutils.UnmarshalAny[*updatepb.Request](s.T(), updRequestMsg.GetBody())
615+
616+
s.Equal("args-value-of-"+tv2.UpdateID(), testcore.DecodeString(s.T(), updRequest.GetInput().GetArgs()))
617+
s.Equal(tv2.HandlerName(), updRequest.GetInput().GetName())
618+
s.EqualValues(5, updRequestMsg.GetEventId())
619+
620+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
621+
Messages: s.UpdateRejectMessages(tv2, updRequestMsg),
622+
VersioningBehavior: vbUnpinned,
623+
DeploymentOptions: &deploymentpb.WorkerDeploymentOptions{
624+
BuildId: tv2.BuildID(),
625+
DeploymentName: tv2.DeploymentSeries(),
626+
WorkerVersioningMode: enumspb.WORKER_VERSIONING_MODE_VERSIONED,
627+
},
628+
}, nil
629+
})
630+
631+
updateResult := <-updateResultCh
632+
s.Equal("rejection-of-"+tv2.UpdateID(), updateResult.GetOutcome().GetFailure().GetMessage())
633+
634+
// Verify events from the speculative task are *not* written to the workflow history since the update was rejected
635+
events := s.GetHistory(s.Namespace().String(), execution)
636+
s.EqualHistoryEvents(`
637+
1 WorkflowExecutionStarted
638+
2 WorkflowTaskScheduled
639+
3 WorkflowTaskStarted
640+
4 WorkflowTaskCompleted
641+
`, events)
642+
643+
// Since the poller rejected the update, the Worker Deployment Version that completed the last workflow task
644+
// of this workflow execution should not have changed.
645+
s.verifyWorkflowVersioning(tv1, vbUnpinned, tv1.Deployment(), nil, nil)
646+
}
647+
648+
func (s *Versioning3Suite) sendUpdateNoError(tv *testvars.TestVars) <-chan *workflowservice.UpdateWorkflowExecutionResponse {
649+
s.T().Helper()
650+
return s.sendUpdateNoErrorInternal(tv, nil)
651+
}
652+
457653
func (s *Versioning3Suite) TestUnpinnedWorkflowWithRamp_ToVersioned() {
458654
s.RunTestWithMatchingBehavior(
459655
func() {

0 commit comments

Comments
 (0)