@@ -61,28 +61,31 @@ type (
61
61
}
62
62
63
63
replicationTasksHeartbeatDetails struct {
64
- NextIndex int
65
- CheckPoint time.Time
66
- LastNotFoundWorkflowExecution commonpb.WorkflowExecution
64
+ NextIndex int
65
+ CheckPoint time.Time
66
+ LastNotVerifiedWorkflowExecution commonpb.WorkflowExecution
67
+ LastVerifiedIndex int
67
68
}
68
69
69
- verifyReplicationTasksTimeoutErr struct {
70
- timeout time.Duration
71
- details replicationTasksHeartbeatDetails
70
+ verifyStatus int
71
+ verifyResult struct {
72
+ status verifyStatus
73
+ reason string
72
74
}
73
75
)
74
76
75
77
const (
76
78
reasonZombieWorkflow = "Zombie workflow"
77
79
reasonWorkflowNotFound = "Workflow not found"
78
80
reasonWorkflowCloseToRetention = "Workflow close to retention"
81
+
82
+ notVerified verifyStatus = 0
83
+ verified verifyStatus = 1
84
+ skipped verifyStatus = 2
79
85
)
80
86
81
- func (e verifyReplicationTasksTimeoutErr ) Error () string {
82
- return fmt .Sprintf ("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v," ,
83
- e .timeout ,
84
- e .details .LastNotFoundWorkflowExecution ,
85
- )
87
+ func (r verifyResult ) isVerified () bool {
88
+ return r .status == verified || r .status == skipped
86
89
}
87
90
88
91
// TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
@@ -540,12 +543,12 @@ func isCloseToCurrentTime(t time.Time, duration time.Duration) bool {
540
543
return true
541
544
}
542
545
543
- func (a * activities ) canSkipWorkflowExecution (
546
+ func (a * activities ) checkSkipWorkflowExecution (
544
547
ctx context.Context ,
545
548
request * verifyReplicationTasksRequest ,
546
549
we * commonpb.WorkflowExecution ,
547
550
ns * namespace.Namespace ,
548
- ) (bool , string , error ) {
551
+ ) (verifyResult , error ) {
549
552
namespaceID := request .NamespaceID
550
553
tags := []tag.Tag {tag .WorkflowNamespaceID (namespaceID ), tag .WorkflowID (we .WorkflowId ), tag .WorkflowRunID (we .RunId )}
551
554
resp , err := a .historyClient .DescribeMutableState (ctx , & historyservice.DescribeMutableStateRequest {
@@ -558,30 +561,98 @@ func (a *activities) canSkipWorkflowExecution(
558
561
// The outstanding workflow execution may be deleted (due to retention) on source cluster after replication tasks were generated.
559
562
// Since retention runs on both source/target clusters, such execution may also be deleted (hence not found) from target cluster.
560
563
a .forceReplicationMetricsHandler .Counter (metrics .EncounterNotFoundWorkflowCount .GetMetricName ()).Record (1 )
561
- return true , reasonWorkflowNotFound , nil
564
+ return verifyResult {
565
+ status : skipped ,
566
+ reason : reasonWorkflowNotFound ,
567
+ }, nil
562
568
}
563
569
564
- return false , "" , err
570
+ return verifyResult {
571
+ status : notVerified ,
572
+ }, err
565
573
}
566
574
567
575
// Zombie workflow should be a transient state. However, if there is Zombie workflow on the source cluster,
568
576
// it is skipped to avoid such workflow being processed on the target cluster.
569
577
if resp .GetDatabaseMutableState ().GetExecutionState ().GetState () == enumsspb .WORKFLOW_EXECUTION_STATE_ZOMBIE {
570
578
a .forceReplicationMetricsHandler .Counter (metrics .EncounterZombieWorkflowCount .GetMetricName ()).Record (1 )
571
579
a .logger .Info ("createReplicationTasks skip Zombie workflow" , tags ... )
572
- return true , reasonZombieWorkflow , nil
580
+ return verifyResult {
581
+ status : skipped ,
582
+ reason : reasonZombieWorkflow ,
583
+ }, nil
573
584
}
574
585
575
586
// Skip verifying workflow which has already passed retention time.
576
587
if closeTime := resp .GetDatabaseMutableState ().GetExecutionInfo ().GetCloseTime (); closeTime != nil && ns != nil && ns .Retention () > 0 {
577
588
deleteTime := closeTime .Add (ns .Retention ())
578
589
if deleteTime .Before (time .Now ()) {
579
590
a .forceReplicationMetricsHandler .Counter (metrics .EncounterPassRetentionWorkflowCount .GetMetricName ()).Record (1 )
580
- return true , reasonWorkflowCloseToRetention , nil
591
+ return verifyResult {
592
+ status : skipped ,
593
+ reason : reasonWorkflowCloseToRetention ,
594
+ }, nil
581
595
}
582
596
}
583
597
584
- return false , "" , nil
598
+ return verifyResult {
599
+ status : notVerified ,
600
+ }, nil
601
+ }
602
+
603
+ func (a * activities ) verifySingleReplicationTask (
604
+ ctx context.Context ,
605
+ request * verifyReplicationTasksRequest ,
606
+ remoteClient adminservice.AdminServiceClient ,
607
+ ns * namespace.Namespace ,
608
+ cachedResults map [int ]verifyResult ,
609
+ idx int ,
610
+ ) (result verifyResult , rerr error ) {
611
+ if r , ok := cachedResults [idx ]; ok {
612
+ return r , nil
613
+ }
614
+
615
+ defer func () {
616
+ if result .isVerified () {
617
+ cachedResults [idx ] = result
618
+ }
619
+ }()
620
+
621
+ we := request .Executions [idx ]
622
+ s := time .Now ()
623
+ // Check if execution exists on remote cluster
624
+ _ , err := remoteClient .DescribeMutableState (ctx , & adminservice.DescribeMutableStateRequest {
625
+ Namespace : request .Namespace ,
626
+ Execution : & we ,
627
+ })
628
+ a .forceReplicationMetricsHandler .Timer (metrics .VerifyDescribeMutableStateLatency .GetMetricName ()).Record (time .Since (s ))
629
+
630
+ switch err .(type ) {
631
+ case nil :
632
+ a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace )).Counter (metrics .VerifyReplicationTaskSuccess .GetMetricName ()).Record (1 )
633
+ return verifyResult {
634
+ status : verified ,
635
+ }, nil
636
+
637
+ case * serviceerror.NotFound :
638
+ a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace )).Counter (metrics .VerifyReplicationTaskNotFound .GetMetricName ()).Record (1 )
639
+ // Calling checkSkipWorkflowExecution for every NotFound is sub-optimal as most common case to skip is workfow being deleted due to retention.
640
+ // A better solution is to only check the existence for workflow which is close to retention period.
641
+ return a .checkSkipWorkflowExecution (ctx , request , & we , ns )
642
+
643
+ case * serviceerror.NamespaceNotFound :
644
+ return verifyResult {
645
+ status : notVerified ,
646
+ }, temporal .NewNonRetryableApplicationError ("remoteClient.DescribeMutableState call failed" , "NamespaceNotFound" , err )
647
+
648
+ default :
649
+ a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace ), metrics .ServiceErrorTypeTag (err )).
650
+ Counter (metrics .VerifyReplicationTaskFailed .GetMetricName ()).Record (1 )
651
+
652
+ return verifyResult {
653
+ status : notVerified ,
654
+ }, errors .WithMessage (err , "remoteClient.DescribeMutableState call failed" )
655
+ }
585
656
}
586
657
587
658
func (a * activities ) verifyReplicationTasks (
@@ -590,8 +661,9 @@ func (a *activities) verifyReplicationTasks(
590
661
details * replicationTasksHeartbeatDetails ,
591
662
remoteClient adminservice.AdminServiceClient ,
592
663
ns * namespace.Namespace ,
664
+ cachedResults map [int ]verifyResult ,
593
665
heartbeat func (details replicationTasksHeartbeatDetails ),
594
- ) (bool , [] SkippedWorkflowExecution , error ) {
666
+ ) (bool , error ) {
595
667
start := time .Now ()
596
668
progress := false
597
669
defer func () {
@@ -604,55 +676,49 @@ func (a *activities) verifyReplicationTasks(
604
676
a .forceReplicationMetricsHandler .Timer (metrics .VerifyReplicationTasksLatency .GetMetricName ()).Record (time .Since (start ))
605
677
}()
606
678
607
- var skippedList []SkippedWorkflowExecution
608
679
for ; details .NextIndex < len (request .Executions ); details .NextIndex ++ {
609
- we := request .Executions [details .NextIndex ]
610
- s := time .Now ()
611
- // Check if execution exists on remote cluster
612
- _ , err := remoteClient .DescribeMutableState (ctx , & adminservice.DescribeMutableStateRequest {
613
- Namespace : request .Namespace ,
614
- Execution : & we ,
615
- })
616
- a .forceReplicationMetricsHandler .Timer (metrics .VerifyDescribeMutableStateLatency .GetMetricName ()).Record (time .Since (s ))
617
-
618
- switch err .(type ) {
619
- case nil :
620
- a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace )).Counter (metrics .VerifyReplicationTaskSuccess .GetMetricName ()).Record (1 )
621
-
622
- case * serviceerror.NotFound :
623
- a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace )).Counter (metrics .VerifyReplicationTaskNotFound .GetMetricName ()).Record (1 )
624
- // Calling canSkipWorkflowExecution for every NotFound is sub-optimal as most common case to skip is workfow being deleted due to retention.
625
- // A better solution is to only check the existence for workflow which is close to retention period.
626
- canSkip , reason , err := a .canSkipWorkflowExecution (ctx , request , & we , ns )
627
- if err != nil {
628
- return false , skippedList , err
629
- }
680
+ r , err := a .verifySingleReplicationTask (ctx , request , remoteClient , ns , cachedResults , details .NextIndex )
681
+ if err != nil {
682
+ return false , err
683
+ }
630
684
631
- if ! canSkip {
632
- details .LastNotFoundWorkflowExecution = we
633
- return false , skippedList , nil
634
- }
685
+ if ! r . isVerified () {
686
+ details .LastNotVerifiedWorkflowExecution = request . Executions [ details . NextIndex ]
687
+ break
688
+ }
635
689
636
- skippedList = append (skippedList , SkippedWorkflowExecution {
637
- WorkflowExecution : we ,
638
- Reason : reason ,
639
- })
690
+ details .LastVerifiedIndex = details .NextIndex
691
+ heartbeat (* details )
692
+ progress = true
693
+ }
694
+
695
+ if details .NextIndex >= len (request .Executions ) {
696
+ // Done with verification.
697
+ return true , nil
698
+ }
640
699
641
- case * serviceerror.NamespaceNotFound :
642
- return false , skippedList , temporal .NewNonRetryableApplicationError ("remoteClient.DescribeMutableState call failed" , "NamespaceNotFound" , err )
700
+ // Look ahead and see if there is any new workflow being replicated on target cluster. If yes, then consider it is a progress.
701
+ // This is to avoid verifyReplicationTasks from failing due to LastNotFoundWorkflowExecution being slow.
702
+ for idx := details .NextIndex + 1 ; idx < len (request .Executions ); idx ++ {
703
+ // Cache results don't count for progress.
704
+ if _ , ok := cachedResults [idx ]; ok {
705
+ continue
706
+ }
643
707
644
- default :
645
- a .forceReplicationMetricsHandler .WithTags (metrics .NamespaceTag (request .Namespace ), metrics .ServiceErrorTypeTag (err )).
646
- Counter (metrics .VerifyReplicationTaskFailed .GetMetricName ()).Record (1 )
708
+ r , err := a .verifySingleReplicationTask (ctx , request , remoteClient , ns , cachedResults , idx )
709
+ if err != nil {
710
+ return false , err
711
+ }
647
712
648
- return false , skippedList , errors .WithMessage (err , "remoteClient.DescribeMutableState call failed" )
713
+ if r .isVerified () {
714
+ details .LastVerifiedIndex = idx
715
+ progress = true
649
716
}
650
717
651
718
heartbeat (* details )
652
- progress = true
653
719
}
654
720
655
- return true , skippedList , nil
721
+ return false , nil
656
722
}
657
723
658
724
const (
@@ -684,6 +750,8 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
684
750
return response , err
685
751
}
686
752
753
+ cachedResults := make (map [int ]verifyResult )
754
+
687
755
// Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster.
688
756
// 1. replication lag
689
757
// 2. Zombie workflow execution
@@ -700,20 +768,14 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
700
768
// Since replication has a lag, sleep first.
701
769
time .Sleep (request .VerifyInterval )
702
770
703
- verified , skippedList , err := a .verifyReplicationTasks (ctx , request , & details , remoteClient , nsEntry ,
771
+ verified , err := a .verifyReplicationTasks (ctx , request , & details , remoteClient , nsEntry , cachedResults ,
704
772
func (d replicationTasksHeartbeatDetails ) {
705
773
activity .RecordHeartbeat (ctx , d )
706
774
})
707
-
708
775
if err != nil {
709
776
return response , err
710
777
}
711
778
712
- if len (skippedList ) > 0 {
713
- response .SkippedWorkflowExecutions = append (response .SkippedWorkflowExecutions , skippedList ... )
714
- response .SkippedWorkflowCount = len (response .SkippedWorkflowExecutions )
715
- }
716
-
717
779
if verified == true {
718
780
return response , nil
719
781
}
@@ -724,7 +786,7 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
724
786
return response , temporal .NewNonRetryableApplicationError (
725
787
fmt .Sprintf ("verifyReplicationTasks was not able to make progress for more than %v minutes (not retryable). Not found WorkflowExecution: %v, Checkpoint: %v" ,
726
788
diff .Minutes (),
727
- details .LastNotFoundWorkflowExecution , details .CheckPoint ),
789
+ details .LastNotVerifiedWorkflowExecution , details .CheckPoint ),
728
790
"" , nil )
729
791
}
730
792
}
0 commit comments