diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a847088869b..4ebce1c0b4b 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -473,7 +473,7 @@ public class RecoverySourceHandler { /* * finalizes the recovery process */ - public void finalizeRecovery(final long targetLocalCheckpoint) { + public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } @@ -488,7 +488,7 @@ public class RecoverySourceHandler { */ runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)); final long globalCheckpoint = shard.getGlobalCheckpoint(); - cancellableThreads.execute(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); + cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint)); if (request.isPrimaryRelocation()) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 973c09635a3..41b3a829881 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -367,9 +367,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public void finalizeRecovery(final long globalCheckpoint) { - indexShard().updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); + public void finalizeRecovery(final long globalCheckpoint) throws IOException { final IndexShard indexShard = indexShard(); + indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); + // Persist the global checkpoint. + indexShard.sync(); indexShard.finalizeRecovery(); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 804c4f50484..e7403986dc2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -43,7 +43,7 @@ public interface RecoveryTargetHandler { * * @param globalCheckpoint the global checkpoint on the recovery source */ - void finalizeRecovery(long globalCheckpoint); + void finalizeRecovery(long globalCheckpoint) throws IOException; /** * Blockingly waits for cluster state with at least clusterStateVersion to be available diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 8f18ccd8c9a..2bf7de6b94a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -580,7 +580,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC } @Override - public void finalizeRecovery(long globalCheckpoint) { + public void finalizeRecovery(long globalCheckpoint) throws IOException { if (hasBlocked() == false) { // it maybe that not ops have been transferred, block now blockIfNeeded(RecoveryState.Stage.TRANSLOG); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 4618d84d4ad..e316de748db 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2184,7 +2184,7 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void finalizeRecovery(long globalCheckpoint) { + public void finalizeRecovery(long globalCheckpoint) throws IOException { super.finalizeRecovery(globalCheckpoint); assertListenerCalled.accept(replica); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 16852502e8c..f691cfd0238 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -75,7 +75,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { final Translog translog = replica.getTranslog(); final String translogUUID = translog.getTranslogUUID(); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1)); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 3a5a4a3807a..a89237160aa 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -229,4 +229,17 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { shards.assertAllEqual(numDocs); } } + + public void testPeerRecoveryPersistGlobalCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startPrimary(); + final long numDocs = shards.indexDocs(between(1, 100)); + if (randomBoolean()) { + shards.flush(); + } + final IndexShard replica = shards.addReplica(); + shards.recoverReplica(replica); + assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); + } + } }