Remove leniency during replay translog in peer recovery (#44989)

This change removes leniency in InternalEngine during replaying translog
in peer recovery.
This commit is contained in:
Nhat Nguyen 2019-07-30 10:15:37 -04:00
parent 41a99c9e4a
commit 979d0a71c7
4 changed files with 64 additions and 6 deletions

View File

@ -1093,6 +1093,7 @@ public class InternalEngine extends Engine {
private boolean treatDocumentFailureAsTragicError(Index index) { private boolean treatDocumentFailureAsTragicError(Index index) {
// TODO: can we enable this check for all origins except primary on the leader? // TODO: can we enable this check for all origins except primary on the leader?
return index.origin() == Operation.Origin.REPLICA return index.origin() == Operation.Origin.REPLICA
|| index.origin() == Operation.Origin.PEER_RECOVERY
|| index.origin() == Operation.Origin.LOCAL_RESET; || index.origin() == Operation.Origin.LOCAL_RESET;
} }

View File

@ -5914,7 +5914,7 @@ public class InternalEngineTests extends EngineTestCase {
try (Store store = createStore(); try (Store store = createStore();
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) { InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, indexWriterFactory)) {
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
Engine.Operation.Origin origin = randomFrom(REPLICA, LOCAL_RESET); Engine.Operation.Origin origin = randomFrom(REPLICA, LOCAL_RESET, PEER_RECOVERY);
Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(), Engine.Index index = new Engine.Index(newUid(doc), doc, randomNonNegativeLong(), primaryTerm.get(),
randomNonNegativeLong(), null, origin, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); randomNonNegativeLong(), null, origin, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
addDocException.set(new IOException("simulated")); addDocException.set(new IOException("simulated"));

View File

@ -24,11 +24,15 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
@ -38,6 +42,9 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
@ -47,11 +54,13 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -405,4 +414,48 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
shards.assertAllEqual(numDocs); shards.assertAllEqual(numDocs);
} }
} }
public void testFailsToIndexDuringPeerRecovery() throws Exception {
AtomicReference<IOException> throwExceptionDuringIndexing = new AtomicReference<>(new IOException("simulated"));
try (ReplicationGroup group = new ReplicationGroup(buildIndexMetaData(0)) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
if (routing.primary()) {
return new InternalEngineFactory();
} else {
return config -> InternalEngineTests.createInternalEngine((dir, iwc) -> new IndexWriter(dir, iwc) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
final IOException error = throwExceptionDuringIndexing.getAndSet(null);
if (error != null) {
throw error;
}
return super.addDocument(doc);
}
}, null, null, config);
}
}
}) {
group.startAll();
group.indexDocs(randomIntBetween(1, 10));
allowShardFailures();
IndexShard replica = group.addReplica();
expectThrows(Exception.class, () -> group.recoverReplica(replica,
(shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
throw new AssertionError("recovery must fail");
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
})));
expectThrows(AlreadyClosedException.class, () -> replica.refresh("test"));
group.removeReplica(replica);
replica.store().close();
closeShards(replica);
}
}
} }

View File

@ -640,11 +640,15 @@ public abstract class IndexShardTestCase extends ESTestCase {
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable); currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable);
try {
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>(); PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
recovery.recoverToTarget(future); recovery.recoverToTarget(future);
future.actionGet(); future.actionGet();
recoveryTarget.markAsDone(); recoveryTarget.markAsDone();
} catch (Exception e) {
recoveryTarget.fail(new RecoveryFailedException(request, e), false);
throw e;
}
} }
protected void startReplicaAfterRecovery(IndexShard replica, IndexShard primary, Set<String> inSyncIds, protected void startReplicaAfterRecovery(IndexShard replica, IndexShard primary, Set<String> inSyncIds,