Check docs limit before indexing on primary (#63273)
Today indexing to a shard with 2147483519 documents will fail that shard. We should check the number of documents and reject the write requests instead. Closes #51136
This commit is contained in:
@ -0,0 +1,189 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexWriterMaxDocsChanger;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class MaxDocsLimitIT extends ESIntegTestCase {
private static final AtomicInteger maxDocs = new AtomicInteger();
public static class TestEnginePlugin extends Plugin implements EnginePlugin {
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> {
assert maxDocs.get() > 0 : "maxDocs is unset";
return EngineTestCase.createEngine(config, maxDocs.get());
protected boolean addMockInternalEngine() {
return false;
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
return plugins;
public void setMaxDocs() {
maxDocs.set(randomIntBetween(10, 100)); // Do not set this too low as we can fail to write the cluster state
public void restoreMaxDocs() {
public void testMaxDocsLimit() throws Exception {
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)));
IndexingResult indexingResult = indexDocs(maxDocs.get(), 1);
assertThat(indexingResult.numSuccess, equalTo(maxDocs.get()));
assertThat(indexingResult.numFailures, equalTo(0));
int rejectedRequests = between(1, 10);
indexingResult = indexDocs(rejectedRequests, between(1, 8));
assertThat(indexingResult.numFailures, equalTo(rejectedRequests));
assertThat(indexingResult.numSuccess, equalTo(0));
final IllegalArgumentException deleteError = expectThrows(IllegalArgumentException.class,
() -> client().prepareDelete("test", "_doc", "any-id").get());
assertThat(deleteError.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]"));
SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder())
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get()));
if (randomBoolean()) {
searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder())
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get()));
public void testMaxDocsLimitConcurrently() throws Exception {
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
IndexingResult indexingResult = indexDocs(between(maxDocs.get() + 1, maxDocs.get() * 2), between(2, 8));
assertThat(indexingResult.numFailures, greaterThan(0));
assertThat(indexingResult.numSuccess, both(greaterThan(0)).and(lessThanOrEqualTo(maxDocs.get())));
SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder())
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) indexingResult.numSuccess));
int totalSuccess = indexingResult.numSuccess;
while (totalSuccess < maxDocs.get()) {
indexingResult = indexDocs(between(1, 10), between(1, 8));
assertThat(indexingResult.numSuccess, greaterThan(0));
totalSuccess += indexingResult.numSuccess;
if (randomBoolean()) {
indexingResult = indexDocs(between(1, 10), between(1, 8));
assertThat(indexingResult.numSuccess, equalTo(0));
searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder())
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) totalSuccess));
static final class IndexingResult {
final int numSuccess;
final int numFailures;
IndexingResult(int numSuccess, int numFailures) {
this.numSuccess = numSuccess;
this.numFailures = numFailures;
static IndexingResult indexDocs(int numRequests, int numThreads) throws Exception {
final AtomicInteger completedRequests = new AtomicInteger();
final AtomicInteger numSuccess = new AtomicInteger();
final AtomicInteger numFailure = new AtomicInteger();
Thread[] indexers = new Thread[numThreads];
Phaser phaser = new Phaser(indexers.length);
for (int i = 0; i < indexers.length; i++) {
indexers[i] = new Thread(() -> {
while (completedRequests.incrementAndGet() <= numRequests) {
try {
final IndexResponse resp = client().prepareIndex("test", "_doc").setSource("{}", XContentType.JSON).get();
assertThat(resp.status(), equalTo(RestStatus.CREATED));
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]"));
for (Thread indexer : indexers) {
return new IndexingResult(numSuccess.get(), numFailure.get());
@ -180,6 +180,18 @@ public class InternalEngine extends Engine {
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
* If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted
* {@link IndexWriter#getPendingNumDocs()} yet, then IndexWriter can fail with too many documents. In this case, we have to fail
* the engine because we already generated sequence numbers for write operations; otherwise we will have gaps in sequence numbers.
* To avoid this, we keep track the number of documents that are being added to IndexWriter, and account it in
* {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)}. Although we can double count some inFlight documents in IW and Engine,
* this shouldn't be an issue because it happens for a short window and we adjust the inFlightDocCount once an indexing is completed.
private final AtomicLong inFlightDocCount = new AtomicLong();
private final int maxDocs;
private final String historyUUID;
@ -190,13 +202,12 @@ public class InternalEngine extends Engine {
private volatile String forceMergeUUID;
public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, LocalCheckpointTracker::new);
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
this.maxDocs = maxDocs;
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
@ -879,6 +890,7 @@ public class InternalEngine extends Engine {
try (ReleasableLock releasableLock = readLock.acquire()) {
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
int reservedDocs = 0;
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) {
lastWriteNanos = index.startTime();
@ -909,9 +921,11 @@ public class InternalEngine extends Engine {
* or calls updateDocument.
final IndexingStrategy plan = indexingStrategyForOperation(index);
reservedDocs = plan.reservedDocs;
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
assert index.origin() == Operation.Origin.PRIMARY : index.origin();
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else {
@ -966,6 +980,8 @@ public class InternalEngine extends Engine {
indexResult.setTook(System.nanoTime() - index.startTime());
return indexResult;
} finally {
} catch (RuntimeException | IOException e) {
try {
@ -1004,14 +1020,14 @@ public class InternalEngine extends Engine {
} else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) {
// see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
plan = IndexingStrategy.optimizedAppendOnly(index.version());
plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0);
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version());
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
return plan;
@ -1028,11 +1044,17 @@ public class InternalEngine extends Engine {
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final int reservingDocs = index.parsedDoc().docs().size();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) {
plan = IndexingStrategy.optimizedAppendOnly(1L);
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else {
plan = IndexingStrategy.optimizedAppendOnly(1L, reservingDocs);
} else {
// resolves incoming version
@ -1064,9 +1086,14 @@ public class InternalEngine extends Engine {
new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version())
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()),
return plan;
@ -1178,53 +1205,55 @@ public class InternalEngine extends Engine {
final long versionForIndexing;
final boolean indexIntoLucene;
final boolean addStaleOpToLucene;
final int reservedDocs;
final Optional<IndexResult> earlyResultOnPreFlightError;
private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument,
boolean indexIntoLucene, boolean addStaleOpToLucene,
long versionForIndexing, IndexResult earlyResultOnPreFlightError) {
long versionForIndexing, int reservedDocs, IndexResult earlyResultOnPreFlightError) {
assert useLuceneUpdateDocument == false || indexIntoLucene :
"use lucene update is set to true, but we're not indexing into lucene";
assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false :
"can only index into lucene or have a preflight result but not both." +
"indexIntoLucene: " + indexIntoLucene
+ " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError;
assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs;
this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
this.useLuceneUpdateDocument = useLuceneUpdateDocument;
this.versionForIndexing = versionForIndexing;
this.indexIntoLucene = indexIntoLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.reservedDocs = reservedDocs;
this.earlyResultOnPreFlightError =
earlyResultOnPreFlightError == null ? Optional.empty() :
static IndexingStrategy optimizedAppendOnly(long versionForIndexing) {
return new IndexingStrategy(true, false, true, false, versionForIndexing, null);
static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) {
return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null);
public static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) {
final IndexResult result = new IndexResult(e, currentVersion);
return new IndexingStrategy(
currentNotFoundOrDeleted, false, false, false,
Versions.NOT_FOUND, result);
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result);
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted,
long versionForIndexing) {
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) {
return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false,
true, false, versionForIndexing, null);
true, false, versionForIndexing, reservedDocs, null);
public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false,
false, versionForIndexing, null);
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null);
static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) {
return new IndexingStrategy(false, false, false,
addStaleOpToLucene, versionForIndexing, null);
return new IndexingStrategy(false, false, false, addStaleOpToLucene, versionForIndexing, 0, null);
static IndexingStrategy failAsTooManyDocs(Exception e) {
final IndexResult result = new IndexResult(e, Versions.NOT_FOUND);
return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result);
@ -1275,13 +1304,15 @@ public class InternalEngine extends Engine {
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
int reservedDocs = 0;
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) {
lastWriteNanos = delete.startTime();
final DeletionStrategy plan = deletionStrategyForOperation(delete);
reservedDocs = plan.reservedDocs;
if (plan.earlyResultOnPreflightError.isPresent()) {
assert delete.origin() == Operation.Origin.PRIMARY : delete.origin();
deleteResult = plan.earlyResultOnPreflightError.get();
} else {
// generate or register sequence number
@ -1323,11 +1354,36 @@ public class InternalEngine extends Engine {
throw e;
} finally {
return deleteResult;
private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) {
assert operation.origin() == Operation.Origin.PRIMARY : operation;
assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation;
assert addingDocs > 0 : addingDocs;
final long totalDocs = indexWriter.getPendingNumDocs() + inFlightDocCount.addAndGet(addingDocs);
if (totalDocs > maxDocs) {
return new IllegalArgumentException("Number of documents in the index can't exceed [" + maxDocs + "]");
} else {
return null;
private void releaseInFlightDocs(int numDocs) {
assert numDocs >= 0 : numDocs;
final long newValue = inFlightDocCount.addAndGet(-numDocs);
assert newValue >= 0 : "inFlightDocCount must not be negative [" + newValue + "]";
long getInFlightDocCount() {
return inFlightDocCount.get();
protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
if (delete.origin() == Operation.Origin.PRIMARY) {
return planDeletionAsPrimary(delete);
@ -1354,7 +1410,7 @@ public class InternalEngine extends Engine {
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version());
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0);
return plan;
@ -1394,7 +1450,13 @@ public class InternalEngine extends Engine {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version()));
final Exception reserveError = tryAcquireInFlightDocs(delete, 1);
if (reserveError != null) {
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
} else {
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
return plan;
@ -1454,9 +1516,10 @@ public class InternalEngine extends Engine {
final boolean currentlyDeleted;
final long versionOfDeletion;
final Optional<DeleteResult> earlyResultOnPreflightError;
final int reservedDocs;
private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted,
long versionOfDeletion, DeleteResult earlyResultOnPreflightError) {
long versionOfDeletion, int reservedDocs, DeleteResult earlyResultOnPreflightError) {
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
"can only delete from lucene or have a preflight result but not both." +
"deleteFromLucene: " + deleteFromLucene
@ -1465,6 +1528,8 @@ public class InternalEngine extends Engine {
this.addStaleOpToLucene = addStaleOpToLucene;
this.currentlyDeleted = currentlyDeleted;
this.versionOfDeletion = versionOfDeletion;
this.reservedDocs = reservedDocs;
assert reservedDocs == 0 || deleteFromLucene || addStaleOpToLucene : reservedDocs;
this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ?
Optional.empty() : Optional.of(earlyResultOnPreflightError);
@ -1473,20 +1538,26 @@ public class InternalEngine extends Engine {
VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false);
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult);
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult);
static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion) {
return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, null);
static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) {
return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null);
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) {
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null);
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null);
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, null);
return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, 0, null);
static DeletionStrategy failAsTooManyDocs(Exception e) {
final DeleteResult deleteResult = new DeleteResult(e, Versions.NOT_FOUND,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_SEQ_NO, false);
return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult);
@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterMaxDocsChanger;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
@ -4386,7 +4387,7 @@ public class InternalEngineTests extends EngineTestCase {
EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD,
() -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
noOpEngine = new InternalEngine(noopEngineConfig, IndexWriter.MAX_DOCS, supplier) {
protected long doGenerateSeqNoForOperation(Operation operation) {
throw new UnsupportedOperationException();
@ -6350,4 +6351,67 @@ public class InternalEngineTests extends EngineTestCase {
public void testMaxDocsOnPrimary() throws Exception {
final boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled();
int maxDocs = randomIntBetween(1, 100);
try {
engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new);
int numDocs = between(maxDocs + 1, maxDocs * 2);
List<Engine.Operation> operations = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
final String id;
if (softDeleteEnabled == false || randomBoolean()) {
id = Integer.toString(randomInt(numDocs));
operations.add(indexForDoc(createParsedDoc(id, null)));
} else {
id = "not_found";
operations.add(new Engine.Delete("_doc", id, newUid(id), primaryTerm.get()));
for (int i = 0; i < numDocs; i++) {
final long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
final Engine.Result result = applyOperation(engine, operations.get(i));
if (i < maxDocs) {
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo + 1L));
} else {
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
containsString("Number of documents in the index can't exceed [" + maxDocs + "]"));
assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
} finally {
public void testMaxDocsOnReplica() throws Exception {
assumeTrue("Deletes do not add documents to Lucene with soft-deletes disabled",
int maxDocs = randomIntBetween(1, 100);
try {
engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new);
int numDocs = between(maxDocs + 1, maxDocs * 2);
List<Engine.Operation> operations = generateHistoryOnReplica(numDocs, randomBoolean(), randomBoolean(), randomBoolean());
final IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> {
for (Engine.Operation op : operations) {
applyOperation(engine, op);
assertThat(error.getMessage(), containsString("number of documents in the index cannot exceed " + maxDocs));
} finally {
@ -270,12 +270,14 @@ public abstract class EngineTestCase extends ESTestCase {
if (engine != null && engine.isClosed.get() == false) {
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
@ -286,7 +288,6 @@ public abstract class EngineTestCase extends ESTestCase {
protected static ParseContext.Document testDocumentWithTextField() {
return testDocumentWithTextField("test");
@ -531,6 +532,10 @@ public abstract class EngineTestCase extends ESTestCase {
return internalEngine;
public static InternalEngine createEngine(EngineConfig engineConfig, int maxDocs) {
return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new);
public interface IndexWriterFactory {
@ -568,7 +573,7 @@ public abstract class EngineTestCase extends ESTestCase {
} else {
return new InternalTestEngine(config, localCheckpointTrackerSupplier) {
return new InternalTestEngine(config, IndexWriter.MAX_DOCS, localCheckpointTrackerSupplier) {
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
@ -1238,4 +1243,16 @@ public abstract class EngineTestCase extends ESTestCase {
public static long getNumVersionLookups(Engine engine) {
return ((InternalEngine) engine).getNumVersionLookups();
public static long getInFlightDocCount(Engine engine) {
if (engine instanceof InternalEngine) {
return ((InternalEngine) engine).getInFlightDocCount();
} else {
return 0;
public static void assertNoInFlightDocuments(Engine engine) throws Exception {
assertBusy(() -> assertThat(getInFlightDocCount(engine), equalTo(0L)));
@ -37,8 +37,9 @@ class InternalTestEngine extends InternalEngine {
InternalTestEngine(EngineConfig engineConfig, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig, localCheckpointTrackerSupplier);
InternalTestEngine(EngineConfig engineConfig, int maxDocs,
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig, maxDocs, localCheckpointTrackerSupplier);
@ -1457,6 +1457,24 @@ public final class InternalTestCluster extends TestCluster {
public void assertNoInFlightDocsInEngine() throws Exception {
assertBusy(() -> {
for (String nodeName : getNodeNames()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeName);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
try {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
assertThat(indexShard.routingEntry().toString(), EngineTestCase.getInFlightDocCount(engine), equalTo(0L));
} catch (AlreadyClosedException ignored) {
// shard is closed
private IndexShard getShardOrNull(ClusterState clusterState, ShardRouting shardRouting) {
if (shardRouting == null || shardRouting.assignedToNode() == false) {
return null;
@ -2523,9 +2541,10 @@ public final class InternalTestCluster extends TestCluster {
public synchronized void assertAfterTest() throws IOException {
public synchronized void assertAfterTest() throws Exception {
for (NodeAndClient nodeAndClient : nodes.values()) {
NodeEnvironment env = nodeAndClient.node().getNodeEnvironment();
Set<ShardId> shardIds = env.lockedShards();
@ -90,7 +90,7 @@ public abstract class TestCluster implements Closeable {
* This method checks all the things that need to be checked after each test
public void assertAfterTest() throws IOException {
public void assertAfterTest() throws Exception {
Reference in New Issue