
Commit 5614950

wxing1292 authored and dnr committed

properly fix migration in mem ack aggregation (#4571)

* Fix migration of the in-memory ack aggregation, since sender-based / receiver-based aggregation cannot be unified

1 parent 1ca8443 commit 5614950
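
For context (not part of the commit): a history shard tracks, per remote cluster, how far replication has been acknowledged. When the two clusters have different shard counts, one local shard corresponds to several remote shards, so the per-cluster status reported by GetReplicationStatus must aggregate per-shard acks, and the conservative choice is the minimum acked task ID, which is what the first diff below implements. A minimal sketch of that aggregation rule, with hypothetical ackState/aggregateAcks names standing in for the real bookkeeping:

package main

import (
	"fmt"
	"time"
)

// ackState is a hypothetical stand-in for the per-remote-shard ack
// bookkeeping kept by the shard context.
type ackState struct {
	taskID    int64
	timestamp time.Time
}

// aggregateAcks picks the most conservative (minimum) ack across all
// remote shards that map to the local shard, mirroring the receiver-side
// aggregation rule used in GetReplicationStatus in the diff below.
func aggregateAcks(acks []ackState) ackState {
	agg := acks[0]
	for _, a := range acks[1:] {
		if a.taskID < agg.taskID {
			agg = a
		}
	}
	return agg
}

func main() {
	acks := []ackState{
		{taskID: 120, timestamp: time.Unix(0, 120)},
		{taskID: 75, timestamp: time.Unix(0, 75)}, // the lagging shard wins
	}
	fmt.Println(aggregateAcks(acks).taskID) // 75
}

Taking the minimum is what makes this safe: reporting any higher ack level could let the sender discard replication tasks that a lagging mapped shard still needs.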

File tree

2 files changed: +212 -9 lines changed

service/history/shard/context_impl.go

Lines changed: 32 additions & 8 deletions

@@ -383,16 +383,23 @@ func (s *ContextImpl) UpdateReplicationQueueReaderState(
 // UpdateRemoteClusterInfo deprecated
 // Deprecated use UpdateRemoteReaderInfo in the future instead
 func (s *ContextImpl) UpdateRemoteClusterInfo(
-	cluster string,
+	clusterName string,
 	ackTaskID int64,
 	ackTimestamp time.Time,
 ) {
 	s.wLock()
 	defer s.wUnlock()
 
-	remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(cluster)
-	remoteClusterInfo.AckedReplicationTaskIDs[s.shardID] = ackTaskID
-	remoteClusterInfo.AckedReplicationTimestamps[s.shardID] = ackTimestamp
+	clusterInfo := s.clusterMetadata.GetAllClusterInfo()
+	remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(clusterName)
+	for _, remoteShardID := range common.MapShardID(
+		clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount,
+		clusterInfo[clusterName].ShardCount,
+		s.shardID,
+	) {
+		remoteClusterInfo.AckedReplicationTaskIDs[remoteShardID] = ackTaskID
+		remoteClusterInfo.AckedReplicationTimestamps[remoteShardID] = ackTimestamp
+	}
 }
 
 // UpdateRemoteReaderInfo do not use streaming replication until remoteClusterInfo is updated to allow both

@@ -1716,10 +1723,27 @@ func (s *ContextImpl) GetReplicationStatus(clusterNames []string) (map[string]*h
 			continue
 		}
 
-		remoteShardID := s.shardID
-		remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
-			AckedTaskId:             v.AckedReplicationTaskIDs[remoteShardID],
-			AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID]),
+		for _, remoteShardID := range common.MapShardID(
+			clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount,
+			clusterInfo[clusterName].ShardCount,
+			s.shardID,
+		) {
+			ackTaskID := v.AckedReplicationTaskIDs[remoteShardID] // default to 0
+			ackTimestamp := v.AckedReplicationTimestamps[remoteShardID]
+			if ackTimestamp.IsZero() {
+				ackTimestamp = time.Unix(0, 0)
+			}
+			if record, ok := remoteClusters[clusterName]; !ok {
+				remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
+					AckedTaskId:             ackTaskID,
+					AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+				}
+			} else if record.AckedTaskId > ackTaskID {
+				remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
+					AckedTaskId:             ackTaskID,
+					AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+				}
+			}
 		}
 	}
 
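Both hunks lean on common.MapShardID to translate the local shard ID into the remote cluster's shard space. Its implementation is not part of this diff, but the test comments below ("shard 5 of 8 maps to local shard 1 of 4") pin down the arithmetic: workflows are routed to shards by hash modulo the shard count, so when one shard count divides the other, shards correspond exactly when their 0-based IDs are congruent modulo the smaller count. A hypothetical reconstruction (mapShardID here is an illustration, not the library's actual code):

package main

import "fmt"

// mapShardID is a hypothetical reconstruction of the mapping performed by
// common.MapShardID; shard IDs are 1-based and one shard count is assumed
// to divide the other.
func mapShardID(localCount, remoteCount, localShardID int32) []int32 {
	id := localShardID - 1 // convert to 0-based
	if localCount >= remoteCount {
		// Many local shards collapse onto one remote shard.
		return []int32{id%remoteCount + 1}
	}
	// One local shard fans out to remoteCount/localCount remote shards,
	// all congruent to id modulo localCount.
	var out []int32
	for remote := id; remote < remoteCount; remote += localCount {
		out = append(out, remote+1)
	}
	return out
}

func main() {
	fmt.Println(mapShardID(8, 4, 5)) // [1]    local shard 5 of 8 -> remote shard 1 of 4
	fmt.Println(mapShardID(4, 8, 1)) // [1 5]  local shard 1 of 4 -> remote shards 1 and 5 of 8
}

With these semantics, UpdateRemoteClusterInfo fans a single legacy ack out to every mapped remote shard, while GetReplicationStatus folds multiple mapped shards back down to one conservative per-cluster ack.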
service/history/shard/context_test.go

Lines changed: 180 additions & 1 deletion

@@ -28,6 +28,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -36,6 +37,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	"go.temporal.io/api/enums/v1"
 
+	"go.temporal.io/server/api/historyservice/v1"
 	persistencespb "go.temporal.io/server/api/persistence/v1"
 	"go.temporal.io/server/common/backoff"
 	"go.temporal.io/server/common/clock"

@@ -54,6 +56,7 @@ type (
 		*require.Assertions
 
 		controller          *gomock.Controller
+		shardID             int32
 		mockShard           *ContextTest
 		mockClusterMetadata *cluster.MockMetadata
 		mockShardManager    *persistence.MockShardManager

@@ -75,11 +78,12 @@ func (s *contextSuite) SetupTest() {
 
 	s.controller = gomock.NewController(s.T())
 
+	s.shardID = 1
 	s.timeSource = clock.NewEventTimeSource()
 	shardContext := NewTestContextWithTimeSource(
 		s.controller,
 		&persistencespb.ShardInfo{
-			ShardId: 0,
+			ShardId: s.shardID,
 			RangeId: 1,
 		},
 		tests.NewDynamicConfig(),

@@ -499,3 +503,178 @@ func (s *contextSuite) TestHandoverNamespace() {
 	_, ok = handoverNS[namespaceEntry.Name().String()]
 	s.False(ok)
 }
+
+func (s *contextSuite) TestUpdateGetRemoteClusterInfo_Legacy_8_4() {
+	clusterMetadata := cluster.NewMockMetadata(s.controller)
+	clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
+	clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+	clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{
+		cluster.TestCurrentClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestCurrentClusterFrontendAddress,
+			ShardCount:             8,
+		},
+		cluster.TestAlternativeClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestAlternativeClusterFrontendAddress,
+			ShardCount:             4,
+		},
+	}).AnyTimes()
+	s.mockShard.clusterMetadata = clusterMetadata
+
+	ackTaskID := rand.Int63()
+	ackTimestamp := time.Unix(0, rand.Int63())
+	s.mockShard.UpdateRemoteClusterInfo(
+		cluster.TestAlternativeClusterName,
+		ackTaskID,
+		ackTimestamp,
+	)
+	remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName})
+	s.NoError(err)
+	s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{
+		cluster.TestAlternativeClusterName: {
+			AckedTaskId:             ackTaskID,
+			AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+		},
+	}, remoteAckStatus)
+}
+
+func (s *contextSuite) TestUpdateGetRemoteClusterInfo_Legacy_4_8() {
+	clusterMetadata := cluster.NewMockMetadata(s.controller)
+	clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
+	clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+	clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{
+		cluster.TestCurrentClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestCurrentClusterFrontendAddress,
+			ShardCount:             4,
+		},
+		cluster.TestAlternativeClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestAlternativeClusterFrontendAddress,
+			ShardCount:             8,
+		},
+	}).AnyTimes()
+	s.mockShard.clusterMetadata = clusterMetadata
+
+	ackTaskID := rand.Int63()
+	ackTimestamp := time.Unix(0, rand.Int63())
+	s.mockShard.UpdateRemoteClusterInfo(
+		cluster.TestAlternativeClusterName,
+		ackTaskID,
+		ackTimestamp,
+	)
+	remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName})
+	s.NoError(err)
+	s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{
+		cluster.TestAlternativeClusterName: {
+			AckedTaskId:             ackTaskID,
+			AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+		},
+	}, remoteAckStatus)
+}
+
+func (s *contextSuite) TestUpdateGetRemoteReaderInfo_8_4() {
+	clusterMetadata := cluster.NewMockMetadata(s.controller)
+	clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
+	clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+	clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{
+		cluster.TestCurrentClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestCurrentClusterFrontendAddress,
+			ShardCount:             8,
+		},
+		cluster.TestAlternativeClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestAlternativeClusterFrontendAddress,
+			ShardCount:             4,
+		},
+	}).AnyTimes()
+	s.mockShard.clusterMetadata = clusterMetadata
+
+	ackTaskID := rand.Int63()
+	ackTimestamp := time.Unix(0, rand.Int63())
+	err := s.mockShard.UpdateRemoteReaderInfo(
+		ReplicationReaderIDFromClusterShardID(
+			cluster.TestAlternativeClusterInitialFailoverVersion,
+			1,
+		),
+		ackTaskID,
+		ackTimestamp,
+	)
+	s.NoError(err)
+	remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName})
+	s.NoError(err)
+	s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{
+		cluster.TestAlternativeClusterName: {
+			AckedTaskId:             ackTaskID,
+			AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+		},
+	}, remoteAckStatus)
+}
+
+func (s *contextSuite) TestUpdateGetRemoteReaderInfo_4_8() {
+	clusterMetadata := cluster.NewMockMetadata(s.controller)
+	clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes()
+	clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+	clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{
+		cluster.TestCurrentClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestCurrentClusterFrontendAddress,
+			ShardCount:             4,
+		},
+		cluster.TestAlternativeClusterName: {
+			Enabled:                true,
+			InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion,
+			RPCAddress:             cluster.TestAlternativeClusterFrontendAddress,
+			ShardCount:             8,
+		},
+	}).AnyTimes()
+	s.mockShard.clusterMetadata = clusterMetadata
+
+	ack1TaskID := rand.Int63()
+	ack1Timestamp := time.Unix(0, rand.Int63())
+	err := s.mockShard.UpdateRemoteReaderInfo(
+		ReplicationReaderIDFromClusterShardID(
+			cluster.TestAlternativeClusterInitialFailoverVersion,
+			1, // maps to local shard 1
+		),
+		ack1TaskID,
+		ack1Timestamp,
+	)
+	s.NoError(err)
+	ack5TaskID := rand.Int63()
+	ack5Timestamp := time.Unix(0, rand.Int63())
+	err = s.mockShard.UpdateRemoteReaderInfo(
+		ReplicationReaderIDFromClusterShardID(
+			cluster.TestAlternativeClusterInitialFailoverVersion,
+			5, // maps to local shard 1
+		),
+		ack5TaskID,
+		ack5Timestamp,
+	)
+	s.NoError(err)
+
+	ackTaskID := ack1TaskID
+	ackTimestamp := ack1Timestamp
+	if ackTaskID > ack5TaskID {
+		ackTaskID = ack5TaskID
+		ackTimestamp = ack5Timestamp
+	}
+
+	remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName})
+	s.NoError(err)
+	s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{
+		cluster.TestAlternativeClusterName: {
+			AckedTaskId:             ackTaskID,
+			AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp),
+		},
+	}, remoteAckStatus)
+}
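
The reader-based tests build reader IDs with ReplicationReaderIDFromClusterShardID from a remote cluster's initial failover version and a remote shard ID. That helper is defined elsewhere in this package and its exact encoding is not shown in this diff; the sketch below is one plausible scheme (cluster ID in the high 32 bits, shard ID in the low 32), offered purely to make the tests' intent concrete, not as the real implementation:

package main

import "fmt"

// packReaderID is a hypothetical encoding of (clusterID, shardID) into a
// single int64 reader ID; the real ReplicationReaderIDFromClusterShardID
// may use a different scheme.
func packReaderID(clusterID int64, shardID int32) int64 {
	return clusterID<<32 | int64(shardID)
}

// unpackReaderID reverses packReaderID.
func unpackReaderID(readerID int64) (clusterID int64, shardID int32) {
	return readerID >> 32, int32(readerID & 0xffffffff)
}

func main() {
	id := packReaderID(2, 5) // e.g. a remote cluster's failover version, remote shard 5
	c, sh := unpackReaderID(id)
	fmt.Println(c, sh) // 2 5
}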
