Skip to content

Commit dde5c89

Browse files
wxing1292yux0
authored andcommitted
Fix migration in mem ack aggregation (#4569)
<!-- Describe what has changed in this PR --> **What changed?** * Fix migration in mem ack aggregation, since sender based / receiver based aggregation cannot be unified <!-- Tell your future self why have you made these changes --> **Why?** Bugfix <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** N/A <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** N/A <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** N/A
1 parent 0fcff51 commit dde5c89

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

service/history/shard/context_impl.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,8 @@ func (s *ContextImpl) UpdateRemoteClusterInfo(
395395
remoteClusterInfo.AckedReplicationTimestamps[s.shardID] = ackTimestamp
396396
}
397397

398+
// UpdateRemoteReaderInfo do not use streaming replication until remoteClusterInfo is updated to allow both
399+
// streaming & pull based replication
398400
func (s *ContextImpl) UpdateRemoteReaderInfo(
399401
readerID int64,
400402
ackTaskID int64,
@@ -1714,22 +1716,10 @@ func (s *ContextImpl) GetReplicationStatus(clusterNames []string) (map[string]*h
17141716
continue
17151717
}
17161718

1717-
var taskID *int64
1718-
var ackTime *time.Time
1719-
for _, remoteShardID := range common.MapShardID(
1720-
clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount,
1721-
clusterInfo[clusterName].ShardCount,
1722-
s.shardID,
1723-
) {
1724-
if taskID == nil || v.AckedReplicationTaskIDs[remoteShardID] < *taskID {
1725-
taskID = convert.Int64Ptr(v.AckedReplicationTaskIDs[remoteShardID])
1726-
ackTime = timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID])
1727-
}
1728-
}
1729-
1719+
remoteShardID := s.shardID
17301720
remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
1731-
AckedTaskId: *taskID,
1732-
AckedTaskVisibilityTime: ackTime,
1721+
AckedTaskId: v.AckedReplicationTaskIDs[remoteShardID],
1722+
AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID]),
17331723
}
17341724
}
17351725

0 commit comments

Comments
 (0)