Fix handling of document failure expcetion in InternalEngine (#22718)

Today we try to be smart and make a generic decision if an exception should
be treated as a document failure but in some cases concurrency in the index writer
make this decision very difficult since we don't have a consistent state in the case
another thread is currently failing the IndexWriter/InternalEngine due to a tragic event.

This change simplifies the exception handling and makes specific decisions about document failures
rather than using a generic heuristic. This prevent exceptions to be treated as document failures
that should have failed the engine but backed out of failing since since some other thread has
already taken over the failure procedure but didn't finish yet.
This commit is contained in:
Simon Willnauer 2017-01-20 16:55:00 +01:00 committed by GitHub
parent f01784205f
commit 824beea89d
10 changed files with 178 additions and 191 deletions

View File

@ -63,6 +63,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
/** Performs shard-level bulk (index, delete or update) operations */
@ -424,7 +425,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) throws IOException {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent());
@ -483,12 +484,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return primary.index(operation);
}
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) throws IOException {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);

View File

@ -282,7 +282,7 @@ public abstract class Engine implements Closeable {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract IndexResult index(final Index index);
public abstract IndexResult index(final Index index) throws IOException;
/**
* Perform document delete operation on the engine
@ -292,7 +292,7 @@ public abstract class Engine implements Closeable {
*
* Note: engine level failures (i.e. persistent engine failures) are thrown
*/
public abstract DeleteResult delete(final Delete delete);
public abstract DeleteResult delete(final Delete delete) throws IOException;
public abstract NoOpResult noOp(final NoOp noOp);

View File

@ -445,27 +445,18 @@ public class InternalEngine extends Engine {
}
/**
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
* no conflicts are found, the optional return value is not present.
* Checks for version conflicts. If a non-critical version conflict exists <code>true</code> is returned. In the case of a critical
* version conflict (if operation origin is primary) a {@link VersionConflictEngineException} is thrown.
*
* @param <T> the result type
* @param op the operation
* @param currentVersion the current version
* @param expectedVersion the expected version
* @param deleted {@code true} if the current version is not found or represents a delete
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
* @param onFailure if there is a version conflict that can not be ignored, the result of the operation
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
* is not present
* @return <code>true</code> iff a non-critical version conflict (origin recovery or replica) is found otherwise <code>false</code>
* @throws VersionConflictEngineException if a critical version conflict was found where the operation origin is primary
* @throws IllegalArgumentException if an unsupported version type is used.
*/
private <T extends Result> Optional<T> checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted,
final Supplier<T> onSuccess,
final Function<VersionConflictEngineException, T> onFailure) {
final T result;
private boolean checkVersionConflict(final Operation op, final long currentVersion, final long expectedVersion, final boolean deleted) {
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
@ -479,23 +470,19 @@ public class InternalEngine extends Engine {
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin() == Operation.Origin.PRIMARY) {
// fatal version conflict
final VersionConflictEngineException e =
new VersionConflictEngineException(
throw new VersionConflictEngineException(
shardId,
op.type(),
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);
} else {
/*
* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.
*/
result = onSuccess.get();
/* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.*/
return true;
}
return Optional.of(result);
} else {
return Optional.empty();
return false;
}
}
@ -510,7 +497,7 @@ public class InternalEngine extends Engine {
}
@Override
public IndexResult index(Index index) {
public IndexResult index(Index index) throws IOException {
IndexResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
@ -522,54 +509,17 @@ public class InternalEngine extends Engine {
result = innerIndex(index);
}
}
} catch (Exception e) {
result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
return result;
}
/**
* Inspects exception thrown when executing index or delete operations
*
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
* <p>
* Note: pkg-private for testing
*/
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
boolean isDocumentFailure;
try {
// When indexing a document into Lucene, Lucene distinguishes between environment related errors
// (like out of disk space) and document specific errors (like analysis chain problems) by setting
// the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of
// errors and returns true if that is the case. We use that to indicate a document level failure
// and set the error in operation.setFailure. In case of environment related errors, the failure
// is bubbled up
isDocumentFailure = maybeFailEngine(operation.operationType().getLowercase(), failure) == false;
if (failure instanceof AlreadyClosedException) {
// ensureOpen throws AlreadyClosedException which is not a document level issue
isDocumentFailure = false;
}
} catch (Exception inner) {
// we failed checking whether the failure can fail the engine, treat it as a persistent engine failure
isDocumentFailure = false;
failure.addSuppressed(inner);
}
if (isDocumentFailure) {
return failure;
} else {
// throw original exception in case the exception caused the engine to fail
rethrow(failure);
return null;
}
}
// hack to rethrow original exception in case of engine level failures during index/delete operation
@SuppressWarnings("unchecked")
private static <T extends Throwable> void rethrow(Throwable t) throws T {
throw (T) t;
}
private boolean canOptimizeAddDocument(Index index) {
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
@ -610,9 +560,9 @@ public class InternalEngine extends Engine {
}
private IndexResult innerIndex(Index index) throws IOException {
// TODO we gotta split this method up it's too big!
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
final long updatedVersion;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
@ -678,14 +628,14 @@ public class InternalEngine extends Engine {
}
}
final long expectedVersion = index.version();
final Optional<IndexResult> resultOnVersionConflict =
checkVersionConflict(
index,
currentVersion,
expectedVersion,
deleted,
() -> new IndexResult(currentVersion, index.seqNo(), false),
e -> new IndexResult(e, currentVersion, index.seqNo()));
Optional<IndexResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
}
final IndexResult indexResult;
if (resultOnVersionConflict.isPresent()) {
@ -702,18 +652,38 @@ public class InternalEngine extends Engine {
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
IndexResult innerIndexResult;
try {
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
* we can potentially handle the exception before the engine is failed.
* Bottom line is that we can only rely on the fact that if it's a document failure then
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
* non-document failure
*/
innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
} else {
throw ex;
}
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
assert innerIndexResult != null;
indexResult = innerIndexResult;
}
if (!indexResult.hasFailure()) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
@ -729,7 +699,6 @@ public class InternalEngine extends Engine {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}
}
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
@ -769,14 +738,19 @@ public class InternalEngine extends Engine {
}
@Override
public DeleteResult delete(Delete delete) {
public DeleteResult delete(Delete delete) throws IOException {
DeleteResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
result = innerDelete(delete);
} catch (Exception e) {
result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version(), delete.seqNo());
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
maybePruneDeletedTombstones();
return result;
@ -811,15 +785,14 @@ public class InternalEngine extends Engine {
}
final long expectedVersion = delete.version();
final Optional<DeleteResult> resultOnVersionConflict =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));
Optional<DeleteResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(delete, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new DeleteResult(expectedVersion, delete.seqNo(), true))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new DeleteResult(ex, expectedVersion, delete.seqNo()));
}
final DeleteResult deleteResult;
if (resultOnVersionConflict.isPresent()) {
deleteResult = resultOnVersionConflict.get();
@ -852,6 +825,7 @@ public class InternalEngine extends Engine {
}
private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
assert uid != null : "uid must not be null";
final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
@ -861,6 +835,8 @@ public class InternalEngine extends Engine {
found = false;
} else {
// we deleted a currently existing document
// any exception that comes from this is a either an ACE or a fatal exception there can't be any document failures coming
// from this.
indexWriter.deleteDocuments(uid);
found = true;
}

View File

@ -543,13 +543,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}
public Engine.IndexResult index(Engine.Index index) {
public Engine.IndexResult index(Engine.Index index) throws IOException {
ensureWriteAllowed(index);
Engine engine = getEngine();
return index(engine, index);
}
private Engine.IndexResult index(Engine engine, Engine.Index index) {
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
active.set(true);
final Engine.IndexResult result;
index = indexingOperationListeners.preIndex(shardId, index);
@ -592,13 +592,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime);
}
public Engine.DeleteResult delete(Engine.Delete delete) {
public Engine.DeleteResult delete(Engine.Delete delete) throws IOException {
ensureWriteAllowed(delete);
Engine engine = getEngine();
return delete(engine, delete);
}
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) {
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
active.set(true);
final Engine.DeleteResult result;
delete = indexingOperationListeners.preDelete(shardId, delete);
@ -1922,12 +1922,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
@Override
protected void index(Engine engine, Engine.Index engineIndex) {
protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
IndexShard.this.index(engine, engineIndex);
}
@Override
protected void delete(Engine engine, Engine.Delete engineDelete) {
protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
IndexShard.this.delete(engine, engineDelete);
}
}

View File

@ -149,7 +149,7 @@ public class TranslogRecoveryPerformer {
* cause a {@link MapperException} to be thrown if an update
* is encountered.
*/
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) {
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
try {
switch (operation.opType()) {
@ -207,11 +207,11 @@ public class TranslogRecoveryPerformer {
operationProcessed();
}
protected void index(Engine engine, Engine.Index engineIndex) {
protected void index(Engine engine, Engine.Index engineIndex) throws IOException {
engine.index(engineIndex);
}
protected void delete(Engine engine, Engine.Delete engineDelete) {
protected void delete(Engine engine, Engine.Delete engineDelete) throws IOException {
engine.delete(engineDelete);
}

View File

@ -32,6 +32,7 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
@ -63,6 +64,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
@ -125,6 +127,7 @@ import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.OldIndexUtils;
import org.elasticsearch.test.rest.yaml.section.Assertion;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
@ -133,6 +136,8 @@ import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@ -1144,7 +1149,7 @@ public class InternalEngineTests extends ESTestCase {
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
public void testVersioningNewCreate() {
public void testVersioningNewCreate() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED);
Engine.IndexResult indexResult = engine.index(create);
@ -1155,7 +1160,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getVersion(), equalTo(1L));
}
public void testVersioningNewIndex() {
public void testVersioningNewIndex() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1166,7 +1171,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getVersion(), equalTo(1L));
}
public void testExternalVersioningNewIndex() {
public void testExternalVersioningNewIndex() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
Engine.IndexResult indexResult = engine.index(index);
@ -1177,7 +1182,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getVersion(), equalTo(12L));
}
public void testVersioningIndexConflict() {
public void testVersioningIndexConflict() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1199,7 +1204,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testExternalVersioningIndexConflict() {
public void testExternalVersioningIndexConflict() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
Engine.IndexResult indexResult = engine.index(index);
@ -1242,7 +1247,7 @@ public class InternalEngineTests extends ESTestCase {
}
}
public void testVersioningIndexConflictWithFlush() {
public void testVersioningIndexConflictWithFlush() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1266,7 +1271,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testExternalVersioningIndexConflictWithFlush() {
public void testExternalVersioningIndexConflictWithFlush() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
Engine.IndexResult indexResult = engine.index(index);
@ -1361,6 +1366,8 @@ public class InternalEngineTests extends ESTestCase {
}
} catch (AlreadyClosedException ex) {
// fine
} catch (IOException e) {
throw new AssertionError(e);
}
}
};
@ -1379,7 +1386,7 @@ public class InternalEngineTests extends ESTestCase {
}
public void testVersioningDeleteConflict() {
public void testVersioningDeleteConflict() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1412,7 +1419,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningDeleteConflictWithFlush() {
public void testVersioningDeleteConflictWithFlush() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1451,7 +1458,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningCreateExistsException() {
public void testVersioningCreateExistsException() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
Engine.IndexResult indexResult = engine.index(create);
@ -1463,7 +1470,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningCreateExistsExceptionWithFlush() {
public void testVersioningCreateExistsExceptionWithFlush() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
Engine.IndexResult indexResult = engine.index(create);
@ -1477,7 +1484,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningReplicaConflict1() {
public void testVersioningReplicaConflict1() throws IOException {
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
final Engine.Index v1Index = indexForDoc(doc);
final Engine.IndexResult v1Result = engine.index(v1Index);
@ -1526,7 +1533,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L));
}
public void testVersioningReplicaConflict2() {
public void testVersioningReplicaConflict2() throws IOException {
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
final Engine.Index v1Index = indexForDoc(doc);
final Engine.IndexResult v1Result = engine.index(v1Index);
@ -1595,7 +1602,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(replicaV2Result.getVersion(), equalTo(3L));
}
public void testBasicCreatedFlag() {
public void testBasicCreatedFlag() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1612,7 +1619,7 @@ public class InternalEngineTests extends ESTestCase {
assertTrue(indexResult.isCreated());
}
public void testCreatedFlagAfterFlush() {
public void testCreatedFlagAfterFlush() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocument(), B_1, null);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(index);
@ -1654,7 +1661,7 @@ public class InternalEngineTests extends ESTestCase {
// #5891: make sure IndexWriter's infoStream output is
// sent to lucene.iw with log level TRACE:
public void testIndexWriterInfoStream() throws IllegalAccessException {
public void testIndexWriterInfoStream() throws IllegalAccessException, IOException {
assumeFalse("who tests the tester?", VERBOSE);
MockAppender mockAppender = new MockAppender("testIndexWriterInfoStream");
mockAppender.start();
@ -1915,7 +1922,7 @@ public class InternalEngineTests extends ESTestCase {
}
// #8603: make sure we can separately log IFD's messages
public void testIndexWriterIFDInfoStream() throws IllegalAccessException {
public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOException {
assumeFalse("who tests the tester?", VERBOSE);
MockAppender mockAppender = new MockAppender("testIndexWriterIFDInfoStream");
mockAppender.start();
@ -2532,20 +2539,8 @@ public class InternalEngineTests extends ESTestCase {
}
}
public void testCheckDocumentFailure() throws Exception {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
Exception documentFailure = engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new IOException("simulated document failure"));
assertThat(documentFailure, instanceOf(IOException.class));
try {
engine.checkIfDocumentFailureOrThrow(indexForDoc(doc), new CorruptIndexException("simulated environment failure", ""));
fail("expected exception to be thrown");
} catch (Exception envirnomentException) {
assertThat(envirnomentException.getMessage(), containsString("simulated environment failure"));
}
}
private static class ThrowingIndexWriter extends IndexWriter {
private AtomicReference<Exception> failureToThrow = new AtomicReference<>();
private AtomicReference<Supplier<Exception>> failureToThrow = new AtomicReference<>();
public ThrowingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
@ -2558,13 +2553,15 @@ public class InternalEngineTests extends ESTestCase {
}
private void maybeThrowFailure() throws IOException {
Exception failure = failureToThrow.get();
if (failure instanceof RuntimeException) {
throw (RuntimeException)failure;
} else if (failure instanceof IOException) {
throw (IOException)failure;
} else {
assert failure == null : "unsupported failure class: " + failure.getClass().getCanonicalName();
if (failureToThrow.get() != null) {
Exception failure = failureToThrow.get().get();
if (failure instanceof RuntimeException) {
throw (RuntimeException) failure;
} else if (failure instanceof IOException) {
throw (IOException) failure;
} else {
assert false: "unsupported failure class: " + failure.getClass().getCanonicalName();
}
}
}
@ -2574,14 +2571,8 @@ public class InternalEngineTests extends ESTestCase {
return super.deleteDocuments(terms);
}
public void setThrowFailure(IOException documentFailure) {
Objects.requireNonNull(documentFailure);
failureToThrow.set(documentFailure);
}
public void setThrowFailure(RuntimeException runtimeFailure) {
Objects.requireNonNull(runtimeFailure);
failureToThrow.set(runtimeFailure);
public void setThrowFailure(Supplier<Exception> failureSupplier) {
failureToThrow.set(failureSupplier);
}
public void clearFailure() {
@ -2594,13 +2585,14 @@ public class InternalEngineTests extends ESTestCase {
final ParsedDocument doc1 = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null);
final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null);
ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig());
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) {
// test document failure while indexing
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(new IOException("simulated"));
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
} else {
throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
}
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1));
assertNotNull(indexResult.getFailure());
@ -2610,29 +2602,33 @@ public class InternalEngineTests extends ESTestCase {
assertNull(indexResult.getFailure());
engine.index(indexForDoc(doc2));
// test document failure while deleting
// test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(new IOException("simulated"));
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
} else {
throwingIndexWriter.setThrowFailure(new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
}
Engine.DeleteResult deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1)));
assertNotNull(deleteResult.getFailure());
// test non document level failure is thrown
if (randomBoolean()) {
// simulate close by corruption
throwingIndexWriter.setThrowFailure(new CorruptIndexException("simulated", "hello"));
try {
if (randomBoolean()) {
engine.index(indexForDoc(doc3));
} else {
engine.delete(new Engine.Delete("test", "2", newUid(doc2)));
}
fail("corruption should throw exceptions");
} catch (Exception e) {
assertThat(e, instanceOf(CorruptIndexException.class));
}
throwingIndexWriter.setThrowFailure(null);
UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> {
Engine.Index index = indexForDoc(doc3);
index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") {
// this is a hack to add a failure during store document which triggers a tragic event
// and in turn fails the engine
@Override
public BytesRef binaryValue() {
throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException());
}
});
engine.index(index);
});
assertTrue(uncheckedIOException.getCause() instanceof MockDirectoryWrapper.FakeIOException);
} else {
// normal close
engine.close();
@ -2807,7 +2803,11 @@ public class InternalEngineTests extends ESTestCase {
}
int docOffset;
while ((docOffset = offset.incrementAndGet()) < docs.size()) {
engine.index(docs.get(docOffset));
try {
engine.index(docs.get(docOffset));
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
thread[i].start();
@ -2867,7 +2867,11 @@ public class InternalEngineTests extends ESTestCase {
}
int docOffset;
while ((docOffset = offset.incrementAndGet()) < docs.size()) {
engine.index(docs.get(docOffset));
try {
engine.index(docs.get(docOffset));
} catch (IOException e) {
throw new AssertionError(e);
}
}
}
};
@ -3055,7 +3059,13 @@ public class InternalEngineTests extends ESTestCase {
final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
skip.set(randomBoolean());
final Thread thread = new Thread(() -> finalInitialEngine.index(indexForDoc(doc)));
final Thread thread = new Thread(() -> {
try {
finalInitialEngine.index(indexForDoc(doc));
} catch (IOException e) {
throw new AssertionError(e);
}
});
thread.start();
if (skip.get()) {
threads.add(thread);

View File

@ -267,7 +267,7 @@ public class ShadowEngineTests extends ESTestCase {
protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
public void testCommitStats() {
public void testCommitStats() throws IOException {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
primaryEngine.index(indexForDoc(doc));
@ -846,7 +846,7 @@ public class ShadowEngineTests extends ESTestCase {
searchResult.close();
}
public void testFailEngineOnCorruption() {
public void testFailEngineOnCorruption() throws IOException {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
primaryEngine.index(indexForDoc(doc));
primaryEngine.flush();

View File

@ -329,7 +329,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica);
protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException;
class PrimaryRef implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult> {
@ -449,7 +449,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
@Override
protected void performOnReplica(IndexRequest request, IndexShard replica) {
protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException {
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
}

View File

@ -320,11 +320,11 @@ public class RefreshListenersTests extends ESTestCase {
refresher.cancel();
}
private Engine.IndexResult index(String id) {
private Engine.IndexResult index(String id) throws IOException {
return index(id, "test");
}
private Engine.IndexResult index(String id, String testFieldValue) {
private Engine.IndexResult index(String id, String testFieldValue) throws IOException {
String type = "test";
String uid = type + ":" + id;
Document document = new Document();

View File

@ -449,11 +449,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id) {
protected Engine.Index indexDoc(IndexShard shard, String type, String id) throws IOException {
return indexDoc(shard, type, id, "{}");
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) {
protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) throws IOException {
final Engine.Index index;
if (shard.routingEntry().primary()) {
index = shard.prepareIndexOnPrimary(
@ -471,7 +471,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
return index;
}
protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) {
protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) throws IOException {
final Engine.Delete delete;
if (shard.routingEntry().primary()) {
delete = shard.prepareDeleteOnPrimary(type, id, Versions.MATCH_ANY, VersionType.INTERNAL);