Skip to content

Commit f163b88

Browse files
wxing1292 authored and rodrigozhou committed
Make shard ownership more resilient when shard is busy (#4283)
* Do timeout check after shard lock acquired, not before
1 parent 5225f13 commit f163b88

File tree

1 file changed

+52
-44
lines changed

1 file changed

+52
-44
lines changed

service/history/shard/context_impl.go

Lines changed: 52 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -578,53 +578,51 @@ func (s *ContextImpl) AddTasks(
578578
ctx context.Context,
579579
request *persistence.AddHistoryTasksRequest,
580580
) error {
581-
ctx, cancel, err := s.newDetachedContext(ctx)
581+
engine, err := s.GetEngine(ctx)
582582
if err != nil {
583583
return err
584584
}
585-
defer cancel()
585+
err = s.addTasksWithoutNotification(ctx, request)
586+
if OperationPossiblySucceeded(err) {
587+
engine.NotifyNewTasks(request.Tasks)
588+
}
589+
return err
590+
}
586591

592+
func (s *ContextImpl) addTasksWithoutNotification(
593+
ctx context.Context,
594+
request *persistence.AddHistoryTasksRequest,
595+
) error {
587596
// do not try to get namespace cache within shard lock
588597
namespaceID := namespace.ID(request.NamespaceID)
589598
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
590599
if err != nil {
591600
return err
592601
}
593602

594-
engine, err := s.GetEngine(ctx)
603+
s.wLock()
604+
defer s.wUnlock()
605+
606+
// timeout check should be done within the shard lock, in case of shard lock contention
607+
ctx, cancel, err := s.newDetachedContext(ctx)
595608
if err != nil {
596609
return err
597610
}
611+
defer cancel()
598612

599-
s.wLock()
600613
if err := s.errorByState(); err != nil {
601-
s.wUnlock()
602614
return err
603615
}
604616
if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
605-
s.wUnlock()
606617
return err
607618
}
608-
err = s.addTasksLocked(ctx, request, namespaceEntry)
609-
s.wUnlock()
610-
611-
if OperationPossiblySucceeded(err) {
612-
engine.NotifyNewTasks(request.Tasks)
613-
}
614-
615-
return err
619+
return s.addTasksLocked(ctx, request, namespaceEntry)
616620
}
617621

618622
func (s *ContextImpl) CreateWorkflowExecution(
619623
ctx context.Context,
620624
request *persistence.CreateWorkflowExecutionRequest,
621625
) (*persistence.CreateWorkflowExecutionResponse, error) {
622-
ctx, cancel, err := s.newDetachedContext(ctx)
623-
if err != nil {
624-
return nil, err
625-
}
626-
defer cancel()
627-
628626
// do not try to get namespace cache within shard lock
629627
namespaceID := namespace.ID(request.NewWorkflowSnapshot.ExecutionInfo.NamespaceId)
630628
workflowID := request.NewWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -636,6 +634,13 @@ func (s *ContextImpl) CreateWorkflowExecution(
636634
s.wLock()
637635
defer s.wUnlock()
638636

637+
// timeout check should be done within the shard lock, in case of shard lock contention
638+
ctx, cancel, err := s.newDetachedContext(ctx)
639+
if err != nil {
640+
return nil, err
641+
}
642+
defer cancel()
643+
639644
if err := s.errorByState(); err != nil {
640645
return nil, err
641646
}
@@ -668,12 +673,6 @@ func (s *ContextImpl) UpdateWorkflowExecution(
668673
ctx context.Context,
669674
request *persistence.UpdateWorkflowExecutionRequest,
670675
) (*persistence.UpdateWorkflowExecutionResponse, error) {
671-
ctx, cancel, err := s.newDetachedContext(ctx)
672-
if err != nil {
673-
return nil, err
674-
}
675-
defer cancel()
676-
677676
// do not try to get namespace cache within shard lock
678677
namespaceID := namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId)
679678
workflowID := request.UpdateWorkflowMutation.ExecutionInfo.WorkflowId
@@ -685,6 +684,13 @@ func (s *ContextImpl) UpdateWorkflowExecution(
685684
s.wLock()
686685
defer s.wUnlock()
687686

687+
// timeout check should be done within the shard lock, in case of shard lock contention
688+
ctx, cancel, err := s.newDetachedContext(ctx)
689+
if err != nil {
690+
return nil, err
691+
}
692+
defer cancel()
693+
688694
if err := s.errorByState(); err != nil {
689695
return nil, err
690696
}
@@ -743,12 +749,6 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
743749
ctx context.Context,
744750
request *persistence.ConflictResolveWorkflowExecutionRequest,
745751
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
746-
ctx, cancel, err := s.newDetachedContext(ctx)
747-
if err != nil {
748-
return nil, err
749-
}
750-
defer cancel()
751-
752752
// do not try to get namespace cache within shard lock
753753
namespaceID := namespace.ID(request.ResetWorkflowSnapshot.ExecutionInfo.NamespaceId)
754754
workflowID := request.ResetWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -760,6 +760,13 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
760760
s.wLock()
761761
defer s.wUnlock()
762762

763+
// timeout check should be done within the shard lock, in case of shard lock contention
764+
ctx, cancel, err := s.newDetachedContext(ctx)
765+
if err != nil {
766+
return nil, err
767+
}
768+
defer cancel()
769+
763770
if err := s.errorByState(); err != nil {
764771
return nil, err
765772
}
@@ -811,12 +818,6 @@ func (s *ContextImpl) SetWorkflowExecution(
811818
ctx context.Context,
812819
request *persistence.SetWorkflowExecutionRequest,
813820
) (*persistence.SetWorkflowExecutionResponse, error) {
814-
ctx, cancel, err := s.newDetachedContext(ctx)
815-
if err != nil {
816-
return nil, err
817-
}
818-
defer cancel()
819-
820821
// do not try to get namespace cache within shard lock
821822
namespaceID := namespace.ID(request.SetWorkflowSnapshot.ExecutionInfo.NamespaceId)
822823
workflowID := request.SetWorkflowSnapshot.ExecutionInfo.WorkflowId
@@ -828,6 +829,13 @@ func (s *ContextImpl) SetWorkflowExecution(
828829
s.wLock()
829830
defer s.wUnlock()
830831

832+
// timeout check should be done within the shard lock, in case of shard lock contention
833+
ctx, cancel, err := s.newDetachedContext(ctx)
834+
if err != nil {
835+
return nil, err
836+
}
837+
defer cancel()
838+
831839
if err := s.errorByState(); err != nil {
832840
return nil, err
833841
}
@@ -966,12 +974,6 @@ func (s *ContextImpl) DeleteWorkflowExecution(
966974
// The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually.
967975
// Stage 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state).
968976

969-
ctx, cancel, err := s.newDetachedContext(ctx)
970-
if err != nil {
971-
return err
972-
}
973-
defer cancel()
974-
975977
engine, err := s.GetEngine(ctx)
976978
if err != nil {
977979
return err
@@ -1008,6 +1010,12 @@ func (s *ContextImpl) DeleteWorkflowExecution(
10081010
s.wLock()
10091011
defer s.wUnlock()
10101012

1013+
ctx, cancel, err := s.newDetachedContext(ctx)
1014+
if err != nil {
1015+
return err
1016+
}
1017+
defer cancel()
1018+
10111019
if err := s.errorByState(); err != nil {
10121020
return err
10131021
}

0 commit comments

Comments (0)