Skip to content

Commit df2d8eb

Browse files
committed
Fix namespace handover replication queue notification (#4082)
1 parent a8b591d commit df2d8eb

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed

service/history/shard/context_impl.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,14 @@ func (s *ContextImpl) UpdateHandoverNamespace(ns *namespace.Namespace, deletedFr
564564
}
565565

566566
s.wUnlock()
567-
s.notifyReplicationQueueProcessor(maxReplicationTaskID)
567+
568+
if maxReplicationTaskID != pendingMaxReplicationTaskID {
569+
// notification is for making sure replication queue is able to
570+
// ack to the recorded taskID. If the taskID is pending, then
571+
// don't notify. Otherwise, replication queue will think (for a period of time)
572+
// that the max generated taskID is pendingMaxReplicationTaskID which is MaxInt64.
573+
s.notifyReplicationQueueProcessor(maxReplicationTaskID)
574+
}
568575
}
569576

570577
func (s *ContextImpl) AddTasks(

service/history/shard/context_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/golang/mock/gomock"
3636
"github.com/stretchr/testify/require"
3737
"github.com/stretchr/testify/suite"
38+
"go.temporal.io/api/enums/v1"
3839

3940
persistencespb "go.temporal.io/server/api/persistence/v1"
4041
"go.temporal.io/server/common/backoff"
@@ -393,3 +394,55 @@ func (s *contextSuite) TestAcquireShardNoError() {
393394

394395
s.Assert().Equal(contextStateAcquired, s.mockShard.state)
395396
}
397+
398+
func (s *contextSuite) TestHandoverNamespace() {
399+
s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any()).Times(1)
400+
401+
namespaceEntry := namespace.NewGlobalNamespaceForTest(
402+
&persistencespb.NamespaceInfo{Id: tests.NamespaceID.String(), Name: tests.Namespace.String()},
403+
&persistencespb.NamespaceConfig{
404+
Retention: timestamp.DurationFromDays(1),
405+
},
406+
&persistencespb.NamespaceReplicationConfig{
407+
ActiveClusterName: cluster.TestCurrentClusterName,
408+
Clusters: []string{
409+
cluster.TestCurrentClusterName,
410+
cluster.TestAlternativeClusterName,
411+
},
412+
State: enums.REPLICATION_STATE_HANDOVER,
413+
},
414+
tests.Version,
415+
)
416+
s.mockShard.UpdateHandoverNamespace(namespaceEntry, false)
417+
_, handoverNS, err := s.mockShard.GetReplicationStatus([]string{})
418+
s.NoError(err)
419+
420+
handoverInfo, ok := handoverNS[namespaceEntry.Name().String()]
421+
s.True(ok)
422+
s.Equal(s.mockShard.immediateTaskExclusiveMaxReadLevel-1, handoverInfo.HandoverReplicationTaskId)
423+
424+
// make shard status invalid
425+
// ideally we should use s.mockShard.transition() method
426+
// but that will cause shard trying to re-acquire the shard in the background
427+
s.mockShard.stateLock.Lock()
428+
s.mockShard.state = contextStateAcquiring
429+
s.mockShard.stateLock.Unlock()
430+
431+
// note: no mock for NotifyNewTasks
432+
433+
s.mockShard.UpdateHandoverNamespace(namespaceEntry, false)
434+
_, handoverNS, err = s.mockShard.GetReplicationStatus([]string{})
435+
s.NoError(err)
436+
437+
handoverInfo, ok = handoverNS[namespaceEntry.Name().String()]
438+
s.True(ok)
439+
s.Equal(s.mockShard.immediateTaskExclusiveMaxReadLevel-1, handoverInfo.HandoverReplicationTaskId)
440+
441+
// delete namespace
442+
s.mockShard.UpdateHandoverNamespace(namespaceEntry, true)
443+
_, handoverNS, err = s.mockShard.GetReplicationStatus([]string{})
444+
s.NoError(err)
445+
446+
_, ok = handoverNS[namespaceEntry.Name().String()]
447+
s.False(ok)
448+
}

0 commit comments

Comments
 (0)