Test for deadlock when relocating and indexing concurrently

see #18553

Reproduces with

gradle :core:integTest -Dtests.class=org.elasticsearch.recovery.RelocationIT -Dtests.method="testIndexAndRelocateConcurrently"  -Dtests.iters=20 -Dtests.failfast=true -Dtests.logger.level=DEBUG -Dtests.seed=9BE7064B819064FA:C44B70F31707D081 -Dtests.timeoutSuite=60000!
This commit is contained in:
Britta Weber 2016-05-24 19:59:50 +02:00
parent bef1c8511d
commit b3a8c54928

View File

@ -22,6 +22,7 @@ package org.elasticsearch.recovery;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.English;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
@ -52,6 +53,7 @@ import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@ -71,12 +73,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@ -428,6 +433,62 @@ public class RelocationIT extends ESIntegTestCase {
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/18553")
public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException {
Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build();
InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(blueSetting, blueSetting);
Settings redSetting = Settings.builder().put("node.attr.color", "red").build();
InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(redSetting, redSetting);
blueFuture.get();
redFuture.get();
logger.info("blue nodes: {}", blueFuture.get());
logger.info("red nodes: {}", redFuture.get());
ensureStableCluster(4);
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("index.routing.allocation.exclude.color", "blue")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(indexSettings())));
ensureYellow();
assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
int numDocs = randomIntBetween(100, 150);
ArrayList<String> ids = new ArrayList<>();
logger.info(" --> indexing [{}] docs", numDocs);
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
String id = randomRealisticUnicodeOfLength(10) + String.valueOf(i);
ids.add(id);
docs[i] = client().prepareIndex("test", "type1", id).setSource("field1", English.intToEnglish(i));
}
indexRandom(true, docs);
SearchResponse countResponse = client().prepareSearch("test").get();
assertHitCount(countResponse, numDocs);
logger.info(" --> moving index to new nodes");
Settings build = Settings.builder().put("index.routing.allocation.exclude.color", "red")
.put("index.routing.allocation.include.color", "blue").build();
client().admin().indices().prepareUpdateSettings("test").setSettings(build).execute().actionGet();
// index while relocating
logger.info(" --> indexing [{}] more docs", numDocs);
for (int i = 0; i < numDocs; i++) {
String id = randomRealisticUnicodeOfLength(10) + String.valueOf(numDocs + i);
ids.add(id);
docs[i] = client().prepareIndex("test", "type1", id).setSource("field1", English.intToEnglish(numDocs + i));
}
indexRandom(true, docs);
numDocs *= 2;
logger.info(" --> waiting for relocation to complete", numDocs);
ensureGreen("test");// move all shards to the new node (it waits on relocation)
final int numIters = randomIntBetween(10, 20);
for (int i = 0; i < numIters; i++) {
SearchResponse afterRelocation = client().prepareSearch().setSize(ids.size()).get();
assertNoFailures(afterRelocation);
assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()]));
}
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {
private final CountDownLatch corruptionCount;