Always fail engine if delete operation fails (#40117)
Unlike index operations which can fail at the document level to analyzing errors, delete operations should never fail at the document level whether soft-deletes is enabled or not. With this change, we will always fail the engine if we fail to apply a delete operation to Lucene. Closes #33256
This commit is contained in:
parent
d58864745c
commit
a13b4bc8c5
|
@ -1259,18 +1259,8 @@ public class InternalEngine extends Engine {
|
|||
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
|
||||
}
|
||||
}
|
||||
if (delete.origin().isFromTranslog() == false) {
|
||||
final Translog.Location location;
|
||||
if (deleteResult.getResultType() == Result.Type.SUCCESS) {
|
||||
location = translog.add(new Translog.Delete(delete, deleteResult));
|
||||
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
|
||||
final NoOp noOp = new NoOp(deleteResult.getSeqNo(), delete.primaryTerm(), delete.origin(),
|
||||
delete.startTime(), deleteResult.getFailure().toString());
|
||||
location = innerNoOp(noOp).getTranslogLocation();
|
||||
} else {
|
||||
location = null;
|
||||
}
|
||||
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
|
||||
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
|
||||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
localCheckpointTracker.markSeqNoAsCompleted(deleteResult.getSeqNo());
|
||||
|
@ -1278,7 +1268,7 @@ public class InternalEngine extends Engine {
|
|||
deleteResult.freeze();
|
||||
} catch (RuntimeException | IOException e) {
|
||||
try {
|
||||
maybeFailEngine("index", e);
|
||||
maybeFailEngine("delete", e);
|
||||
} catch (Exception inner) {
|
||||
e.addSuppressed(inner);
|
||||
}
|
||||
|
@ -1398,12 +1388,9 @@ public class InternalEngine extends Engine {
|
|||
plan.versionOfDeletion, getPrimaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
|
||||
} catch (Exception ex) {
|
||||
if (indexWriter.getTragicException() == null) {
|
||||
// there is no tragic event and such it must be a document level failure
|
||||
return new DeleteResult(
|
||||
ex, plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
|
||||
} else {
|
||||
throw ex;
|
||||
throw new AssertionError("delete operation should never fail at document level", ex);
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3259,22 +3259,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertNotNull(indexResult.getTranslogLocation());
|
||||
engine.index(indexForDoc(doc2));
|
||||
|
||||
// test failure while deleting
|
||||
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
|
||||
final Engine.DeleteResult deleteResult;
|
||||
if (randomBoolean()) {
|
||||
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
|
||||
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
|
||||
assertThat(deleteResult.getFailure(), instanceOf(IOException.class));
|
||||
} else {
|
||||
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
|
||||
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
|
||||
assertThat(deleteResult.getFailure(),
|
||||
instanceOf(IllegalArgumentException.class));
|
||||
}
|
||||
assertThat(deleteResult.getVersion(), equalTo(2L));
|
||||
assertThat(deleteResult.getSeqNo(), equalTo(3L));
|
||||
|
||||
// test non document level failure is thrown
|
||||
if (randomBoolean()) {
|
||||
// simulate close by corruption
|
||||
|
@ -3308,6 +3292,40 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testDeleteWithFatalError() throws Exception {
|
||||
final IllegalStateException tragicException = new IllegalStateException("fail to store tombstone");
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier = new EngineConfig.TombstoneDocSupplier() {
|
||||
@Override
|
||||
public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
|
||||
ParsedDocument parsedDocument = tombstoneDocSupplier().newDeleteTombstoneDoc(type, id);
|
||||
parsedDocument.rootDoc().add(new StoredField("foo", "bar") {
|
||||
// this is a hack to add a failure during store document which triggers a tragic event
|
||||
// and in turn fails the engine
|
||||
@Override
|
||||
public BytesRef binaryValue() {
|
||||
throw tragicException;
|
||||
}
|
||||
});
|
||||
return parsedDocument;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParsedDocument newNoopTombstoneDoc(String reason) {
|
||||
return tombstoneDocSupplier().newNoopTombstoneDoc(reason);
|
||||
}
|
||||
};
|
||||
try (InternalEngine engine = createEngine(null, null, null, config(this.engine.config(), store, tombstoneDocSupplier))) {
|
||||
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
|
||||
engine.index(indexForDoc(doc));
|
||||
expectThrows(IllegalStateException.class,
|
||||
() -> engine.delete(new Engine.Delete("test", "1", newUid("1"), primaryTerm.get())));
|
||||
assertTrue(engine.isClosed.get());
|
||||
assertSame(tragicException, engine.failedEngine.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testDoubleDeliveryPrimary() throws IOException {
|
||||
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
|
||||
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.index.replication;
|
||||
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexableField;
|
||||
import org.apache.lucene.index.Term;
|
||||
|
@ -58,7 +57,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -418,10 +416,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
*/
|
||||
public void testDocumentFailureReplication() throws Exception {
|
||||
final IOException indexException = new IOException("simulated indexing failure");
|
||||
final IOException deleteException = new IOException("simulated deleting failure");
|
||||
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
|
||||
new IndexWriter(dir, iwc) {
|
||||
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
|
||||
@Override
|
||||
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
|
||||
boolean isTombstone = false;
|
||||
|
@ -430,20 +426,12 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
isTombstone = true;
|
||||
}
|
||||
}
|
||||
if (isTombstone == false && throwAfterIndexedOneDoc.getAndSet(true)) {
|
||||
throw indexException;
|
||||
if (isTombstone) {
|
||||
return super.addDocument(doc); // allow to add Noop
|
||||
} else {
|
||||
return super.addDocument(doc);
|
||||
throw indexException;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public long deleteDocuments(Term... terms) throws IOException {
|
||||
throw deleteException;
|
||||
}
|
||||
@Override
|
||||
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field...fields) throws IOException {
|
||||
throw deleteException; // a delete uses softUpdateDocument API if soft-deletes enabled
|
||||
}
|
||||
}, null, null, config);
|
||||
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
|
||||
@Override
|
||||
|
@ -454,20 +442,13 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
|
||||
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
|
||||
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
|
||||
assertThat(indexResp.isFailed(), equalTo(false));
|
||||
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1));
|
||||
assertThat(indexResp.isFailed(), equalTo(true));
|
||||
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
|
||||
expectedTranslogOps.add(new Translog.NoOp(0, primaryTerm, indexException.toString()));
|
||||
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
|
||||
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
|
||||
}
|
||||
|
||||
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
|
||||
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
|
||||
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
|
||||
|
||||
BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
|
||||
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
|
||||
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
|
||||
shards.assertAllEqual(1);
|
||||
shards.assertAllEqual(0);
|
||||
|
||||
int nReplica = randomIntBetween(1, 3);
|
||||
for (int i = 0; i < nReplica; i++) {
|
||||
|
@ -482,14 +463,10 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
|
||||
}
|
||||
}
|
||||
// unlike previous failures, these two failures replicated directly from the replication channel.
|
||||
// the failure replicated directly from the replication channel.
|
||||
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
|
||||
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
|
||||
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));
|
||||
|
||||
deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
|
||||
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
|
||||
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));
|
||||
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
|
||||
|
||||
for (IndexShard shard : shards) {
|
||||
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
|
||||
|
@ -499,7 +476,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
|
||||
}
|
||||
}
|
||||
shards.assertAllEqual(1);
|
||||
shards.assertAllEqual(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -500,10 +500,10 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
return createEngine(null, null, null, config);
|
||||
}
|
||||
|
||||
private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory,
|
||||
@Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
|
||||
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
|
||||
EngineConfig config) throws IOException {
|
||||
protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFactory,
|
||||
@Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
|
||||
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
|
||||
EngineConfig config) throws IOException {
|
||||
final Store store = config.getStore();
|
||||
final Directory directory = store.directory();
|
||||
if (Lucene.indexExists(directory) == false) {
|
||||
|
@ -697,6 +697,19 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
tombstoneDocSupplier());
|
||||
}
|
||||
|
||||
protected EngineConfig config(EngineConfig config, Store store, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier) {
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test",
|
||||
Settings.builder().put(config.getIndexSettings().getSettings())
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
|
||||
return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
|
||||
indexSettings, config.getWarmer(), store, config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
|
||||
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
|
||||
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(),
|
||||
config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(),
|
||||
config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
|
||||
config.getPrimaryTermSupplier(), tombstoneDocSupplier);
|
||||
}
|
||||
|
||||
protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {
|
||||
return noOpConfig(indexSettings, store, translogPath, null);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue