From 7dcd81b41b668488c968004927832f13e3b52ad2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 5 Jul 2017 09:17:16 -0400 Subject: [PATCH] Throw back replica local checkpoint on new primary This commit causes a replica to throwback its local checkpoint to the global checkpoint when learning of a new primary through a replica operation. Relates #25452 --- .../index/seqno/LocalCheckpointTracker.java | 13 ++ .../index/seqno/SequenceNumbersService.java | 9 ++ .../elasticsearch/index/shard/IndexShard.java | 13 ++ .../seqno/LocalCheckpointTrackerTests.java | 20 +++ .../index/shard/IndexShardTests.java | 131 +++++++++++++++--- 5 files changed, 166 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index a1ad5dfa6ff..9af9f00b1d1 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -121,6 +121,19 @@ public class LocalCheckpointTracker { } } + /** + * Resets the checkpoint to the specified value. + * + * @param checkpoint the local checkpoint to reset this tracker to + */ + synchronized void resetCheckpoint(final long checkpoint) { + assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO; + assert checkpoint <= this.checkpoint; + processedSeqNo.clear(); + firstProcessedSeqNo = checkpoint + 1; + this.checkpoint = checkpoint; + } + /** * The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}. * diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 6d8b87599a1..05f5fea2dc2 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -106,6 +106,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent { localCheckpointTracker.markSeqNoAsCompleted(seqNo); } + /** + * Resets the local checkpoint to the specified value. + * + * @param localCheckpoint the local checkpoint to reset to + */ + public void resetLocalCheckpoint(final long localCheckpoint) { + localCheckpointTracker.resetCheckpoint(localCheckpoint); + } + /** * The current sequence number stats. * diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db0f27a28ca..021f37d4571 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2058,6 +2058,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl "shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]"; primaryTerm = operationPrimaryTerm; updateGlobalCheckpointOnReplica(globalCheckpoint); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long localCheckpoint; + if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + localCheckpoint = currentGlobalCheckpoint; + } + logger.trace( + "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]", + operationPrimaryTerm, + getLocalCheckpoint(), + localCheckpoint); + getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint); getEngine().getTranslog().rollGeneration(); }); globalCheckpointUpdated = true; diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 3d280b4d28c..e2978ffc51d 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -236,4 +237,23 @@ public class LocalCheckpointTrackerTests extends ESTestCase { thread.join(); } + + public void testResetCheckpoint() { + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); + for (int i = 0; i < operations; i++) { + if (!rarely()) { + tracker.markSeqNoAsCompleted(i); + maxSeqNo = i; + } + } + + final int localCheckpoint = + randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint())); + tracker.resetCheckpoint(localCheckpoint); + assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); + assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); + assertThat(tracker.processedSeqNo, empty()); + assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9093274a491..a838681c901 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; @@ -142,7 +143,6 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -405,26 +405,10 @@ public class IndexShardTests extends IndexShardTestCase { // most of the time this is large enough that most of the time there will be at least one gap final int operations = 1024 - scaledRandomIntBetween(0, 1024); - int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); - boolean gap = false; - for (int i = 0; i < operations; i++) { - if (!rarely()) { - final String id = Integer.toString(i); - SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, - new BytesArray("{}"), XContentType.JSON); - indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), - 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, - getMappingUpdater(indexShard, sourceToParse.type())); - max = i; - } else { - gap = true; - } - } + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); - final int maxSeqNo = max; - if (gap) { - assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); - } + final int maxSeqNo = result.maxSeqNo; + final boolean gap = result.gap; // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); @@ -626,6 +610,12 @@ public class IndexShardTests extends IndexShardTestCase { } newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint); } + final long expectedLocalCheckpoint; + if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; + } else { + expectedLocalCheckpoint = newGlobalCheckPoint; + } // but you can not increment with a new primary term until the operations on the older primary term complete final Thread thread = new Thread(() -> { try { @@ -637,6 +627,7 @@ public class IndexShardTests extends IndexShardTestCase { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); releasable.close(); @@ -697,6 +688,7 @@ public class IndexShardTests extends IndexShardTestCase { assertTrue(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1)); + assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); } } @@ -707,6 +699,56 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } + public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + // most of the time this is large enough that most of the time there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED)); + + final long globalCheckpointOnReplica = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), + Math.toIntExact(indexShard.getLocalCheckpoint())); + indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica); + + final int globalCheckpoint = + randomIntBetween( + Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO), + Math.toIntExact(indexShard.getLocalCheckpoint())); + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquireReplicaOperationPermit( + indexShard.primaryTerm + 1, + globalCheckpoint, + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + + } + }, + ThreadPool.Names.SAME); + + latch.await(); + if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO + && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); + } else { + assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica))); + } + + // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances + final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint())); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint)); + + closeShards(indexShard); + } + public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { final IndexShard indexShard = newStartedShard(false); @@ -1966,6 +2008,55 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(newShard); } + class Result { + private final int localCheckpoint; + private final int maxSeqNo; + private final boolean gap; + + Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) { + this.localCheckpoint = localCheckpoint; + this.maxSeqNo = maxSeqNo; + this.gap = gap; + } + } + + /** + * Index on the specified shard while introducing sequence number gaps. + * + * @param indexShard the shard + * @param operations the number of operations + * @param offset the starting sequence number + * @return a pair of the maximum sequence number and whether or not a gap was introduced + * @throws IOException if an I/O exception occurs while indexing on the shard + */ + private Result indexOnReplicaWithGaps( + final IndexShard indexShard, + final int operations, + final int offset) throws IOException { + int localCheckpoint = offset; + int max = offset; + boolean gap = false; + for (int i = offset + 1; i < operations; i++) { + if (!rarely()) { + final String id = Integer.toString(i); + SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id, + new BytesArray("{}"), XContentType.JSON); + indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(), + 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, + getMappingUpdater(indexShard, sourceToParse.type())); + if (!gap && i == localCheckpoint + 1) { + localCheckpoint++; + } + max = i; + } else { + gap = true; + } + } + assert localCheckpoint == indexShard.getLocalCheckpoint(); + assert !gap || (localCheckpoint != max); + return new Result(localCheckpoint, max, gap); + } + /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { private final String indexName;