From 824beea89d8c51c7037041b3c843e1511d825d96 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 20 Jan 2017 16:55:00 +0100 Subject: [PATCH] Fix handling of document failure expcetion in InternalEngine (#22718) Today we try to be smart and make a generic decision if an exception should be treated as a document failure but in some cases concurrency in the index writer make this decision very difficult since we don't have a consistent state in the case another thread is currently failing the IndexWriter/InternalEngine due to a tragic event. This change simplifies the exception handling and makes specific decisions about document failures rather than using a generic heuristic. This prevent exceptions to be treated as document failures that should have failed the engine but backed out of failing since since some other thread has already taken over the failure procedure but didn't finish yet. --- .../action/bulk/TransportShardBulkAction.java | 7 +- .../elasticsearch/index/engine/Engine.java | 4 +- .../index/engine/InternalEngine.java | 180 ++++++++---------- .../elasticsearch/index/shard/IndexShard.java | 12 +- .../shard/TranslogRecoveryPerformer.java | 6 +- .../index/engine/InternalEngineTests.java | 142 +++++++------- .../index/engine/ShadowEngineTests.java | 4 +- .../ESIndexLevelReplicationTestCase.java | 4 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/shard/IndexShardTestCase.java | 6 +- 10 files changed, 178 insertions(+), 191 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2a9ee444941..12981664c2a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -63,6 +63,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Map; /** Performs shard-level bulk (index, delete or update) operations */ @@ -424,7 +425,7 @@ public class TransportShardBulkAction extends TransportWriteActiontrue is returned. In the case of a critical + * version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown. * - * @param the result type * @param op the operation * @param currentVersion the current version * @param expectedVersion the expected version * @param deleted {@code true} if the current version is not found or represents a delete - * @param onSuccess if there is a version conflict that can be ignored, the result of the operation - * @param onFailure if there is a version conflict that can not be ignored, the result of the operation - * @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value - * is not present + * @return true iff a non-critical version conflict (origin recovery or replica) is found otherwise false + * @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary + * @throws IllegalArgumentException if an unsupported version type is used. */ - private Optional checkVersionConflict( - final Operation op, - final long currentVersion, - final long expectedVersion, - final boolean deleted, - final Supplier onSuccess, - final Function onFailure) { - final T result; + private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) { if (op.versionType() == VersionType.FORCE) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { // If index was created in 5.0 or later, 'force' is not allowed at all @@ -479,23 +470,19 @@ public class InternalEngine extends Engine { if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { if (op.origin() == Operation.Origin.PRIMARY) { // fatal version conflict - final VersionConflictEngineException e = - new VersionConflictEngineException( + throw new VersionConflictEngineException( shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - result = onFailure.apply(e); + } else { - /* - * Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a - * successful result. - */ - result = onSuccess.get(); + /* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a + * successful result.*/ + return true; } - return Optional.of(result); } else { - return Optional.empty(); + return false; } } @@ -510,7 +497,7 @@ public class InternalEngine extends Engine { } @Override - public IndexResult index(Index index) { + public IndexResult index(Index index) throws IOException { IndexResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); @@ -522,54 +509,17 @@ public class InternalEngine extends Engine { result = innerIndex(index); } } - } catch (Exception e) { - result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; } return result; } - /** - * Inspects exception thrown when executing index or delete operations - * - * @return failure if the failure is a document specific failure (e.g. analysis chain failure) - * or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event) - *

- * Note: pkg-private for testing - */ - final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) { - boolean isDocumentFailure; - try { - // When indexing a document into Lucene, Lucene distinguishes between environment related errors - // (like out of disk space) and document specific errors (like analysis chain problems) by setting - // the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of - // errors and returns true if that is the case. We use that to indicate a document level failure - // and set the error in operation.setFailure. In case of environment related errors, the failure - // is bubbled up - isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false; - if (failure instanceof AlreadyClosedException) { - // ensureOpen throws AlreadyClosedException which is not a document level issue - isDocumentFailure = false; - } - } catch (Exception inner) { - // we failed checking whether the failure can fail the engine, treat it as a persistent engine failure - isDocumentFailure = false; - failure.addSuppressed(inner); - } - if (isDocumentFailure) { - return failure; - } else { - // throw original exception in case the exception caused the engine to fail - rethrow(failure); - return null; - } - } - - // hack to rethrow original exception in case of engine level failures during index/delete operation - @SuppressWarnings("unchecked") - private static void rethrow(Throwable t) throws T { - throw (T) t; - } - private boolean canOptimizeAddDocument(Index index) { if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " @@ -610,9 +560,9 @@ public class InternalEngine extends Engine { } private IndexResult innerIndex(Index index) throws IOException { + // TODO we gotta split this method up it's too big! assert assertSequenceNumber(index.origin(), index.seqNo()); final Translog.Location location; - final long updatedVersion; long seqNo = index.seqNo(); try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); @@ -678,14 +628,14 @@ public class InternalEngine extends Engine { } } final long expectedVersion = index.version(); - final Optional resultOnVersionConflict = - checkVersionConflict( - index, - currentVersion, - expectedVersion, - deleted, - () -> new IndexResult(currentVersion, index.seqNo(), false), - e -> new IndexResult(e, currentVersion, index.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo())); + } final IndexResult indexResult; if (resultOnVersionConflict.isPresent()) { @@ -702,18 +652,38 @@ public class InternalEngine extends Engine { * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. */ index.parsedDoc().updateSeqID(seqNo, index.primaryTerm()); - updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.parsedDoc().version().setLongValue(updatedVersion); - - if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); - index(index.docs(), indexWriter); - } else { - update(index.uid(), index.docs(), indexWriter); + IndexResult innerIndexResult; + try { + if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { + // document does not exists, we can optimize for create, but double check if assertions are running + assert assertDocDoesNotExist(index, canOptimizeAddDocument == false); + index(index.docs(), indexWriter); + } else { + update(index.uid(), index.docs(), indexWriter); + } + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted); + } catch (Exception ex) { + if (indexWriter.getTragicException() == null) { + /* There is no tragic event recorded so this must be a document failure. + * + * The handling inside IW doesn't guarantee that an tragic / aborting exception + * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW + * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that + * we can potentially handle the exception before the engine is failed. + * Bottom line is that we can only rely on the fact that if it's a document failure then + * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather + * non-document failure + */ + innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo()); + } else { + throw ex; + } } - indexResult = new IndexResult(updatedVersion, seqNo, deleted); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + assert innerIndexResult != null; + indexResult = innerIndexResult; } if (!indexResult.hasFailure()) { location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY @@ -729,7 +699,6 @@ public class InternalEngine extends Engine { seqNoService().markSeqNoAsCompleted(seqNo); } } - } private static void index(final List docs, final IndexWriter indexWriter) throws IOException { @@ -769,14 +738,19 @@ public class InternalEngine extends Engine { } @Override - public DeleteResult delete(Delete delete) { + public DeleteResult delete(Delete delete) throws IOException { DeleteResult result; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: result = innerDelete(delete); - } catch (Exception e) { - result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo()); + } catch (RuntimeException | IOException e) { + try { + maybeFailEngine("index", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; } maybePruneDeletedTombstones(); return result; @@ -811,15 +785,14 @@ public class InternalEngine extends Engine { } final long expectedVersion = delete.version(); - - final Optional resultOnVersionConflict = - checkVersionConflict( - delete, - currentVersion, - expectedVersion, - deleted, - () -> new DeleteResult(expectedVersion, delete.seqNo(), true), - e -> new DeleteResult(e, expectedVersion, delete.seqNo())); + Optional resultOnVersionConflict; + try { + final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted); + resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true)) + : Optional.empty(); + } catch (IllegalArgumentException | VersionConflictEngineException ex) { + resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo())); + } final DeleteResult deleteResult; if (resultOnVersionConflict.isPresent()) { deleteResult = resultOnVersionConflict.get(); @@ -852,6 +825,7 @@ public class InternalEngine extends Engine { } private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { + assert uid != null : "uid must not be null"; final boolean found; if (currentVersion == Versions.NOT_FOUND) { // doc does not exist and no prior deletes @@ -861,6 +835,8 @@ public class InternalEngine extends Engine { found = false; } else { // we deleted a currently existing document + // any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming + // from this. indexWriter.deleteDocuments(uid); found = true; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb486de610b..a7094c240f4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -543,13 +543,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public Engine.IndexResult index(Engine.Index index) { + public Engine.IndexResult index(Engine.Index index) throws IOException { ensureWriteAllowed(index); Engine engine = getEngine(); return index(engine, index); } - private Engine.IndexResult index(Engine engine, Engine.Index index) { + private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException { active.set(true); final Engine.IndexResult result; index = indexingOperationListeners.preIndex(shardId, index); @@ -592,13 +592,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } - public Engine.DeleteResult delete(Engine.Delete delete) { + public Engine.DeleteResult delete(Engine.Delete delete) throws IOException { ensureWriteAllowed(delete); Engine engine = getEngine(); return delete(engine, delete); } - private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; delete = indexingOperationListeners.preDelete(shardId, delete); @@ -1922,12 +1922,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } @Override - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { IndexShard.this.index(engine, engineIndex); } @Override - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { IndexShard.this.delete(engine, engineDelete); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index b5c23d9dc46..6f4ae66a12a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -149,7 +149,7 @@ public class TranslogRecoveryPerformer { * cause a {@link MapperException} to be thrown if an update * is encountered. */ - private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) { + private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException { try { switch (operation.opType()) { @@ -207,11 +207,11 @@ public class TranslogRecoveryPerformer { operationProcessed(); } - protected void index(Engine engine, Engine.Index engineIndex) { + protected void index(Engine engine, Engine.Index engineIndex) throws IOException { engine.index(engineIndex); } - protected void delete(Engine engine, Engine.Delete engineDelete) { + protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException { engine.delete(engineDelete); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ef05d8f27ca..97638b53e37 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -32,6 +32,7 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; @@ -63,6 +64,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; @@ -125,6 +127,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.OldIndexUtils; +import org.elasticsearch.test.rest.yaml.section.Assertion; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.MatcherAssert; @@ -133,6 +136,8 @@ import org.junit.Before; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.DirectoryStream; import java.nio.file.Files; @@ -1144,7 +1149,7 @@ public class InternalEngineTests extends ESTestCase { assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } - public void testVersioningNewCreate() { + public void testVersioningNewCreate() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); @@ -1155,7 +1160,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testVersioningNewIndex() { + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1166,7 +1171,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testExternalVersioningNewIndex() { + public void testExternalVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1177,7 +1182,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getVersion(), equalTo(12L)); } - public void testVersioningIndexConflict() { + public void testVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1199,7 +1204,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflict() { + public void testExternalVersioningIndexConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1242,7 +1247,7 @@ public class InternalEngineTests extends ESTestCase { } } - public void testVersioningIndexConflictWithFlush() { + public void testVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1266,7 +1271,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testExternalVersioningIndexConflictWithFlush() { + public void testExternalVersioningIndexConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(index); @@ -1361,6 +1366,8 @@ public class InternalEngineTests extends ESTestCase { } } catch (AlreadyClosedException ex) { // fine + } catch (IOException e) { + throw new AssertionError(e); } } }; @@ -1379,7 +1386,7 @@ public class InternalEngineTests extends ESTestCase { } - public void testVersioningDeleteConflict() { + public void testVersioningDeleteConflict() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1412,7 +1419,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningDeleteConflictWithFlush() { + public void testVersioningDeleteConflictWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1451,7 +1458,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsException() { + public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1463,7 +1470,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningCreateExistsExceptionWithFlush() { + public void testVersioningCreateExistsExceptionWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); Engine.IndexResult indexResult = engine.index(create); @@ -1477,7 +1484,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - public void testVersioningReplicaConflict1() { + public void testVersioningReplicaConflict1() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1526,7 +1533,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); } - public void testVersioningReplicaConflict2() { + public void testVersioningReplicaConflict2() throws IOException { final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); final Engine.Index v1Index = indexForDoc(doc); final Engine.IndexResult v1Result = engine.index(v1Index); @@ -1595,7 +1602,7 @@ public class InternalEngineTests extends ESTestCase { assertThat(replicaV2Result.getVersion(), equalTo(3L)); } - public void testBasicCreatedFlag() { + public void testBasicCreatedFlag() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1612,7 +1619,7 @@ public class InternalEngineTests extends ESTestCase { assertTrue(indexResult.isCreated()); } - public void testCreatedFlagAfterFlush() { + public void testCreatedFlagAfterFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = engine.index(index); @@ -1654,7 +1661,7 @@ public class InternalEngineTests extends ESTestCase { // #5891: make sure IndexWriter's infoStream output is // sent to lucene.iw with log level TRACE: - public void testIndexWriterInfoStream() throws IllegalAccessException { + public void testIndexWriterInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterInfoStream"); mockAppender.start(); @@ -1915,7 +1922,7 @@ public class InternalEngineTests extends ESTestCase { } // #8603: make sure we can separately log IFD's messages - public void testIndexWriterIFDInfoStream() throws IllegalAccessException { + public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOException { assumeFalse("who tests the tester?", VERBOSE); MockAppender mockAppender = new MockAppender("testIndexWriterIFDInfoStream"); mockAppender.start(); @@ -2532,20 +2539,8 @@ public class InternalEngineTests extends ESTestCase { } } - public void testCheckDocumentFailure() throws Exception { - ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); - Exception documentFailure = engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new IOException("simulated document failure")); - assertThat(documentFailure, instanceOf(IOException.class)); - try { - engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new CorruptIndexException("simulated environment failure", "")); - fail("expected exception to be thrown"); - } catch (Exception envirnomentException) { - assertThat(envirnomentException.getMessage(), containsString("simulated environment failure")); - } - } - private static class ThrowingIndexWriter extends IndexWriter { - private AtomicReference failureToThrow = new AtomicReference<>(); + private AtomicReference> failureToThrow = new AtomicReference<>(); public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { super(d, conf); @@ -2558,13 +2553,15 @@ public class InternalEngineTests extends ESTestCase { } private void maybeThrowFailure() throws IOException { - Exception failure = failureToThrow.get(); - if (failure instanceof RuntimeException) { - throw (RuntimeException)failure; - } else if (failure instanceof IOException) { - throw (IOException)failure; - } else { - assert failure == null : "unsupported failure class: " + failure.getClass().getCanonicalName(); + if (failureToThrow.get() != null) { + Exception failure = failureToThrow.get().get(); + if (failure instanceof RuntimeException) { + throw (RuntimeException) failure; + } else if (failure instanceof IOException) { + throw (IOException) failure; + } else { + assert false: "unsupported failure class: " + failure.getClass().getCanonicalName(); + } } } @@ -2574,14 +2571,8 @@ public class InternalEngineTests extends ESTestCase { return super.deleteDocuments(terms); } - public void setThrowFailure(IOException documentFailure) { - Objects.requireNonNull(documentFailure); - failureToThrow.set(documentFailure); - } - - public void setThrowFailure(RuntimeException runtimeFailure) { - Objects.requireNonNull(runtimeFailure); - failureToThrow.set(runtimeFailure); + public void setThrowFailure(Supplier failureSupplier) { + failureToThrow.set(failureSupplier); } public void clearFailure() { @@ -2594,13 +2585,14 @@ public class InternalEngineTests extends ESTestCase { final ParsedDocument doc1 = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null); final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null); + ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig()); try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) { // test document failure while indexing if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); } Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); @@ -2610,29 +2602,33 @@ public class InternalEngineTests extends ESTestCase { assertNull(indexResult.getFailure()); engine.index(indexForDoc(doc2)); - // test document failure while deleting + // test failure while deleting + // all these simulated exceptions are not fatal to the IW so we treat them as document failures if (randomBoolean()) { - throwingIndexWriter.setThrowFailure(new IOException("simulated")); + throwingIndexWriter.setThrowFailure(() -> new IOException("simulated")); + expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } else { - throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length")); + throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); + expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1)))); } - Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1))); - assertNotNull(deleteResult.getFailure()); // test non document level failure is thrown if (randomBoolean()) { // simulate close by corruption - throwingIndexWriter.setThrowFailure(new CorruptIndexException("simulated", "hello")); - try { - if (randomBoolean()) { - engine.index(indexForDoc(doc3)); - } else { - engine.delete(new Engine.Delete("test", "2", newUid(doc2))); - } - fail("corruption should throw exceptions"); - } catch (Exception e) { - assertThat(e, instanceOf(CorruptIndexException.class)); - } + throwingIndexWriter.setThrowFailure(null); + UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> { + Engine.Index index = indexForDoc(doc3); + index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") { + // this is a hack to add a failure during store document which triggers a tragic event + // and in turn fails the engine + @Override + public BytesRef binaryValue() { + throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException()); + } + }); + engine.index(index); + }); + assertTrue(uncheckedIOException.getCause() instanceof MockDirectoryWrapper.FakeIOException); } else { // normal close engine.close(); @@ -2807,7 +2803,11 @@ public class InternalEngineTests extends ESTestCase { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } }); thread[i].start(); @@ -2867,7 +2867,11 @@ public class InternalEngineTests extends ESTestCase { } int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { - engine.index(docs.get(docOffset)); + try { + engine.index(docs.get(docOffset)); + } catch (IOException e) { + throw new AssertionError(e); + } } } }; @@ -3055,7 +3059,13 @@ public class InternalEngineTests extends ESTestCase { final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null); skip.set(randomBoolean()); - final Thread thread = new Thread(() -> finalInitialEngine.index(indexForDoc(doc))); + final Thread thread = new Thread(() -> { + try { + finalInitialEngine.index(indexForDoc(doc)); + } catch (IOException e) { + throw new AssertionError(e); + } + }); thread.start(); if (skip.get()) { threads.add(thread); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index cc92d9bd9c2..219af4af4da 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -267,7 +267,7 @@ public class ShadowEngineTests extends ESTestCase { protected static final BytesReference B_2 = new BytesArray(new byte[]{2}); protected static final BytesReference B_3 = new BytesArray(new byte[]{3}); - public void testCommitStats() { + public void testCommitStats() throws IOException { // create a doc and refresh ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); @@ -846,7 +846,7 @@ public class ShadowEngineTests extends ESTestCase { searchResult.close(); } - public void testFailEngineOnCorruption() { + public void testFailEngineOnCorruption() throws IOException { ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); primaryEngine.index(indexForDoc(doc)); primaryEngine.flush(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 3216d30bdaa..042eb85cf36 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -329,7 +329,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica); + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; class PrimaryRef implements ReplicationOperation.Primary { @@ -449,7 +449,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } @Override - protected void performOnReplica(IndexRequest request, IndexShard replica) { + protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException { final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica); TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e95d7ace10b..d5c10dddc36 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -320,11 +320,11 @@ public class RefreshListenersTests extends ESTestCase { refresher.cancel(); } - private Engine.IndexResult index(String id) { + private Engine.IndexResult index(String id) throws IOException { return index(id, "test"); } - private Engine.IndexResult index(String id, String testFieldValue) { + private Engine.IndexResult index(String id, String testFieldValue) throws IOException { String type = "test"; String uid = type + ":" + id; Document document = new Document(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 605b9026c26..650e0794efa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -449,11 +449,11 @@ public abstract class IndexShardTestCase extends ESTestCase { } - protected Engine.Index indexDoc(IndexShard shard, String type, String id) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException { return indexDoc(shard, type, id, "{}"); } - protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) { + protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException { final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( @@ -471,7 +471,7 @@ public abstract class IndexShardTestCase extends ESTestCase { return index; } - protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) { + protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException { final Engine.Delete delete; if (shard.routingEntry().primary()) { delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);