From 4813728783c2a8ddfa2c2b543fb4006cb6a8704e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 29 Jul 2019 14:53:09 -0400 Subject: [PATCH] Remove leniency in reset engine from translog (#44711) Replaying operations from the local translog must never fail as those operations were processed successfully on the primary before and the mapping is up to update already. This change removes leniency during resetting engine from translog in IndexShard and InternalEngine. --- .../index/engine/InternalEngine.java | 5 +- .../elasticsearch/index/shard/IndexShard.java | 3 +- .../index/engine/InternalEngineTests.java | 6 +- .../index/shard/IndexShardTests.java | 55 ++++++++++++------- .../index/shard/IndexShardTestCase.java | 4 +- 5 files changed, 47 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aebeff4a449..b19d0660640 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1091,8 +1091,9 @@ public class InternalEngine extends Engine { * However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary. */ private boolean treatDocumentFailureAsTragicError(Index index) { - // TODO: can we enable this all origins except primary on the leader? - return index.origin() == Operation.Origin.REPLICA; + // TODO: can we enable this check for all origins except primary on the leader? + return index.origin() == Operation.Origin.REPLICA + || index.origin() == Operation.Origin.LOCAL_RESET; } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f9e3bb45eaf..a94e92f9950 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1444,7 +1444,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl opsRecovered++; onOperationRecovered.run(); } catch (Exception e) { - if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { + // TODO: Don't enable this leniency unless users explicitly opt-in + if (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent logger.info("ignoring recovery of a corrupt translog entry", e); } else { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0df178f924e..af249c7a420 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -178,6 +178,7 @@ import java.util.stream.LongStream; import static java.util.Collections.emptyMap; import static java.util.Collections.shuffle; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; @@ -5898,7 +5899,7 @@ public class InternalEngineTests extends EngineTestCase { .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue())); } - public void testHandleDocumentFailureOnReplica() throws Exception { + public void testTreatDocumentFailureAsFatalError() throws Exception { AtomicReference addDocException = new AtomicReference<>(); IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) { @Override @@ -5913,8 +5914,9 @@ public class InternalEngineTests extends EngineTestCase { try (Store store = createStore(); InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) { final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); + Engine.Operation.Origin origin = randomFrom(REPLICA, LOCAL_RESET); Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(), - randomNonNegativeLong(), null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + randomNonNegativeLong(), null, origin, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); addDocException.set(new IOException("simulated")); expectThrows(IOException.class, () -> engine.index(index)); assertTrue(engine.isClosed.get()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bc015ddf795..3b3e11f9cef 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -94,6 +95,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -139,7 +141,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -161,6 +162,7 @@ import java.util.function.LongFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -2615,25 +2617,7 @@ public class IndexShardTests extends IndexShardTestCase { numCorruptEntries++; } } - - Iterator iterator = operations.iterator(); - Translog.Snapshot snapshot = new Translog.Snapshot() { - - @Override - public void close() { - - } - - @Override - public int totalOperations() { - return numTotalEntries; - } - - @Override - public Translog.Operation next() throws IOException { - return iterator.hasNext() ? iterator.next() : null; - } - }; + Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations); primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), getFakeDiscoNode(primary.routingEntry().currentNodeId()), null)); @@ -3922,6 +3906,37 @@ public class IndexShardTests extends IndexShardTestCase { closeShard(shard, false); } + public void testResetEngineWithBrokenTranslog() throws Exception { + IndexShard shard = newStartedShard(false); + updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData()) + .putMapping("_doc", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}").build()); + final List operations = Stream.concat( + IntStream.range(0, randomIntBetween(0, 10)).mapToObj(n -> new Translog.Index("_doc", "1", 0, shard.getPendingPrimaryTerm(), 1, + "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1)), + // entries with corrupted source + IntStream.range(0, randomIntBetween(1, 10)).mapToObj(n -> new Translog.Index("_doc", "1", 0, shard.getPendingPrimaryTerm(), 1, + "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1))).collect(Collectors.toList()); + Randomness.shuffle(operations); + final CountDownLatch engineResetLatch = new CountDownLatch(1); + shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L, + ActionListener.wrap( + r -> { + try (Releasable ignored = r) { + Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations); + final MapperParsingException error = expectThrows(MapperParsingException.class, + () -> shard.runTranslogRecovery(shard.getEngine(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); + assertThat(error.getMessage(), containsString("failed to parse field [foo] of type [text]")); + } finally { + engineResetLatch.countDown(); + } + }, + e -> { + throw new AssertionError(e); + }), TimeValue.timeValueMinutes(1)); + engineResetLatch.await(); + closeShards(shard); + } + public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception { final IndexShard replica = newStartedShard(false); indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index e36f5e39990..d7c69d69595 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -775,8 +776,9 @@ public abstract class IndexShardTestCase extends ESTestCase { } protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) { - shard.indexSettings().updateIndexMetaData(indexMetadata); shard.mapperService().merge(indexMetadata, MapperService.MergeReason.MAPPING_UPDATE); + shard.indexSettings().updateIndexMetaData( + IndexMetaData.builder(indexMetadata).putMapping(new MappingMetaData(shard.mapperService().documentMapper())).build()); } protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {