Fail engine if hit document failure on replicas (#43523)
An indexing on a replica should never fail after it was successfully indexed on a primary. Hence, we should fail an engine if we hit any failure (document level or tragic failure) when processing an indexing on a replica. Relates #43228 Closes #40435
This commit is contained in:
parent
835b7a120d
commit
2203d447aa
|
@ -933,7 +933,11 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
} catch (RuntimeException | IOException e) {
|
} catch (RuntimeException | IOException e) {
|
||||||
try {
|
try {
|
||||||
maybeFailEngine("index", e);
|
if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
|
||||||
|
failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
|
||||||
|
} else {
|
||||||
|
maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
|
||||||
|
}
|
||||||
} catch (Exception inner) {
|
} catch (Exception inner) {
|
||||||
e.addSuppressed(inner);
|
e.addSuppressed(inner);
|
||||||
}
|
}
|
||||||
|
@ -1059,7 +1063,8 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
|
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
if (indexWriter.getTragicException() == null) {
|
if (ex instanceof AlreadyClosedException == false &&
|
||||||
|
indexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
|
||||||
/* There is no tragic event recorded so this must be a document failure.
|
/* There is no tragic event recorded so this must be a document failure.
|
||||||
*
|
*
|
||||||
* The handling inside IW doesn't guarantee that an tragic / aborting exception
|
* The handling inside IW doesn't guarantee that an tragic / aborting exception
|
||||||
|
@ -1080,6 +1085,16 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether we should treat any document failure as tragic error.
|
||||||
|
* If we hit any failure while processing an indexing on a replica, we should treat that error as tragic and fail the engine.
|
||||||
|
* However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary.
|
||||||
|
*/
|
||||||
|
private boolean treatDocumentFailureAsTragicError(Index index) {
|
||||||
|
// TODO: can we enable this all origins except primary on the leader?
|
||||||
|
return index.origin() == Operation.Origin.REPLICA;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns true if the indexing operation may have already be processed by this engine.
|
* returns true if the indexing operation may have already be processed by this engine.
|
||||||
* Note that it is OK to rarely return true even if this is not the case. However a `false`
|
* Note that it is OK to rarely return true even if this is not the case. However a `false`
|
||||||
|
|
|
@ -5902,4 +5902,28 @@ public class InternalEngineTests extends EngineTestCase {
|
||||||
.filter(e -> e.getValue() instanceof DeleteVersionValue)
|
.filter(e -> e.getValue() instanceof DeleteVersionValue)
|
||||||
.collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
|
.collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testHandleDocumentFailureOnReplica() throws Exception {
|
||||||
|
AtomicReference<IOException> addDocException = new AtomicReference<>();
|
||||||
|
IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) {
|
||||||
|
@Override
|
||||||
|
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
|
||||||
|
final IOException ex = addDocException.getAndSet(null);
|
||||||
|
if (ex != null) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
return super.addDocument(doc);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try (Store store = createStore();
|
||||||
|
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
|
||||||
|
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
||||||
|
Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
|
||||||
|
randomNonNegativeLong(), null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
|
||||||
|
addDocException.set(new IOException("simulated"));
|
||||||
|
expectThrows(IOException.class, () -> engine.index(index));
|
||||||
|
assertTrue(engine.isClosed.get());
|
||||||
|
assertNotNull(engine.failedEngine.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,6 @@ public class SearchWithRandomExceptionsIT extends ESIntegTestCase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40435")
|
|
||||||
public void testRandomExceptions() throws IOException, InterruptedException, ExecutionException {
|
public void testRandomExceptions() throws IOException, InterruptedException, ExecutionException {
|
||||||
String mapping = Strings.toString(XContentFactory.jsonBuilder().
|
String mapping = Strings.toString(XContentFactory.jsonBuilder().
|
||||||
startObject().
|
startObject().
|
||||||
|
|
Loading…
Reference in New Issue