From 9015b50e1b87c3726985b5bb4ca9c90a62062163 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 13 Oct 2020 17:39:08 -0400 Subject: [PATCH] Check docs limit before indexing on primary (#63273) Today indexing to a shard with 2147483519 documents will fail that shard. We should check the number of documents and reject the write requests instead. Closes #51136 --- .../index/engine/MaxDocsLimitIT.java | 189 ++++++++++++++++++ .../index/engine/InternalEngine.java | 135 ++++++++++--- .../index/engine/InternalEngineTests.java | 66 +++++- .../index/engine/EngineTestCase.java | 21 +- .../index/engine/InternalTestEngine.java | 5 +- .../test/InternalTestCluster.java | 21 +- .../org/elasticsearch/test/TestCluster.java | 2 +- 7 files changed, 400 insertions(+), 39 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java new file mode 100644 index 00000000000..aece4c4ab27 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MaxDocsLimitIT.java @@ -0,0 +1,189 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexWriterMaxDocsChanger; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class MaxDocsLimitIT extends ESIntegTestCase { + + private static final AtomicInteger maxDocs = new AtomicInteger(); + + public static class TestEnginePlugin extends Plugin implements EnginePlugin { + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + return Optional.of(config -> { + assert maxDocs.get() > 0 : "maxDocs is unset"; + return EngineTestCase.createEngine(config, maxDocs.get()); + }); + } + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestEnginePlugin.class); + return plugins; + } + + @Before + public void setMaxDocs() { + maxDocs.set(randomIntBetween(10, 100)); // Do not set this too low as we can fail to write the cluster state + IndexWriterMaxDocsChanger.setMaxDocs(maxDocs.get()); + } + + @After + public void restoreMaxDocs() { + IndexWriterMaxDocsChanger.restoreMaxDocs(); + } + + public void testMaxDocsLimit() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST))); + IndexingResult indexingResult = indexDocs(maxDocs.get(), 1); + assertThat(indexingResult.numSuccess, equalTo(maxDocs.get())); + assertThat(indexingResult.numFailures, equalTo(0)); + int rejectedRequests = between(1, 10); + indexingResult = indexDocs(rejectedRequests, between(1, 8)); + assertThat(indexingResult.numFailures, equalTo(rejectedRequests)); + assertThat(indexingResult.numSuccess, equalTo(0)); + final IllegalArgumentException deleteError = expectThrows(IllegalArgumentException.class, + () -> client().prepareDelete("test", "_doc", "any-id").get()); + assertThat(deleteError.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]")); + client().admin().indices().prepareRefresh("test").get(); + SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get())); + if (randomBoolean()) { + client().admin().indices().prepareFlush("test").get(); + } + internalCluster().fullRestart(); + internalCluster().ensureAtLeastNumDataNodes(2); + ensureGreen("test"); + searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) maxDocs.get())); + } + + public void testMaxDocsLimitConcurrently() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))); + IndexingResult indexingResult = indexDocs(between(maxDocs.get() + 1, maxDocs.get() * 2), between(2, 8)); + assertThat(indexingResult.numFailures, greaterThan(0)); + assertThat(indexingResult.numSuccess, both(greaterThan(0)).and(lessThanOrEqualTo(maxDocs.get()))); + client().admin().indices().prepareRefresh("test").get(); + SearchResponse searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) indexingResult.numSuccess)); + int totalSuccess = indexingResult.numSuccess; + while (totalSuccess < maxDocs.get()) { + indexingResult = indexDocs(between(1, 10), between(1, 8)); + assertThat(indexingResult.numSuccess, greaterThan(0)); + totalSuccess += indexingResult.numSuccess; + } + if (randomBoolean()) { + indexingResult = indexDocs(between(1, 10), between(1, 8)); + assertThat(indexingResult.numSuccess, equalTo(0)); + } + client().admin().indices().prepareRefresh("test").get(); + searchResponse = client().prepareSearch("test").setQuery(new MatchAllQueryBuilder()) + .setTrackTotalHitsUpTo(Integer.MAX_VALUE).setSize(0).get(); + ElasticsearchAssertions.assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) totalSuccess)); + } + + static final class IndexingResult { + final int numSuccess; + final int numFailures; + + IndexingResult(int numSuccess, int numFailures) { + this.numSuccess = numSuccess; + this.numFailures = numFailures; + } + } + + static IndexingResult indexDocs(int numRequests, int numThreads) throws Exception { + final AtomicInteger completedRequests = new AtomicInteger(); + final AtomicInteger numSuccess = new AtomicInteger(); + final AtomicInteger numFailure = new AtomicInteger(); + Thread[] indexers = new Thread[numThreads]; + Phaser phaser = new Phaser(indexers.length); + for (int i = 0; i < indexers.length; i++) { + indexers[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + while (completedRequests.incrementAndGet() <= numRequests) { + try { + final IndexResponse resp = client().prepareIndex("test", "_doc").setSource("{}", XContentType.JSON).get(); + numSuccess.incrementAndGet(); + assertThat(resp.status(), equalTo(RestStatus.CREATED)); + } catch (IllegalArgumentException e) { + numFailure.incrementAndGet(); + assertThat(e.getMessage(), containsString("Number of documents in the index can't exceed [" + maxDocs.get() + "]")); + } + } + }); + indexers[i].start(); + } + for (Thread indexer : indexers) { + indexer.join(); + } + internalCluster().assertNoInFlightDocsInEngine(); + return new IndexingResult(numSuccess.get(), numFailure.get()); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e2c3944245..d52335a517f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -180,6 +180,18 @@ public class InternalEngine extends Engine { private final KeyedLock noOpKeyedLock = new KeyedLock<>(); private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false); + /** + * If multiple writes passed {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)} but they haven't adjusted + * {@link IndexWriter#getPendingNumDocs()} yet, then IndexWriter can fail with too many documents. In this case, we have to fail + * the engine because we already generated sequence numbers for write operations; otherwise we will have gaps in sequence numbers. + * To avoid this, we keep track the number of documents that are being added to IndexWriter, and account it in + * {@link InternalEngine#tryAcquireInFlightDocs(Operation, int)}. Although we can double count some inFlight documents in IW and Engine, + * this shouldn't be an issue because it happens for a short window and we adjust the inFlightDocCount once an indexing is completed. + */ + private final AtomicLong inFlightDocCount = new AtomicLong(); + + private final int maxDocs; + @Nullable private final String historyUUID; @@ -190,13 +202,12 @@ public class InternalEngine extends Engine { private volatile String forceMergeUUID; public InternalEngine(EngineConfig engineConfig) { - this(engineConfig, LocalCheckpointTracker::new); + this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } - InternalEngine( - final EngineConfig engineConfig, - final BiFunction localCheckpointTrackerSupplier) { + InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) { super(engineConfig); + this.maxDocs = maxDocs; if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { updateAutoIdTimestamp(Long.MAX_VALUE, true); } @@ -879,6 +890,7 @@ public class InternalEngine extends Engine { try (ReleasableLock releasableLock = readLock.acquire()) { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); + int reservedDocs = 0; try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) { lastWriteNanos = index.startTime(); @@ -909,9 +921,11 @@ public class InternalEngine extends Engine { * or calls updateDocument. */ final IndexingStrategy plan = indexingStrategyForOperation(index); + reservedDocs = plan.reservedDocs; final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { + assert index.origin() == Operation.Origin.PRIMARY : index.origin(); indexResult = plan.earlyResultOnPreFlightError.get(); assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType(); } else { @@ -966,6 +980,8 @@ public class InternalEngine extends Engine { indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); return indexResult; + } finally { + releaseInFlightDocs(reservedDocs); } } catch (RuntimeException | IOException e) { try { @@ -1004,14 +1020,14 @@ public class InternalEngine extends Engine { } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; - plan = IndexingStrategy.optimizedAppendOnly(index.version()); + plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); } else { versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } } return plan; @@ -1028,11 +1044,17 @@ public class InternalEngine extends Engine { private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); + final int reservingDocs = index.parsedDoc().docs().size(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) { - plan = IndexingStrategy.optimizedAppendOnly(1L); + final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else { + plan = IndexingStrategy.optimizedAppendOnly(1L, reservingDocs); + } } else { versionMap.enforceSafeAccess(); // resolves incoming version @@ -1064,9 +1086,14 @@ public class InternalEngine extends Engine { new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted); plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { - plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()) - ); + final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); + if (reserveError != null) { + plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else { + plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, + canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()), + reservingDocs); + } } } return plan; @@ -1178,53 +1205,55 @@ public class InternalEngine extends Engine { final long versionForIndexing; final boolean indexIntoLucene; final boolean addStaleOpToLucene; + final int reservedDocs; final Optional earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, boolean indexIntoLucene, boolean addStaleOpToLucene, - long versionForIndexing, IndexResult earlyResultOnPreFlightError) { + long versionForIndexing, int reservedDocs, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false : "can only index into lucene or have a preflight result but not both." + "indexIntoLucene: " + indexIntoLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreFlightError; + assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs; this.currentNotFoundOrDeleted = currentNotFoundOrDeleted; this.useLuceneUpdateDocument = useLuceneUpdateDocument; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; this.addStaleOpToLucene = addStaleOpToLucene; + this.reservedDocs = reservedDocs; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? Optional.empty() : Optional.of(earlyResultOnPreFlightError); } - static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { - return new IndexingStrategy(true, false, true, false, versionForIndexing, null); + static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null); } public static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); - return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, false, - Versions.NOT_FOUND, result); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result); } - static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, - long versionForIndexing) { + static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, false, versionForIndexing, null); + true, false, versionForIndexing, reservedDocs, null); } public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, false, - false, versionForIndexing, null); + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null); } static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) { - return new IndexingStrategy(false, false, false, - addStaleOpToLucene, versionForIndexing, null); + return new IndexingStrategy(false, false, false, addStaleOpToLucene, versionForIndexing, 0, null); + } + static IndexingStrategy failAsTooManyDocs(Exception e) { + final IndexResult result = new IndexResult(e, Versions.NOT_FOUND); + return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result); } } @@ -1275,13 +1304,15 @@ public class InternalEngine extends Engine { assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field(); assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; + int reservedDocs = 0; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) { ensureOpen(); lastWriteNanos = delete.startTime(); final DeletionStrategy plan = deletionStrategyForOperation(delete); - + reservedDocs = plan.reservedDocs; if (plan.earlyResultOnPreflightError.isPresent()) { + assert delete.origin() == Operation.Origin.PRIMARY : delete.origin(); deleteResult = plan.earlyResultOnPreflightError.get(); } else { // generate or register sequence number @@ -1323,11 +1354,36 @@ public class InternalEngine extends Engine { e.addSuppressed(inner); } throw e; + } finally { + releaseInFlightDocs(reservedDocs); } maybePruneDeletes(); return deleteResult; } + private Exception tryAcquireInFlightDocs(Operation operation, int addingDocs) { + assert operation.origin() == Operation.Origin.PRIMARY : operation; + assert operation.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : operation; + assert addingDocs > 0 : addingDocs; + final long totalDocs = indexWriter.getPendingNumDocs() + inFlightDocCount.addAndGet(addingDocs); + if (totalDocs > maxDocs) { + releaseInFlightDocs(addingDocs); + return new IllegalArgumentException("Number of documents in the index can't exceed [" + maxDocs + "]"); + } else { + return null; + } + } + + private void releaseInFlightDocs(int numDocs) { + assert numDocs >= 0 : numDocs; + final long newValue = inFlightDocCount.addAndGet(-numDocs); + assert newValue >= 0 : "inFlightDocCount must not be negative [" + newValue + "]"; + } + + long getInFlightDocCount() { + return inFlightDocCount.get(); + } + protected DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException { if (delete.origin() == Operation.Origin.PRIMARY) { return planDeletionAsPrimary(delete); @@ -1354,7 +1410,7 @@ public class InternalEngine extends Engine { if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version()); } else { - plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version()); + plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0); } } return plan; @@ -1394,7 +1450,13 @@ public class InternalEngine extends Engine { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally(currentlyDeleted, delete.versionType().updateVersion(currentVersion, delete.version())); + final Exception reserveError = tryAcquireInFlightDocs(delete, 1); + if (reserveError != null) { + plan = DeletionStrategy.failAsTooManyDocs(reserveError); + } else { + final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version()); + plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1); + } } return plan; } @@ -1454,9 +1516,10 @@ public class InternalEngine extends Engine { final boolean currentlyDeleted; final long versionOfDeletion; final Optional earlyResultOnPreflightError; + final int reservedDocs; private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, - long versionOfDeletion, DeleteResult earlyResultOnPreflightError) { + long versionOfDeletion, int reservedDocs, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene @@ -1465,6 +1528,8 @@ public class InternalEngine extends Engine { this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.versionOfDeletion = versionOfDeletion; + this.reservedDocs = reservedDocs; + assert reservedDocs == 0 || deleteFromLucene || addStaleOpToLucene : reservedDocs; this.earlyResultOnPreflightError = earlyResultOnPreflightError == null ? Optional.empty() : Optional.of(earlyResultOnPreflightError); } @@ -1473,20 +1538,26 @@ public class InternalEngine extends Engine { VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final DeleteResult deleteResult = new DeleteResult(e, currentVersion, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_SEQ_NO, currentlyDeleted == false); - return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult); } - static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion) { - return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, null); + static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) { + return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null); } public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) { - return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, null); + return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null); } static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) { - return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, null); + return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, 0, null); + } + + static DeletionStrategy failAsTooManyDocs(Exception e) { + final DeleteResult deleteResult = new DeleteResult(e, Versions.NOT_FOUND, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_SEQ_NO, false); + return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 169ec24caf8..bacfa1d2d85 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterMaxDocsChanger; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; @@ -4386,7 +4387,7 @@ public class InternalEngineTests extends EngineTestCase { localCheckpoint); EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy())); - noOpEngine = new InternalEngine(noopEngineConfig, supplier) { + noOpEngine = new InternalEngine(noopEngineConfig, IndexWriter.MAX_DOCS, supplier) { @Override protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); @@ -6350,4 +6351,67 @@ public class InternalEngineTests extends EngineTestCase { } } } + + public void testMaxDocsOnPrimary() throws Exception { + engine.close(); + final boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled(); + int maxDocs = randomIntBetween(1, 100); + IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); + try { + engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new); + int numDocs = between(maxDocs + 1, maxDocs * 2); + List operations = new ArrayList<>(numDocs); + for (int i = 0; i < numDocs; i++) { + final String id; + if (softDeleteEnabled == false || randomBoolean()) { + id = Integer.toString(randomInt(numDocs)); + operations.add(indexForDoc(createParsedDoc(id, null))); + } else { + id = "not_found"; + operations.add(new Engine.Delete("_doc", id, newUid(id), primaryTerm.get())); + } + } + for (int i = 0; i < numDocs; i++) { + final long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + final Engine.Result result = applyOperation(engine, operations.get(i)); + if (i < maxDocs) { + assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + assertNull(result.getFailure()); + assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo + 1L)); + } else { + assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); + assertNotNull(result.getFailure()); + assertThat(result.getFailure().getMessage(), + containsString("Number of documents in the index can't exceed [" + maxDocs + "]")); + assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); + } + assertFalse(engine.isClosed.get()); + } + } finally { + IndexWriterMaxDocsChanger.restoreMaxDocs(); + } + } + + public void testMaxDocsOnReplica() throws Exception { + assumeTrue("Deletes do not add documents to Lucene with soft-deletes disabled", + engine.config().getIndexSettings().isSoftDeleteEnabled()); + engine.close(); + int maxDocs = randomIntBetween(1, 100); + IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); + try { + engine = new InternalTestEngine(engine.config(), maxDocs, LocalCheckpointTracker::new); + int numDocs = between(maxDocs + 1, maxDocs * 2); + List operations = generateHistoryOnReplica(numDocs, randomBoolean(), randomBoolean(), randomBoolean()); + final IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + } + }); + assertThat(error.getMessage(), containsString("number of documents in the index cannot exceed " + maxDocs)); + assertTrue(engine.isClosed.get()); + } finally { + IndexWriterMaxDocsChanger.restoreMaxDocs(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 868ca1ccdcd..33aa6bd09a1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -270,12 +270,14 @@ public abstract class EngineTestCase extends ESTestCase { if (engine != null && engine.isClosed.get() == false) { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); + assertNoInFlightDocuments(engine); assertMaxSeqNoInCommitUserData(engine); assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + assertNoInFlightDocuments(replicaEngine); assertMaxSeqNoInCommitUserData(replicaEngine); assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); } @@ -286,7 +288,6 @@ public abstract class EngineTestCase extends ESTestCase { } } - protected static ParseContext.Document testDocumentWithTextField() { return testDocumentWithTextField("test"); } @@ -531,6 +532,10 @@ public abstract class EngineTestCase extends ESTestCase { return internalEngine; } + public static InternalEngine createEngine(EngineConfig engineConfig, int maxDocs) { + return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new); + } + @FunctionalInterface public interface IndexWriterFactory { @@ -568,7 +573,7 @@ public abstract class EngineTestCase extends ESTestCase { } }; } else { - return new InternalTestEngine(config, localCheckpointTrackerSupplier) { + return new InternalTestEngine(config, IndexWriter.MAX_DOCS, localCheckpointTrackerSupplier) { @Override IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { return (indexWriterFactory != null) ? @@ -1238,4 +1243,16 @@ public abstract class EngineTestCase extends ESTestCase { public static long getNumVersionLookups(Engine engine) { return ((InternalEngine) engine).getNumVersionLookups(); } + + public static long getInFlightDocCount(Engine engine) { + if (engine instanceof InternalEngine) { + return ((InternalEngine) engine).getInFlightDocCount(); + } else { + return 0; + } + } + + public static void assertNoInFlightDocuments(Engine engine) throws Exception { + assertBusy(() -> assertThat(getInFlightDocCount(engine), equalTo(0L))); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java index 8c52d57aabc..d31fe609e62 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/InternalTestEngine.java @@ -37,8 +37,9 @@ class InternalTestEngine extends InternalEngine { super(engineConfig); } - InternalTestEngine(EngineConfig engineConfig, BiFunction localCheckpointTrackerSupplier) { - super(engineConfig, localCheckpointTrackerSupplier); + InternalTestEngine(EngineConfig engineConfig, int maxDocs, + BiFunction localCheckpointTrackerSupplier) { + super(engineConfig, maxDocs, localCheckpointTrackerSupplier); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 3818908fae3..36586e0c7a1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1457,6 +1457,24 @@ public final class InternalTestCluster extends TestCluster { } } + public void assertNoInFlightDocsInEngine() throws Exception { + assertBusy(() -> { + for (String nodeName : getNodeNames()) { + IndicesService indexServices = getInstance(IndicesService.class, nodeName); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + try { + final Engine engine = IndexShardTestCase.getEngine(indexShard); + assertThat(indexShard.routingEntry().toString(), EngineTestCase.getInFlightDocCount(engine), equalTo(0L)); + } catch (AlreadyClosedException ignored) { + // shard is closed + } + } + } + } + }); + } + private IndexShard getShardOrNull(ClusterState clusterState, ShardRouting shardRouting) { if (shardRouting == null || shardRouting.assignedToNode() == false) { return null; @@ -2523,9 +2541,10 @@ public final class InternalTestCluster extends TestCluster { } @Override - public synchronized void assertAfterTest() throws IOException { + public synchronized void assertAfterTest() throws Exception { super.assertAfterTest(); assertRequestsFinished(); + assertNoInFlightDocsInEngine(); for (NodeAndClient nodeAndClient : nodes.values()) { NodeEnvironment env = nodeAndClient.node().getNodeEnvironment(); Set shardIds = env.lockedShards(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java index b11841cffc6..9bb633b2efe 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestCluster.java @@ -90,7 +90,7 @@ public abstract class TestCluster implements Closeable { /** * This method checks all the things that need to be checked after each test */ - public void assertAfterTest() throws IOException { + public void assertAfterTest() throws Exception { ensureEstimatedStats(); }