Skip to content

Commit 46a0045

Browse files
authored
Fix checkpoint handling to prevent segment replication infinite loop (#18636)
* Fix checkpoint handling to prevent segment replication infinite loop Signed-off-by: Ashish Singh <[email protected]> * Modify existing test to verify code change behaviour Signed-off-by: Ashish Singh <[email protected]> --------- Signed-off-by: Ashish Singh <[email protected]>
1 parent 36c2d59 commit 46a0045

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import java.io.IOException;
5151
import java.util.List;
52+
import java.util.Objects;
5253
import java.util.Optional;
5354
import java.util.concurrent.CountDownLatch;
5455
import java.util.concurrent.atomic.AtomicLong;
@@ -347,7 +348,10 @@ public void onReplicationDone(SegmentReplicationState state) {
347348

348349
// if we received a checkpoint during the copy event that is ahead of this
349350
// try and process it.
350-
processLatestReceivedCheckpoint(replicaShard, thread);
351+
ReplicationCheckpoint latestReceivedCheckpoint = replicator.getPrimaryCheckpoint(replicaShard.shardId());
352+
if (Objects.nonNull(latestReceivedCheckpoint) && latestReceivedCheckpoint.isAheadOf(receivedCheckpoint)) {
353+
processLatestReceivedCheckpoint(replicaShard, thread);
354+
}
351355
}
352356

353357
@Override

server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ public void testStartReplicationListenerSuccess() throws InterruptedException {
625625

626626
latch.await(2, TimeUnit.SECONDS);
627627
verify(spy, (atLeastOnce())).updateVisibleCheckpoint(eq(0L), eq(replicaShard));
628+
verify(spy, times(1)).processLatestReceivedCheckpoint(any(), any());
628629
}
629630

630631
public void testStartReplicationListenerFailure() throws InterruptedException {
@@ -851,4 +852,5 @@ public void testProcessCheckpointOnClusterStateUpdate() {
851852
spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
852853
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
853854
}
855+
854856
}

0 commit comments

Comments
 (0)