diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2a9ee444941..12981664c2a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -63,6 +63,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Map; /** Performs shard-level bulk (index, delete or update) operations */ @@ -424,7 +425,7 @@ public class TransportShardBulkAction extends TransportWriteActiontrue is returned. In the case of a critical + * version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown. * - * @param the result type * @param op the operation * @param currentVersion the current version * @param expectedVersion the expected version * @param deleted {@code true} if the current version is not found or represents a delete - * @param onSuccess if there is a version conflict that can be ignored, the result of the operation - * @param onFailure if there is a version conflict that can not be ignored, the result of the operation - * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value - * is not present + * @return true iff a non-critical version conflict (origin recovery or replica) is found otherwise false + * @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary + * @throws IllegalArgumentException if an unsupported version type is used. */ - private Optional checkVersionConflict( - final Operation op, - final long currentVersion, - final long expectedVersion, - final boolean deleted, - final Supplier onSuccess, - final Function onFailure) { - final T result; + private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) { if (op.versionType() == VersionType.FORCE) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { // If index was created in 5.0 or later, 'force' is not allowed at all @@ -479,23 +470,19 @@ public class InternalEngine extends Engine { if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { if (op.origin() == Operation.Origin.PRIMARY) { // fatal version conflict - final VersionConflictEngineException e = - new VersionConflictEngineException( + throw new VersionConflictEngineException( shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - result = onFailure.apply(e); + } else { - /* - * Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a - * successful result. - */ - result = onSuccess.get(); + /* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a + * successful result.*/ + return true; } - return Optional.of(result); } else { - return Optional.empty(); + return false; } } @@ -510,7 +497,7 @@ public class InternalEngine extends Engine { } @Override - public IndexResult index(Index index) { + public IndexResult index(Index index) throws IOException { IndexResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); @@ -522,54 +509,17 @@ public class InternalEngine extends Engine { result = innerIndex(index); } } - } catch (Exception e) { - result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; } return result; } - /** - * Inspects exception thrown when executing index or delete operations - * - * @return failure if the failure is a document specific failure (e.g. analysis chain failure) - * or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event) - *

- * Note: pkg-private for testing - */ - final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) { - boolean isDocumentFailure; - try { - // When indexing a document into Lucene, Lucene distinguishes between environment related errors - // (like out of disk space) and document specific errors (like analysis chain problems) by setting - // the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of - // errors and returns true if that is the case. We use that to indicate a document level failure - // and set the error in operation.setFailure. In case of environment related errors, the failure - // is bubbled up - isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false; - if (failure instanceof AlreadyClosedException) { - // ensureOpen throws AlreadyClosedException which is not a document level issue - isDocumentFailure = false; - } - } catch (Exception inner) { - // we failed checking whether the failure can fail the engine, treat it as a persistent engine failure - isDocumentFailure = false; - failure.addSuppressed(inner); - } - if (isDocumentFailure) { - return failure; - } else { - // throw original exception in case the exception caused the engine to fail - rethrow(failure); - return null; - } - } - - // hack to rethrow original exception in case of engine level failures during index/delete operation - @SuppressWarnings("unchecked") - private static void rethrow(Throwable t) throws T { - throw (T) t; - } - private boolean canOptimizeAddDocument(Index index) { if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " @@ -610,9 +560,9 @@ public class InternalEngine extends Engine { } private IndexResult innerIndex(Index index) throws IOException { + // TODO we gotta split this method up it's too big! assert assertSequenceNumber(index.origin(), index.seqNo()); final Translog.Location location; - final long updatedVersion; long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); @@ -678,14 +628,14 @@ public class InternalEngine extends Engine { } } final long expectedVersion = index.version(); - final Optional resultOnVersionConflict = - checkVersionConflict( - index, - currentVersion, - expectedVersion, - deleted, - () -> new IndexResult(currentVersion, index.seqNo(), false), - e -> new IndexResult(e, currentVersion, index.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo())); + } final IndexResult indexResult; if (resultOnVersionConflict.isPresent()) { @@ -702,18 +652,38 @@ public class InternalEngine extends Engine { * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); - updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.parsedDoc().version().setLongValue(updatedVersion); - - if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); - index(index.docs(), indexWriter); - } else { - update(index.uid(), index.docs(), indexWriter); + IndexResult innerIndexResult; + try { + if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { + // document does not exists, we can optimize for create, but double check if assertions are running + assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); + index(index.docs(), indexWriter); + } else { + update(index.uid(), index.docs(), indexWriter); + } + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted); + } catch (Exception ex) { + if (indexWriter.getTragicException() == null) { + /* There is no tragic event recorded so this must be a document failure. + * + * The handling inside IW doesn't guarantee that an tragic / aborting exception + * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW + * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that + * we can potentially handle the exception before the engine is failed. + * Bottom line is that we can only rely on the fact that if it's a document failure then + * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather + * non-document failure + */ + innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo()); + } else { + throw ex; + } } - indexResult = new IndexResult(updatedVersion, seqNo, deleted); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + assert innerIndexResult != null; + indexResult = innerIndexResult; } if (!indexResult.hasFailure()) { location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY @@ -729,7 +699,6 @@ public class InternalEngine extends Engine { seqNoService().markSeqNoAsCompleted(seqNo); } } - } private static void index(final List docs, final IndexWriter indexWriter) throws IOException { @@ -769,14 +738,19 @@ public class InternalEngine extends Engine { } @Override - public DeleteResult delete(Delete delete) { + public DeleteResult delete(Delete delete) throws IOException { DeleteResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: result = innerDelete(delete); - } catch (Exception e) { - result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo()); + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; } maybePruneDeletedTombstones(); return result; @@ -811,15 +785,14 @@ public class InternalEngine extends Engine { } final long expectedVersion = delete.version(); - - final Optional resultOnVersionConflict = - checkVersionConflict( - delete, - currentVersion, - expectedVersion, - deleted, - () -> new DeleteResult(expectedVersion, delete.seqNo(), true), - e -> new DeleteResult(e, expectedVersion, delete.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo())); + } final DeleteResult deleteResult; if (resultOnVersionConflict.isPresent()) { deleteResult = resultOnVersionConflict.get(); @@ -852,6 +825,7 @@ public class InternalEngine extends Engine { } private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { + assert uid != null : "uid must not be null"; final boolean found; if (currentVersion == Versions.NOT_FOUND) { // doc does not exist and no prior deletes @@ -861,6 +835,8 @@ public class InternalEngine extends Engine { found = false; } else { // we deleted a currently existing document + // any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming + // from this. indexWriter.deleteDocuments(uid); found = true; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb486de610b..a7094c240f4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -543,13 +543,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public Engine.IndexResult index(Engine.Index index) { + public Engine.IndexResult index(Engine.Index index) throws IOException { ensureWriteAllowed(index); Engine engine = getEngine(); return index(engine, index); } - private Engine.IndexResult index(Engine engine, Engine.Index index) { + private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException { active.set(true); final Engine.IndexResult result; index = indexingOperationListeners.preIndex(shardId, index); @@ -592,13 +592,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } - public Engine.DeleteResult delete(Engine.Delete delete) { + public Engine.DeleteResult delete(Engine.Delete delete) throws IOException { ensureWriteAllowed(delete); Engine engine = getEngine(); return delete(engine, delete); } - private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; delete = indexingOperationListeners.preDelete(shardId, delete); @@ -1922,12 +1922,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } @Override - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { IndexShard.this.index(engine, engineIndex); } @Override - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { IndexShard.this.delete(engine, engineDelete); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index b5c23d9dc46..6f4ae66a12a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -149,7 +149,7 @@ public class TranslogRecoveryPerformer { * cause a {@link MapperException} to be thrown if an update * is encountered. */ - private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) { + private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException { try { switch (operation.opType()) { @@ -207,11 +207,11 @@ public class TranslogRecoveryPerformer { operationProcessed(); } - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { engine.index(engineIndex); } - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { engine.delete(engineDelete); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ef05d8f27ca..97638b53e37 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -32,6 +32,7 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; @@ -63,6 +64,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; @@ -125,6 +127,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.OldIndexUtils; +import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -133,6 +136,8 @@ import org.junit.Before; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.DirectoryStream; import java.nio.file.Files; @@ -1144,7 +1149,7 @@ public class InternalEngineTests extends ESTestCase { assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } - public void testVersioningNewCreate() { + public void testVersioningNewCreate() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); @@ -1155,7 +1160,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testVersioningNewIndex() { + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1166,7 +1171,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testExternalVersioningNewIndex() { + public void testExternalVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1177,7 +1182,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(12L)); } - public void testVersioningIndexConflict() { + public void testVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1199,7 +1204,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflict() { + public void testExternalVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1242,7 +1247,7 @@ public class InternalEngineTests extends ESTestCase { } } - public void testVersioningIndexConflictWithFlush() { + public void testVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1266,7 +1271,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflictWithFlush() { + public void testExternalVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1361,6 +1366,8 @@ public class InternalEngineTests extends ESTestCase { } } catch (AlreadyClosedException ex) { // fine + } catch (IOException e) { + throw new AssertionError(e); } } }; @@ -1379,7 +1386,7 @@ public class InternalEngineTests extends ESTestCase { } - public void testVersioningDeleteConflict() { + public void testVersioningDeleteConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1412,7 +1419,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningDeleteConflictWithFlush() { + public void testVersioningDeleteConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1451,7 +1458,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsException() { + public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1463,7 +1470,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsExceptionWithFlush() { + public void testVersioningCreateExistsExceptionWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1477,7 +1484,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningReplicaConflict1() { + public void testVersioningReplicaConflict1() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1526,7 +1533,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); } - public void testVersioningReplicaConflict2() { + public void testVersioningReplicaConflict2() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1595,7 +1602,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(replicaV2Result.getVersion(), equalTo(3L)); } - public void testBasicCreatedFlag() { + public void testBasicCreatedFlag() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1612,7 +1619,7 @@ public class InternalEngineTests extends ESTestCase { assertTrue(indexResult.isCreated()); } - public void testCreatedFlagAfterFlush() { + public void testCreatedFlagAfterFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1654,7 +1661,7 @@ public class InternalEngineTests extends ESTestCase { // #5891: make sure IndexWriter's infoStream output is // sent to lucene.iw with log level TRACE: - public void testIndexWriterInfoStream() throws IllegalAccessException { + public void testIndexWriterInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterInfoStream"); mockAppender.start(); @@ -1915,7 +1922,7 @@ public class InternalEngineTests extends ESTestCase { } // #8603: make sure we can separately log IFD's messages - public void testIndexWriterIFDInfoStream() throws IllegalAccessException { + public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterIFDInfoStream"); mockAppender.start(); @@ -2532,20 +2539,8 @@ public class InternalEngineTests extends ESTestCase { } } - public void testCheckDocumentFailure() throws Exception { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); - Exception documentFailure = engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new IOException("simulated document failure")); - assertThat(documentFailure, instanceOf(IOException.class)); - try { - engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new CorruptIndexException("simulated environment failure", "")); - fail("expected exception to be thrown"); - } catch (Exception envirnomentException) { - assertThat(envirnomentException.getMessage(), containsString("simulated environment failure")); - } - } - private static class ThrowingIndexWriter extends IndexWriter { - private AtomicReference failureToThrow = new AtomicReference<>(); + private AtomicReference> failureToThrow = new AtomicReference<>(); public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { super(d, conf); @@ -2558,13 +2553,15 @@ public class InternalEngineTests extends ESTestCase { } private void maybeThrowFailure() throws IOException { - Exception failure = failureToThrow.get(); - if (failure instanceof RuntimeException) { - throw (RuntimeException)failure; - } else if (failure instanceof IOException) { - throw (IOException)failure; - } else { - assert failure == null : "unsupported failure class: " + failure.getClass().getCanonicalName(); + if (failureToThrow.get() != null) { + Exception failure = failureToThrow.get().get(); + if (failure instanceof RuntimeException) { + throw (RuntimeException) failure; + } else if (failure instanceof IOException) { + throw (IOException) failure; + } else { + assert false: "unsupported failure class: " + failure.getClass().getCanonicalName(); + } } } @@ -2574,14 +2571,8 @@ public class InternalEngineTests extends ESTestCase { return super.deleteDocuments(terms); } - public void setThrowFailure(IOException documentFailure) { - Objects.requireNonNull(documentFailure); - failureToThrow.set(documentFailure); - } - - public void setThrowFailure(RuntimeException runtimeFailure) { - Objects.requireNonNull(runtimeFailure); - failureToThrow.set(runtimeFailure); + public void setThrowFailure(Supplier failureSupplier) { + failureToThrow.set(failureSupplier); } public void clearFailure() { @@ -2594,13 +2585,14 @@ public class InternalEngineTests extends ESTestCase { final ParsedDocument doc1 = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null); + ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig()); try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) { // test document failure while indexing if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); } Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); @@ -2610,29 +2602,33 @@ public class InternalEngineTests extends ESTestCase { assertNull(indexResult.getFailure()); engine.index(indexForDoc(doc2)); - // test document failure while deleting + // test failure while deleting + // all these simulated exceptions are not fatal to the IW so we treat them as document failures if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); + expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); + expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } - Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1))); - assertNotNull(deleteResult.getFailure()); // test non document level failure is thrown if (randomBoolean()) { // simulate close by corruption - throwingIndexWriter.setThrowFailure(new CorruptIndexException("simulated", "hello")); - try { - if (randomBoolean()) { - engine.index(indexForDoc(doc3)); - } else { - engine.delete(new Engine.Delete("test", "2", newUid(doc2))); - } - fail("corruption should throw exceptions"); - } catch (Exception e) { - assertThat(e, instanceOf(CorruptIndexException.class)); - } + throwingIndexWriter.setThrowFailure(null); + UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> { + Engine.Index index = indexForDoc(doc3); + index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") { + // this is a hack to add a failure during store document which triggers a tragic event + // and in turn fails the engine + @Override + public BytesRef binaryValue() { + throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException()); + } + }); + engine.index(index); + }); + assertTrue(uncheckedIOException.getCause() instanceof MockDirectoryWrapper.FakeIOException); } else { // normal close engine.close(); @@ -2807,7 +2803,11 @@ public class InternalEngineTests extends ESTestCase { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } }); thread[i].start(); @@ -2867,7 +2867,11 @@ public class InternalEngineTests extends ESTestCase { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } } }; @@ -3055,7 +3059,13 @@ public class InternalEngineTests extends ESTestCase { final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null); skip.set(randomBoolean()); - final Thread thread = new Thread(() -> finalInitialEngine.index(indexForDoc(doc))); + final Thread thread = new Thread(() -> { + try { + finalInitialEngine.index(indexForDoc(doc)); + } catch (IOException e) { + throw new AssertionError(e); + } + }); thread.start(); if (skip.get()) { threads.add(thread); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index cc92d9bd9c2..219af4af4da 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -267,7 +267,7 @@ public class ShadowEngineTests extends ESTestCase { protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); - public void testCommitStats() { + public void testCommitStats() throws IOException { // create a doc and refresh ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); @@ -846,7 +846,7 @@ public class ShadowEngineTests extends ESTestCase { searchResult.close(); } - public void testFailEngineOnCorruption() { + public void testFailEngineOnCorruption() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); primaryEngine.flush(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 3216d30bdaa..042eb85cf36 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -329,7 +329,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica); + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; class PrimaryRef implements ReplicationOperation.Primary { @@ -449,7 +449,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } @Override - protected void performOnReplica(IndexRequest request, IndexShard replica) { + protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException { final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica); TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e95d7ace10b..d5c10dddc36 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -320,11 +320,11 @@ public class RefreshListenersTests extends ESTestCase { refresher.cancel(); } - private Engine.IndexResult index(String id) { + private Engine.IndexResult index(String id) throws IOException { return index(id, "test"); } - private Engine.IndexResult index(String id, String testFieldValue) { + private Engine.IndexResult index(String id, String testFieldValue) throws IOException { String type = "test"; String uid = type + ":" + id; Document document = new Document(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 605b9026c26..650e0794efa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -449,11 +449,11 @@ public abstract class IndexShardTestCase extends ESTestCase { } - protected Engine.Index indexDoc(IndexShard shard, String type, String id) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } - protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException { final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( @@ -471,7 +471,7 @@ public abstract class IndexShardTestCase extends ESTestCase { return index; } - protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) { + protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException { final Engine.Delete delete; if (shard.routingEntry().primary()) { delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);