Remove leniency in reset engine from translog (#44711)

Replaying operations from the local translog must never fail, as those
operations were already processed successfully on the primary and the
mapping is already up to date. This change removes the leniency when
resetting an engine from the translog in IndexShard and InternalEngine.
Nhat Nguyen, 2019-07-29 14:53:09 -04:00
commit 4813728783 (parent 1a21682ed0)
5 changed files with 47 additions and 26 deletions

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1091,8 +1091,9 @@ public class InternalEngine extends Engine {
      * However, we prefer to fail a request individually (instead of a shard) if we hit a document failure on the primary.
      */
     private boolean treatDocumentFailureAsTragicError(Index index) {
-        // TODO: can we enable this all origins except primary on the leader?
-        return index.origin() == Operation.Origin.REPLICA;
+        // TODO: can we enable this check for all origins except primary on the leader?
+        return index.origin() == Operation.Origin.REPLICA
+            || index.origin() == Operation.Origin.LOCAL_RESET;
     }

     /**

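The javadoc fragment above spells out the invariant: a document failure on the primary fails only that request, while the same failure for a replicated or locally replayed operation is tragic. A rough sketch of the branch that consumes this predicate, with hypothetical helper names (the actual InternalEngine call site is not part of this diff):

// Sketch only: handleDocumentFailure and failedIndexResult are hypothetical names.
// failEngine(...) records the exception as a tragic event and closes the engine,
// which is what the updated test below asserts via engine.isClosed.get().
private Engine.IndexResult handleDocumentFailure(Engine.Index index, Exception ex) throws Exception {
    if (treatDocumentFailureAsTragicError(index)) {
        // The operation already succeeded on the primary, so a failure here means
        // this copy has diverged: fail the whole engine instead of absorbing it.
        failEngine("doc indexing failed with origin [" + index.origin() + "]", ex);
        throw ex;
    }
    // On the primary, surface the failure on the individual request instead.
    return failedIndexResult(index, ex); // hypothetical: wraps ex in a failed IndexResult
}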
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1444,7 +1444,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
                 opsRecovered++;
                 onOperationRecovered.run();
             } catch (Exception e) {
-                if (ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
+                // TODO: Don't enable this leniency unless users explicitly opt-in
+                if (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
                     // mainly for MapperParsingException and Failure to detect xcontent
                     logger.info("ignoring recovery of a corrupt translog entry", e);
                 } else {

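The leniency gate keys on the exception's REST status: document-level parse failures surface as 400 BAD_REQUEST, which the old unconditional branch swallowed for every origin. A small illustration (e.g. in a test body) of how those statuses resolve:

// MapperParsingException is an ElasticsearchException whose status() is BAD_REQUEST,
// so ExceptionsHelper.status(...) resolves it to 400; exceptions that carry no
// Elasticsearch status fall back to INTERNAL_SERVER_ERROR and were never ignored.
Exception parseFailure = new MapperParsingException("failed to parse field [foo] of type [text]");
assert ExceptionsHelper.status(parseFailure) == RestStatus.BAD_REQUEST;

Exception diskFailure = new java.io.IOException("simulated");
assert ExceptionsHelper.status(diskFailure) == RestStatus.INTERNAL_SERVER_ERROR;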
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -178,6 +178,7 @@ import java.util.stream.LongStream;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.shuffle;
+import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
@@ -5898,7 +5899,7 @@ public class InternalEngineTests extends EngineTestCase {
             .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
     }

-    public void testHandleDocumentFailureOnReplica() throws Exception {
+    public void testTreatDocumentFailureAsFatalError() throws Exception {
         AtomicReference<IOException> addDocException = new AtomicReference<>();
         IndexWriterFactory indexWriterFactory = (dir, iwc) -> new IndexWriter(dir, iwc) {
             @Override
@@ -5913,8 +5914,9 @@
         try (Store store = createStore();
              InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
             final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
+            Engine.Operation.Origin origin = randomFrom(REPLICA, LOCAL_RESET);
             Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
-                randomNonNegativeLong(), null, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
+                randomNonNegativeLong(), null, origin, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
             addDocException.set(new IOException("simulated"));
             expectThrows(IOException.class, () -> engine.index(index));
             assertTrue(engine.isClosed.get());

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -54,6 +54,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -94,6 +95,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -139,7 +141,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -161,6 +162,7 @@ import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;

 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -2615,25 +2617,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 numCorruptEntries++;
             }
         }

-        Iterator<Translog.Operation> iterator = operations.iterator();
-        Translog.Snapshot snapshot = new Translog.Snapshot() {
-            @Override
-            public void close() {
-            }
-
-            @Override
-            public int totalOperations() {
-                return numTotalEntries;
-            }
-
-            @Override
-            public Translog.Operation next() throws IOException {
-                return iterator.hasNext() ? iterator.next() : null;
-            }
-        };
+        Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations);
         primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
             getFakeDiscoNode(primary.routingEntry().currentNodeId()),
             null));
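
TestTranslog.newSnapshotFromOperations replaces ad-hoc anonymous snapshots like the one removed above. Its implementation is not shown in this view; a sketch consistent with the code it replaces, assuming it lives in the TestTranslog test utility:

// Reconstructed from the removed anonymous class; the actual helper may differ in detail.
public static Translog.Snapshot newSnapshotFromOperations(List<? extends Translog.Operation> operations) {
    final Iterator<? extends Translog.Operation> iterator = operations.iterator();
    return new Translog.Snapshot() {
        @Override
        public void close() {
            // nothing to release for an in-memory snapshot
        }

        @Override
        public int totalOperations() {
            return operations.size();
        }

        @Override
        public Translog.Operation next() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    };
}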
@@ -3922,6 +3906,37 @@
         closeShard(shard, false);
     }

+    public void testResetEngineWithBrokenTranslog() throws Exception {
+        IndexShard shard = newStartedShard(false);
+        updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData())
+            .putMapping("_doc", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}").build());
+        final List<Translog.Operation> operations = Stream.concat(
+            IntStream.range(0, randomIntBetween(0, 10)).mapToObj(n -> new Translog.Index("_doc", "1", 0, shard.getPendingPrimaryTerm(), 1,
+                "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1)),
+            // entries with corrupted source
+            IntStream.range(0, randomIntBetween(1, 10)).mapToObj(n -> new Translog.Index("_doc", "1", 0, shard.getPendingPrimaryTerm(), 1,
+                "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1))).collect(Collectors.toList());
+        Randomness.shuffle(operations);
+        final CountDownLatch engineResetLatch = new CountDownLatch(1);
+        shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L,
+            ActionListener.wrap(
+                r -> {
+                    try (Releasable ignored = r) {
+                        Translog.Snapshot snapshot = TestTranslog.newSnapshotFromOperations(operations);
+                        final MapperParsingException error = expectThrows(MapperParsingException.class,
+                            () -> shard.runTranslogRecovery(shard.getEngine(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
+                        assertThat(error.getMessage(), containsString("failed to parse field [foo] of type [text]"));
+                    } finally {
+                        engineResetLatch.countDown();
+                    }
+                },
+                e -> {
+                    throw new AssertionError(e);
+                }), TimeValue.timeValueMinutes(1));
+        engineResetLatch.await();
+        closeShards(shard);
+    }
+
     public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
         final IndexShard replica = newStartedShard(false);
         indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

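Note that the "corrupted" entries in the new test differ from the good ones by a single missing quote, which is enough to make the source unparseable. With the leniency removed, replaying them with the LOCAL_RESET origin must surface the MapperParsingException the test asserts on:

// The two source variants used in the test: valid JSON indexes fine; the
// unterminated string fails parsing and is no longer ignored during an engine reset.
String goodSource = "{\"foo\" : \"bar\"}"; // parses into the [foo] text field
String badSource  = "{\"foo\" : \"bar}";  // missing closing quote -> MapperParsingException
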
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -775,8 +776,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
     }

     protected void updateMappings(IndexShard shard, IndexMetaData indexMetadata) {
-        shard.indexSettings().updateIndexMetaData(indexMetadata);
         shard.mapperService().merge(indexMetadata, MapperService.MergeReason.MAPPING_UPDATE);
+        shard.indexSettings().updateIndexMetaData(
+            IndexMetaData.builder(indexMetadata).putMapping(new MappingMetaData(shard.mapperService().documentMapper())).build());
     }

     protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
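
The test-framework fix merges the mapping into the MapperService first and only then writes the index metadata back, carrying the mapping as the MapperService actually merged it, so the shard's IndexSettings and its MapperService agree before a test replays operations. Usage, following the pattern in testResetEngineWithBrokenTranslog above:

// Register the [foo] text field on a test shard before replaying translog
// entries that reference it.
updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData())
    .putMapping("_doc", "{\"properties\": {\"foo\": {\"type\": \"text\"}}}")
    .build());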