Persist global checkpoint when finalizing peer recovery (#27947)

Today we don't persist the global checkpoint when finishing a peer 
recovery even though we advance an in memory value. This commit persists
the global checkpoint in RecoveryTarget#finalizeRecovery.

Closes #27861
This commit is contained in:
Nhat Nguyen 2017-12-21 16:51:30 -05:00 committed by GitHub
parent e5f0852d5f
commit c831442352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 23 additions and 8 deletions

View File

@ -473,7 +473,7 @@ public class RecoverySourceHandler {
/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) {
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -488,7 +488,7 @@ public class RecoverySourceHandler {
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.execute(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint));
if (request.isPrimaryRelocation()) {

View File

@ -367,9 +367,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
@Override
public void finalizeRecovery(final long globalCheckpoint) {
indexShard().updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
}

View File

@ -43,7 +43,7 @@ public interface RecoveryTargetHandler {
*
* @param globalCheckpoint the global checkpoint on the recovery source
*/
void finalizeRecovery(long globalCheckpoint);
void finalizeRecovery(long globalCheckpoint) throws IOException;
/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available

View File

@ -580,7 +580,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
@Override
public void finalizeRecovery(long globalCheckpoint) {
public void finalizeRecovery(long globalCheckpoint) throws IOException {
if (hasBlocked() == false) {
// it maybe that not ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);

View File

@ -2184,7 +2184,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void finalizeRecovery(long globalCheckpoint) {
public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
}

View File

@ -75,7 +75,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
final Translog translog = replica.getTranslog();
final String translogUUID = translog.getTranslogUUID();
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1));

View File

@ -229,4 +229,17 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.assertAllEqual(numDocs);
}
}
public void testPeerRecoveryPersistGlobalCheckpoint() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
final long numDocs = shards.indexDocs(between(1, 100));
if (randomBoolean()) {
shards.flush();
}
final IndexShard replica = shards.addReplica();
shards.recoverReplica(replica);
assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1));
}
}
}