diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6a790261252..13abf553785 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -369,7 +369,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, - latch::await, + () -> { + latch.await(); + getEngine().fillSeqNoGaps(newPrimaryTerm); + }, e -> failShard("exception during primary term transition", e)); primaryTerm = newPrimaryTerm; latch.countDown(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e7aa3c61b4e..38cac70b5e3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -131,7 +131,9 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.VersionType.EXTERNAL; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; @@ -141,6 +143,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -388,6 +391,70 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } + public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { + final IndexShard indexShard = newStartedShard(false); + + // most of the time this is large enough that most of the time there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); + boolean gap = false; + for (int i = 0; i < operations; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); + if (!rarely()) { + final Term uid = new Term("_id", doc.id()); + final Engine.Index index = + new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false); + indexShard.index(index); + max = i; + } else { + gap = true; + } + } + + final int maxSeqNo = max; + if (gap) { + assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); + } + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + /* + * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the + * gaps are filled. + */ + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC); + + latch.await(); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1172,7 +1239,7 @@ public class IndexShardTests extends IndexShardTestCase { test = otherShard.prepareIndexOnReplica( SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), XContentType.JSON), - 1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + 1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); otherShard.index(test); final ShardRouting primaryShardRouting = shard.routingEntry();