Move EvilPeerRecoveryIT to a unit test in RecoveryDuringReplicationTests (#22900)

EvilPeerRecoveryIT checks a scenario where recovery happens while there are ongoing indexing operations that have already been assigned a seq#. This is fairly hard to set up, and the test jumps through a couple of hoops via the plugin infra to get there. This PR extends the unit-test infra to allow those hoops to happen in unit tests, which lets the test move to RecoveryDuringReplicationTests.
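In outline, the new hook makes it possible to park an indexing operation inside the engine after its seq# has been assigned, which is exactly the window the test needs. A minimal sketch of the idea, assuming the IndexWriterFactory seam this commit adds to InternalEngineTests (the Semaphore wiring mirrors testWaitForPendingSeqNo below):

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.concurrent.Semaphore;

// no permits: indexing threads block inside the engine until the test releases them
final Semaphore block = new Semaphore(0);
InternalEngineTests.IndexWriterFactory blockingFactory = (Directory dir, IndexWriterConfig iwc) ->
    new IndexWriter(dir, iwc) {
        @Override
        public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
            try {
                block.acquire(); // this op already holds a seq#, so a seq#-based recovery must wait for it
            } catch (InterruptedException e) {
                throw new AssertionError("unexpectedly interrupted", e);
            }
            return super.addDocument(doc);
        }
    };
// later, block.release(n) lets the parked operations complete so recovery can finish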

Relates to #22484
Boaz Leskes 2017-02-09 20:14:03 +02:00 committed by GitHub
parent 94087b3274
commit cd1cb41603
7 changed files with 239 additions and 308 deletions

InternalEngine.java

@ -87,7 +87,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
public class InternalEngine extends Engine {
@ -1291,37 +1290,46 @@ public class InternalEngine extends Engine {
}
}
// pkg-private for testing
IndexWriter createWriter(boolean create) throws IOException {
private IndexWriter createWriter(boolean create) throws IOException {
try {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return new IndexWriter(store.directory(), iwc);
final IndexWriterConfig iwc = getIndexWriterConfig(create);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
throw ex;
}
}
// pkg-private for testing
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return new IndexWriter(directory, iwc);
}
private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Exception ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
mergePolicy = new ElasticsearchMergePolicy(mergePolicy);
iwc.setMergePolicy(mergePolicy);
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
return iwc;
}
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
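Net effect of the refactoring above: all IndexWriterConfig setup now lives in getIndexWriterConfig, and construction funnels through the pkg-private createWriter(Directory, IndexWriterConfig). A same-package test can therefore swap in an instrumented writer without touching the config logic; roughly (a sketch, where config is a prepared EngineConfig):

InternalEngine engine = new InternalEngine(config) {
    @Override
    IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
        // hand back any IndexWriter subclass, e.g. one that injects failures or blocks indexing
        return new IndexWriter(directory, iwc);
    }
};

This is the seam the createInternalEngine helper added to InternalEngineTests (next file) wraps up for reuse.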

InternalEngineTests.java

@ -34,7 +34,6 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
@ -129,7 +128,6 @@ import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.OldIndexUtils;
import org.elasticsearch.test.rest.yaml.section.Assertion;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
@ -138,7 +136,6 @@ import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
@ -152,7 +149,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@ -333,8 +329,9 @@ public class InternalEngineTests extends ESTestCase {
return createEngine(indexSettings, store, translogPath, mergePolicy, null);
}
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, Supplier<IndexWriter> indexWriterSupplier) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterSupplier, null);
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
@Nullable IndexWriterFactory indexWriterFactory) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null);
}
protected InternalEngine createEngine(
@ -342,26 +339,40 @@ public class InternalEngineTests extends ESTestCase {
Store store,
Path translogPath,
MergePolicy mergePolicy,
Supplier<IndexWriter> indexWriterSupplier,
Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
InternalEngine internalEngine = new InternalEngine(config) {
@Override
IndexWriter createWriter(boolean create) throws IOException {
return (indexWriterSupplier != null) ? indexWriterSupplier.get() : super.createWriter(create);
}
@Override
public SequenceNumbersService seqNoService() {
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService();
}
};
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
return internalEngine;
}
@FunctionalInterface
public interface IndexWriterFactory {
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException;
}
public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
@Nullable final Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier,
final EngineConfig config) {
return new InternalEngine(config) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
indexWriterFactory.createWriter(directory, iwc) :
super.createWriter(directory, iwc);
}
@Override
public SequenceNumbersService seqNoService() {
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.get() : super.seqNoService();
}
};
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
@ -2589,18 +2600,23 @@ public class InternalEngineTests extends ESTestCase {
final ParsedDocument doc2 = testParsedDocument("2", "test", null, testDocumentWithTextField(), B_1, null);
final ParsedDocument doc3 = testParsedDocument("3", "test", null, testDocumentWithTextField(), B_1, null);
ThrowingIndexWriter throwingIndexWriter = new ThrowingIndexWriter(store.directory(), new IndexWriterConfig());
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, () -> throwingIndexWriter)) {
AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>();
try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
(directory, iwc) -> {
throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
return throwingIndexWriter.get();
})
) {
// test document failure while indexing
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
} else {
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
}
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1));
assertNotNull(indexResult.getFailure());
throwingIndexWriter.clearFailure();
throwingIndexWriter.get().clearFailure();
indexResult = engine.index(indexForDoc(doc1));
assertNull(indexResult.getFailure());
engine.index(indexForDoc(doc2));
@ -2608,17 +2624,17 @@ public class InternalEngineTests extends ESTestCase {
// test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
if (randomBoolean()) {
throwingIndexWriter.setThrowFailure(() -> new IOException("simulated"));
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
expectThrows(IOException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
} else {
throwingIndexWriter.setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
expectThrows(IllegalArgumentException.class, () -> engine.delete(new Engine.Delete("test", "1", newUid(doc1))));
}
// test non document level failure is thrown
if (randomBoolean()) {
// simulate close by corruption
throwingIndexWriter.setThrowFailure(null);
throwingIndexWriter.get().setThrowFailure(null);
UncheckedIOException uncheckedIOException = expectThrows(UncheckedIOException.class, () -> {
Engine.Index index = indexForDoc(doc3);
index.parsedDoc().rootDoc().add(new StoredField("foo", "bar") {
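One consequence visible in the updated failure-injection test: the engine now constructs its own writer, so the test can no longer hand over a pre-built ThrowingIndexWriter; instead it captures the instance lazily through the factory. The pattern in isolation (a sketch using the names above):

AtomicReference<ThrowingIndexWriter> writerRef = new AtomicReference<>();
InternalEngineTests.IndexWriterFactory factory = (directory, iwc) -> {
    ThrowingIndexWriter writer = new ThrowingIndexWriter(directory, iwc); // built when the engine opens its writer
    writerRef.set(writer);
    return writer;
};
// once the engine is created, writerRef.get().setThrowFailure(...) drives the simulated failures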

ESIndexLevelReplicationTestCase.java

@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
@ -82,6 +83,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
protected ReplicationGroup createGroup(int replicas) throws IOException {
IndexMetaData metaData = buildIndexMetaData(replicas);
return new ReplicationGroup(metaData);
}
protected IndexMetaData buildIndexMetaData(int replicas) throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -92,7 +98,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
for (Map.Entry<String, String> typeMapping : indexMapping.entrySet()) {
metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
}
return new ReplicationGroup(metaData.build());
return metaData.build();
}
protected DiscoveryNode getDiscoveryNode(String id) {
@ -109,7 +115,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
boolean closed = false;
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
primary = newShard(shardId, true, "s0", indexMetaData, this::syncGlobalCheckpoint, null);
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
primary = newShard(primaryRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(primaryRouting));
replicas = new ArrayList<>();
this.indexMetaData = indexMetaData;
updateAllocationIDsOnPrimary();
@ -118,6 +125,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
}
private ShardRouting createShardRouting(String nodeId, boolean primary) {
return TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
}
protected EngineFactory getEngineFactory(ShardRouting routing) {
return null;
}
public int indexDocs(final int numOfDoc) throws Exception {
for (int doc = 0; doc < numOfDoc; doc++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet()))
@ -175,8 +191,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
public synchronized IndexShard addReplica() throws IOException {
final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false);
final IndexShard replica =
newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData, this::syncGlobalCheckpoint, null);
newShard(replicaRouting, indexMetaData, null, this::syncGlobalCheckpoint, getEngineFactory(replicaRouting));
replicas.add(replica);
updateAllocationIDsOnPrimary();
return replica;
@ -189,7 +206,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
false, ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE);
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, this::syncGlobalCheckpoint);
final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null,
this::syncGlobalCheckpoint, getEngineFactory(shardRouting));
replicas.add(newReplica);
updateAllocationIDsOnPrimary();
return newReplica;
@ -531,13 +549,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected PrimaryResult performOnPrimary(IndexShard primary,
GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception {
primary.getTranslog().sync();
return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, primary.getGlobalCheckpoint()),
new ReplicationResponse());
}
@Override
protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) {
protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) throws IOException {
replica.updateGlobalCheckpointOnReplica(request.getCheckpoint());
replica.getTranslog().sync();
}
}
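The new getEngineFactory(ShardRouting) hook returns null by default, and newShard passes that through so the shard keeps the stock engine. A test can therefore instrument the engine per shard, e.g. only on the primary; a sketch of the intended use (customPrimaryEngineFactory is a hypothetical EngineFactory):

ReplicationGroup group = new ReplicationGroup(buildIndexMetaData(1)) {
    @Override
    protected EngineFactory getEngineFactory(ShardRouting routing) {
        // instrument the primary's engine; null keeps the default engine on the replicas
        return routing.primary() ? customPrimaryEngineFactory : null;
    }
};

testWaitForPendingSeqNo in the next file follows exactly this shape.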

RecoveryDuringReplicationTests.java

@ -20,10 +20,18 @@
package org.elasticsearch.index.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -37,6 +45,9 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@ -191,6 +202,121 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
}
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," +
"org.elasticsearch.discovery:TRACE," +
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE," +
"org.elasticsearch.index.seqno:TRACE"
)
public void testWaitForPendingSeqNo() throws Exception {
IndexMetaData metaData = buildIndexMetaData(1);
final int pendingDocs = randomIntBetween(1, 5);
final AtomicReference<Semaphore> blockIndexingOnPrimary = new AtomicReference<>();
final CountDownLatch blockedIndexers = new CountDownLatch(pendingDocs);
try (ReplicationGroup shards = new ReplicationGroup(metaData) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
if (routing.primary()) {
return new EngineFactory() {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
Semaphore block = blockIndexingOnPrimary.get();
if (block != null) {
blockedIndexers.countDown();
try {
block.acquire();
} catch (InterruptedException e) {
throw new AssertionError("unexpectedly interrupted", e);
}
}
return super.addDocument(doc);
}
}, null, config);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
throw new UnsupportedOperationException();
}
};
} else {
return null;
}
}
}) {
shards.startAll();
int docs = shards.indexDocs(randomIntBetween(1,10));
IndexShard replica = shards.getReplicas().get(0);
shards.removeReplica(replica);
closeShards(replica);
docs += pendingDocs;
final Semaphore pendingDocsSemaphore = new Semaphore(pendingDocs);
blockIndexingOnPrimary.set(pendingDocsSemaphore);
blockIndexingOnPrimary.get().acquire(pendingDocs);
CountDownLatch pendingDocsDone = new CountDownLatch(pendingDocs);
for (int i = 0; i < pendingDocs; i++) {
final String id = "pending_" + i;
threadPool.generic().submit(() -> {
try {
shards.index(new IndexRequest(index.getName(), "type", id).source("{}"));
} catch (Exception e) {
throw new AssertionError(e);
} finally {
pendingDocsDone.countDown();
}
});
}
// wait for the pending ops to "hang"
blockedIndexers.await();
blockIndexingOnPrimary.set(null);
// index some more
docs += shards.indexDocs(randomInt(5));
IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
CountDownLatch recoveryStart = new CountDownLatch(1);
AtomicBoolean preparedForTranslog = new AtomicBoolean(false);
final Future<Void> recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
preparedForTranslog.set(true);
super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp);
}
};
});
recoveryStart.await();
for (int i = 0; i < pendingDocs; i++) {
assertFalse((pendingDocs - i) + " pending operations, recovery should wait", preparedForTranslog.get());
pendingDocsSemaphore.release();
}
pendingDocsDone.await();
// now recovery can finish
recoveryFuture.get();
assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty());
assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(docs));
shards.assertAllEqual(docs);
}
}
private static class BlockingTarget extends RecoveryTarget {
private final CountDownLatch recoveryBlocked;

IndexShardTests.java

@ -1061,7 +1061,7 @@ public class IndexShardTests extends IndexShardTestCase {
};
closeShards(shard);
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {});
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}, null);
recoveryShardFromStore(newShard);
@ -1202,7 +1202,7 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {});
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, () -> {}, null);
recoveryShardFromStore(newShard);

EvilPeerRecoveryIT.java (deleted)

@ -1,242 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Tokenizer;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalyzerProvider;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class EvilPeerRecoveryIT extends ESIntegTestCase {
private static AtomicReference<CountDownLatch> indexLatch = new AtomicReference<>();
private static AtomicReference<CountDownLatch> waitForOpsToCompleteLatch = new AtomicReference<>();
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(LatchAnalysisPlugin.class);
}
public static class LatchAnalysisPlugin extends Plugin implements AnalysisPlugin {
@Override
public Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> getAnalyzers() {
return Collections.singletonMap("latch_analyzer", (a, b, c, d) -> new LatchAnalyzerProvider());
}
}
static class LatchAnalyzerProvider implements AnalyzerProvider<LatchAnalyzer> {
@Override
public String name() {
return "latch_analyzer";
}
@Override
public AnalyzerScope scope() {
return AnalyzerScope.INDICES;
}
@Override
public LatchAnalyzer get() {
return new LatchAnalyzer();
}
}
static class LatchAnalyzer extends Analyzer {
@Override
protected TokenStreamComponents createComponents(final String fieldName) {
return new TokenStreamComponents(new LatchTokenizer());
}
}
static class LatchTokenizer extends Tokenizer {
@Override
public final boolean incrementToken() throws IOException {
try {
if (indexLatch.get() != null) {
// latch that all expected operations are in the engine
indexLatch.get().countDown();
}
if (waitForOpsToCompleteLatch.get() != null) {
// latch that waits for the replica to restart and allows recovery to proceed
waitForOpsToCompleteLatch.get().await();
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
}
/*
* This tests that sequence-number-based recoveries wait for in-flight operations to complete. The trick here is simple. We latch some
* in-flight operations inside the engine after sequence numbers are assigned. While these operations are latched, we restart a replica.
* Sequence-number-based recovery on this replica has to wait until these in-flight operations complete to proceed. We verify at the end
* of recovery that a file-based recovery was not completed, and that the expected number of operations was replayed via the translog.
*/
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE," +
"org.elasticsearch.discovery:TRACE," +
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
@AwaitsFix(bugUrl =
"boaz is looking into failures: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+java9-periodic/1545")
public void testRecoveryWaitsForOps() throws Exception {
final int docs = randomIntBetween(1, 64);
try {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(nodeSettings(0));
// prepare mapping that uses our latch analyzer
final XContentBuilder mapping = jsonBuilder();
mapping.startObject();
{
mapping.startObject("type");
{
mapping.startObject("properties");
{
mapping.startObject("foo");
{
mapping.field("type", "text");
mapping.field("analyzer", "latch_analyzer");
mapping.endObject();
}
mapping.endObject();
}
mapping.endObject();
}
mapping.endObject();
}
// create the index with our mapping
client()
.admin()
.indices()
.prepareCreate("index")
.addMapping("type", mapping)
.setSettings(Settings.builder().put("number_of_shards", 1))
.get();
// start the replica node; we do this after creating the index so we can control which node holds the primary shard
final String replicaNode = internalCluster().startDataOnlyNode(nodeSettings(1));
ensureGreen();
// index some documents so that the replica will attempt a sequence-number-based recovery upon restart
for (int foo = 0; foo < docs; foo++) {
index(randomFrom(primaryNode, replicaNode), foo);
}
if (randomBoolean()) {
client().admin().indices().flush(new FlushRequest()).get();
}
// start some in-flight operations that will get latched in the engine
final List<Thread> threads = new ArrayList<>();
final int latchedDocs = internalCluster().getInstance(ThreadPool.class, replicaNode).info(ThreadPool.Names.BULK).getMax();
indexLatch.set(new CountDownLatch(latchedDocs));
waitForOpsToCompleteLatch.set(new CountDownLatch(1));
for (int i = docs; i < docs + latchedDocs; i++) {
final int foo = i;
// we have to index through the primary since we are going to restart the replica
final Thread thread = new Thread(() -> index(primaryNode, foo));
threads.add(thread);
thread.start();
}
// latch until all operations are inside the engine
indexLatch.get().await();
internalCluster().restartNode(replicaNode, new InternalTestCluster.RestartCallback());
final Index index = resolveIndex("index");
// wait until recovery starts
assertBusy(() -> {
final IndicesService primaryService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertThat(primaryService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
final IndicesService replicaService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertThat(replicaService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
}
);
// unlatch the operations that are latched inside the engine
waitForOpsToCompleteLatch.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
// recovery should complete successfully
ensureGreen();
// verify that a sequence-number-based recovery was completed
final org.elasticsearch.action.admin.indices.recovery.RecoveryResponse response =
client().admin().indices().prepareRecoveries("index").get();
final List<RecoveryState> states = response.shardRecoveryStates().get("index");
for (final RecoveryState state : states) {
if (state.getTargetNode().getName().equals(replicaNode)) {
assertThat(state.getTranslog().recoveredOperations(), equalTo(latchedDocs));
assertThat(state.getIndex().recoveredFilesPercent(), equalTo(0f));
}
}
} finally {
internalCluster().close();
}
}
private void index(final String node, final int foo) {
client(node).prepareIndex("index", "type").setSource("{\"foo\":\"" + Integer.toString(foo) + "\"}").get();
}
}

IndexShardTestCase.java

@ -49,6 +49,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
@ -190,7 +191,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, () -> {});
return newShard(shardRouting, indexMetaData, searcherWrapper, () -> {}, null);
}
/**
@ -206,7 +207,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
@Nullable IndexSearcherWrapper searcherWrapper) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, globalCheckpointSyncer);
return newShard(shardRouting, indexMetaData, searcherWrapper, globalCheckpointSyncer, null);
}
@ -220,7 +221,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, () -> {}, listeners);
return newShard(routing, indexMetaData, null, () -> {}, null, listeners);
}
/**
@ -235,13 +236,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper, Runnable globalCheckpointSyncer,
@Nullable EngineFactory engineFactory,
IndexingOperationListener... listeners)
throws IOException {
// add node id as name to settings for proper logging
final ShardId shardId = routing.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, globalCheckpointSyncer, listeners);
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, globalCheckpointSyncer, engineFactory, listeners);
}
/**
@ -256,6 +258,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
Runnable globalCheckpointSyncer,
@Nullable EngineFactory engineFactory,
IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
@ -277,8 +280,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
IndexFieldDataService indexFieldDataService = new IndexFieldDataService(indexSettings, indicesFieldDataCache,
new NoneCircuitBreakerService(), mapperService);
indexShard = new IndexShard(routing, indexSettings, shardPath, store, indexCache, mapperService, similarityService,
indexFieldDataService, null, indexEventListener, indexSearcherWrapper, threadPool, BigArrays.NON_RECYCLING_INSTANCE, warmer,
globalCheckpointSyncer, Collections.emptyList(), Arrays.asList(listeners));
indexFieldDataService, engineFactory, indexEventListener, indexSearcherWrapper, threadPool,
BigArrays.NON_RECYCLING_INSTANCE, warmer, globalCheckpointSyncer, Collections.emptyList(), Arrays.asList(listeners));
success = true;
} finally {
if (success == false) {
@ -309,7 +312,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
closeShards(current);
return newShard(routing, current.shardPath(), current.indexSettings().getIndexMetaData(), null,
current.getGlobalCheckpointSyncer(), listeners);
current.getGlobalCheckpointSyncer(), current.engineFactory, listeners);
}
/**