From a13b4bc8c55bcd36cfa94c1e2b60312cafbd945a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 19 Mar 2019 09:13:27 -0400 Subject: [PATCH] Always fail engine if delete operation fails (#40117) Unlike index operations which can fail at the document level to analyzing errors, delete operations should never fail at the document level whether soft-deletes is enabled or not. With this change, we will always fail the engine if we fail to apply a delete operation to Lucene. Closes #33256 --- .../index/engine/InternalEngine.java | 23 ++------- .../index/engine/InternalEngineTests.java | 50 +++++++++++++------ .../IndexLevelReplicationTests.java | 43 ++++------------ .../index/engine/EngineTestCase.java | 21 ++++++-- 4 files changed, 66 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 65c09f581cc..4791adaf073 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1259,18 +1259,8 @@ public class InternalEngine extends Engine { plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } } - if (delete.origin().isFromTranslog() == false) { - final Translog.Location location; - if (deleteResult.getResultType() == Result.Type.SUCCESS) { - location = translog.add(new Translog.Delete(delete, deleteResult)); - } else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no - final NoOp noOp = new NoOp(deleteResult.getSeqNo(), delete.primaryTerm(), delete.origin(), - delete.startTime(), deleteResult.getFailure().toString()); - location = innerNoOp(noOp).getTranslogLocation(); - } else { - location = null; - } + if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { + final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); } localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo()); @@ -1278,7 +1268,7 @@ public class InternalEngine extends Engine { deleteResult.freeze(); } catch (RuntimeException | IOException e) { try { - maybeFailEngine("index", e); + maybeFailEngine("delete", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -1398,12 +1388,9 @@ public class InternalEngine extends Engine { plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { - // there is no tragic event and such it must be a document level failure - return new DeleteResult( - ex, plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); - } else { - throw ex; + throw new AssertionError("delete operation should never fail at document level", ex); } + throw ex; } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 7f34b07d803..fd9080ba9b5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3259,22 +3259,6 @@ public class InternalEngineTests extends EngineTestCase { assertNotNull(indexResult.getTranslogLocation()); engine.index(indexForDoc(doc2)); - // test failure while deleting - // all these simulated exceptions are not fatal to the IW so we treat them as document failures - final Engine.DeleteResult deleteResult; - if (randomBoolean()) { - throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated")); - deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get())); - assertThat(deleteResult.getFailure(), instanceOf(IOException.class)); - } else { - throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); - deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get())); - assertThat(deleteResult.getFailure(), - instanceOf(IllegalArgumentException.class)); - } - assertThat(deleteResult.getVersion(), equalTo(2L)); - assertThat(deleteResult.getSeqNo(), equalTo(3L)); - // test non document level failure is thrown if (randomBoolean()) { // simulate close by corruption @@ -3308,6 +3292,40 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testDeleteWithFatalError() throws Exception { + final IllegalStateException tragicException = new IllegalStateException("fail to store tombstone"); + try (Store store = createStore()) { + EngineConfig.TombstoneDocSupplier tombstoneDocSupplier = new EngineConfig.TombstoneDocSupplier() { + @Override + public ParsedDocument newDeleteTombstoneDoc(String type, String id) { + ParsedDocument parsedDocument = tombstoneDocSupplier().newDeleteTombstoneDoc(type, id); + parsedDocument.rootDoc().add(new StoredField("foo", "bar") { + // this is a hack to add a failure during store document which triggers a tragic event + // and in turn fails the engine + @Override + public BytesRef binaryValue() { + throw tragicException; + } + }); + return parsedDocument; + } + + @Override + public ParsedDocument newNoopTombstoneDoc(String reason) { + return tombstoneDocSupplier().newNoopTombstoneDoc(reason); + } + }; + try (InternalEngine engine = createEngine(null, null, null, config(this.engine.config(), store, tombstoneDocSupplier))) { + final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc)); + expectThrows(IllegalStateException.class, + () -> engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get()))); + assertTrue(engine.isClosed.get()); + assertSame(tragicException, engine.failedEngine.get()); + } + } + } + public void testDoubleDeliveryPrimary() throws IOException { final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index b2e08f8f704..657bbac734b 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.index.replication; -import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; @@ -58,7 +57,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -418,10 +416,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase */ public void testDocumentFailureReplication() throws Exception { final IOException indexException = new IOException("simulated indexing failure"); - final IOException deleteException = new IOException("simulated deleting failure"); final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) -> new IndexWriter(dir, iwc) { - final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW. @Override public long addDocument(Iterable doc) throws IOException { boolean isTombstone = false; @@ -430,20 +426,12 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase isTombstone = true; } } - if (isTombstone == false && throwAfterIndexedOneDoc.getAndSet(true)) { - throw indexException; + if (isTombstone) { + return super.addDocument(doc); // allow to add Noop } else { - return super.addDocument(doc); + throw indexException; } } - @Override - public long deleteDocuments(Term... terms) throws IOException { - throw deleteException; - } - @Override - public long softUpdateDocument(Term term, Iterable doc, Field...fields) throws IOException { - throw deleteException; // a delete uses softUpdateDocument API if soft-deletes enabled - } }, null, null, config); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { @Override @@ -454,20 +442,13 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase long primaryTerm = shards.getPrimary().getPendingPrimaryTerm(); List expectedTranslogOps = new ArrayList<>(); BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON)); - assertThat(indexResp.isFailed(), equalTo(false)); - expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1)); + assertThat(indexResp.isFailed(), equalTo(true)); + assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); + expectedTranslogOps.add(new Translog.NoOp(0, primaryTerm, indexException.toString())); try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } - - indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); - assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); - expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString())); - - BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); - assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); - expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString())); - shards.assertAllEqual(1); + shards.assertAllEqual(0); int nReplica = randomIntBetween(1, 3); for (int i = 0; i < nReplica; i++) { @@ -482,14 +463,10 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } - // unlike previous failures, these two failures replicated directly from the replication channel. + // the failure replicated directly from the replication channel. indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); - expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString())); - - deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1")); - assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException)); - expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString())); + expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString())); for (IndexShard shard : shards) { try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) { @@ -499,7 +476,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } - shards.assertAllEqual(1); + shards.assertAllEqual(0); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f6731f1e2ad..a483e79467b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -500,10 +500,10 @@ public abstract class EngineTestCase extends ESTestCase { return createEngine(null, null, null, config); } - private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory, - @Nullable BiFunction localCheckpointTrackerSupplier, - @Nullable ToLongBiFunction seqNoForOperation, - EngineConfig config) throws IOException { + protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction localCheckpointTrackerSupplier, + @Nullable ToLongBiFunction seqNoForOperation, + EngineConfig config) throws IOException { final Store store = config.getStore(); final Directory directory = store.directory(); if (Lucene.indexExists(directory) == false) { @@ -697,6 +697,19 @@ public abstract class EngineTestCase extends ESTestCase { tombstoneDocSupplier()); } + protected EngineConfig config(EngineConfig config, Store store, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier) { + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", + Settings.builder().put(config.getIndexSettings().getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build()); + return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + indexSettings, config.getWarmer(), store, config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), + config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), + config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), tombstoneDocSupplier); + } + protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { return noOpConfig(indexSettings, store, translogPath, null); }