Handle delete document level failures (#46100)

Today we assume that document failures can not occur for deletes. This
assumption is bogus, as they can fail for a variety of reasons such as
the Lucene index having reached the document limit. Because of this
assumption, we were asserting that such a document-level failure would
never happen. When this bogus assertion is violated, we fail the node, a
catastrophe. Instead, we need to treat this as a fatal engine exception.
This commit is contained in:
Jason Tedor 2019-08-28 22:16:15 -04:00
parent 70507e1041
commit 9bc4a24118
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
2 changed files with 68 additions and 7 deletions

View File

@ -103,6 +103,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -1410,9 +1411,19 @@ public class InternalEngine extends Engine {
}
return new DeleteResult(
plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (Exception ex) {
} catch (final Exception ex) {
/*
* Document level failures when deleting are unexpected, we likely hit something fatal such as the Lucene index being corrupt,
* or the Lucene document limit. We have already issued a sequence number here so this is fatal, fail the engine.
*/
if (ex instanceof AlreadyClosedException == false && indexWriter.getTragicException() == null) {
throw new AssertionError("delete operation should never fail at document level", ex);
final String reason = String.format(
Locale.ROOT,
"delete id[%s] origin [%s] seq#[%d] failed at the document level",
delete.id(),
delete.origin(),
delete.seqNo());
failEngine(reason, ex);
}
throw ex;
}

View File

@ -81,6 +81,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
@ -5982,11 +5983,16 @@ public class InternalEngineTests extends EngineTestCase {
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
try (Store store = createStore();
Engine engine = createEngine((dir, iwc) -> new IndexWriter(dir, iwc) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
throw new IllegalArgumentException("fatal");
}
}, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
},
null,
null,
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
final Engine.NoOp op = new Engine.NoOp(0, 0, PRIMARY, System.currentTimeMillis(), "test");
final IllegalArgumentException e = expectThrows(IllegalArgumentException. class, () -> engine.noOp(op));
assertThat(e.getMessage(), equalTo("fatal"));
@ -5997,4 +6003,48 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testDeleteFailureSoftDeletesEnabledDocAlreadyDeleted() throws IOException {
runTestDeleteFailure(true, InternalEngine::delete);
}
public void testDeleteFailureSoftDeletesEnabled() throws IOException {
runTestDeleteFailure(true, (engine, op) -> {});
}
public void testDeleteFailureSoftDeletesDisabled() throws IOException {
runTestDeleteFailure(false, (engine, op) -> {});
}
private void runTestDeleteFailure(
final boolean softDeletesEnabled,
final CheckedBiConsumer<InternalEngine, Engine.Delete, IOException> consumer) throws IOException {
engine.close();
final Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), softDeletesEnabled).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
final AtomicReference<ThrowingIndexWriter> iw = new AtomicReference<>();
try (Store store = createStore();
InternalEngine engine = createEngine(
(dir, iwc) -> {
iw.set(new ThrowingIndexWriter(dir, iwc));
return iw.get();
},
null,
null,
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
engine.index(new Engine.Index(newUid("0"), primaryTerm.get(), InternalEngineTests.createParsedDoc("0", null)));
final Engine.Delete op = new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get());
consumer.accept(engine, op);
iw.get().setThrowFailure(() -> new IllegalArgumentException("fatal"));
final IllegalArgumentException e = expectThrows(IllegalArgumentException. class, () -> engine.delete(op));
assertThat(e.getMessage(), equalTo("fatal"));
assertTrue(engine.isClosed.get());
assertThat(engine.failedEngine.get(), not(nullValue()));
assertThat(engine.failedEngine.get(), instanceOf(IllegalArgumentException.class));
assertThat(engine.failedEngine.get().getMessage(), equalTo("fatal"));
}
}
}