diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
index d084dd239f0..1980cfa7c64 100644
--- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
@@ -26,125 +26,29 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 
 public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
     private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
-
-    public class BackgroundIndexer implements AutoCloseable {
-
-        final Thread[] writers;
-        final CountDownLatch stopLatch;
-        final CopyOnWriteArrayList<Throwable> failures;
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final CountDownLatch startLatch = new CountDownLatch(1);
-
-        public BackgroundIndexer() {
-            this(scaledRandomIntBetween(3, 10));
-        }
-
-        public BackgroundIndexer(int writerCount) {
-            this(writerCount, true);
-        }
-
-        public BackgroundIndexer(int writerCount, boolean autoStart) {
-
-            failures = new CopyOnWriteArrayList<>();
-            writers = new Thread[writerCount];
-            stopLatch = new CountDownLatch(writers.length);
-            logger.info("--> starting {} indexing threads", writerCount);
-            for (int i = 0; i < writers.length; i++) {
-                final int indexerId = i;
-                final Client client = client();
-                writers[i] = new Thread() {
-                    @Override
-                    public void run() {
-                        long id = -1;
-                        try {
-                            startLatch.await();
-                            logger.info("**** starting indexing thread {}", indexerId);
-                            while (!stop.get()) {
-                                id = idGenerator.incrementAndGet();
-                                client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
-                                        .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                            logger.info("**** done indexing thread {}", indexerId);
-                        } catch (Throwable e) {
-                            failures.add(e);
-                            logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
-                        } finally {
-                            stopLatch.countDown();
-                        }
-                    }
-                };
-                writers[i].start();
-            }
-
-            if (autoStart) {
-                startLatch.countDown();
-            }
-        }
-
-        public void start() {
-            startLatch.countDown();
-        }
-
-        public void stop() throws InterruptedException {
-            if (stop.get()) {
-                return;
-            }
-            stop.set(true);
-
-            assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
-            assertNoFailures();
-        }
-
-        public long totalIndexedDocs() {
-            return indexCounter.get();
-        }
-
-        public Throwable[] getFailures() {
-            return failures.toArray(new Throwable[failures.size()]);
-        }
-
-        public void assertNoFailures() {
-            assertThat(failures, emptyIterable());
-        }
-
-        @Override
-        public void close() throws Exception {
-            stop();
-        }
-    }
-
     @Test
     @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
     @Slow
@@ -154,12 +58,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
@@ -167,9 +71,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> allow 2 nodes for index [test] ...");
             // now start another node, while we index
@@ -180,7 +84,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=2").execute().actionGet().isTimedOut(), equalTo(false));
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
@@ -204,12 +108,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
@@ -217,9 +121,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> allow 4 nodes for index [test] ...");
             allowNodes("test", 4);
@@ -228,7 +132,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
@@ -252,12 +156,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
@@ -265,9 +169,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", waitFor);
+            logger.info("--> {} docs indexed", waitFor);
 
             // now start more nodes, while we index
             logger.info("--> allow 4 nodes for index [test] ...");
@@ -278,10 +182,10 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
-            logger.info("--> {} docs indexed", totalNumDocs);
+            logger.info("--> {} docs indexed", totalNumDocs);
 
             // now, shutdown nodes
             logger.info("--> allow 3 nodes for index [test] ...");
             allowNodes("test", 3);
@@ -323,12 +227,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
         final int numDocs = scaledRandomIntBetween(200, 50000);
 
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
 
             for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
                 indexer.assertNoFailures();
                 logger.info("--> waiting for {} docs to be indexed ...", i);
-                waitForDocs(i);
+                waitForDocs(i, indexer);
                 logger.info("--> {} docs indexed", i);
                 allowNodes = 2 / allowNodes;
                 allowNodes("test", allowNodes);
@@ -417,25 +321,4 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             }
         }, 5, TimeUnit.MINUTES), equalTo(true));
     }
-
-    private void waitForDocs(final long numDocs) throws InterruptedException {
-        final long[] lastKnownCount = {-1};
-        long lastStartCount = -1;
-        Predicate<Object> testDocs = new Predicate<Object>() {
-            public boolean apply(Object o) {
-                lastKnownCount[0] = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
-                logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs);
-                return lastKnownCount[0] > numDocs;
-            }
-        };
-        // 5 minutes seems like a long time but while relocating, indexing threads can wait for up to ~1m before retrying when
-        // they first try to index into a shard which is not STARTED.
-        while (!awaitBusy(testDocs, 5, TimeUnit.MINUTES)) {
-            if (lastStartCount == lastKnownCount[0]) {
-                // we didn't make any progress
-                fail("failed to reach " + numDocs + "docs");
-            }
-            lastStartCount = lastKnownCount[0];
-        }
-    }
 }
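Note (annotation, not part of the patch): the call sites above now pass the indexer into a two-argument waitForDocs(long, BackgroundIndexer), but the patch does not show that helper itself; since both refactored test classes call it, it presumably moved to a shared base class. A minimal sketch of what it might look like, adapted from the single-argument version removed in the last hunk above; the placement and the use of the indexer parameter on stall are assumptions:

```java
// Hedged sketch, assuming the helper keeps the removed implementation's shape and
// gains the indexer only to surface background indexing failures when progress stalls.
private void waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException {
    final long[] lastKnownCount = {-1};
    long lastStartCount = -1;
    Predicate<Object> testDocs = new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            lastKnownCount[0] = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
            logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs);
            return lastKnownCount[0] > numDocs;
        }
    };
    // 5 minutes seems like a long time, but while relocating, indexing threads can wait
    // for up to ~1m before retrying when they first hit a shard that is not yet STARTED.
    while (!awaitBusy(testDocs, 5, TimeUnit.MINUTES)) {
        if (lastStartCount == lastKnownCount[0]) {
            // no progress since the last await: report indexer failures first, then the count
            indexer.assertNoFailures();
            fail("failed to reach " + numDocs + " docs");
        }
        lastStartCount = lastKnownCount[0];
    }
}
```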
diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
index beab60648f1..314a097e708 100644
--- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
@@ -21,14 +21,9 @@ package org.elasticsearch.recovery;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.carrotsearch.hppc.procedures.IntProcedure;
-import com.google.common.base.Predicate;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.ImmutableSettings;
@@ -36,14 +31,12 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -106,19 +99,14 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
 
     @Test
    @Slow
-    public void testPrimaryRelocationWhileIndexingRandom() throws Exception {
-        int numRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        int numWriters = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        boolean batch = getRandom().nextBoolean();
-        logger.info("testPrimaryRelocationWhileIndexingRandom(numRelocations={}, numWriters={}, batch={}",
-                numRelocations, numWriters, batch);
-        testPrimaryRelocationWhileIndexing(numRelocations, numWriters, batch);
-    }
-
-
-    private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
-        String[] nodes = new String[2];
+    public void testRelocationWhileIndexingRandom() throws Exception {
+        int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
+        int numberOfReplicas = randomBoolean() ? 0 : 1;
+        int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;
+        logger.info("testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})", numberOfRelocations, numberOfReplicas, numberOfNodes);
+
+        String[] nodes = new String[numberOfNodes];
         logger.info("--> starting [node1] ...");
         nodes[0] = cluster().startNode();
 
@@ -126,304 +114,93 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
         client().admin().indices().prepareCreate("test")
                 .setSettings(settingsBuilder()
                         .put("index.number_of_shards", 1)
-                        .put("index.number_of_replicas", 0)
+                        .put("index.number_of_replicas", numberOfReplicas)
                 ).execute().actionGet();
 
-        logger.info("--> starting [node2] ...");
-        nodes[1] = cluster().startNode();
-
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[numberOfWriters];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final Client perThreadClient = client();
-            final int indexerId = i;
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        startLatch.await();
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            if (batch) {
-                                BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
-                                for (int i = 0; i < 100; i++) {
-                                    long id = idGenerator.incrementAndGet();
-                                    if (id % 1000 == 0) {
-                                        perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                    }
-                                    bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                            .setSource("test", "value" + id));
-                                }
-                                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                                    if (!bulkItemResponse.isFailed()) {
-                                        indexCounter.incrementAndGet();
-                                    } else {
-                                        logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailure().getMessage());
-                                    }
-                                }
-                            } else {
-                                long id = idGenerator.incrementAndGet();
-                                if (id % 1000 == 0) {
-                                    perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                }
-                                perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                        .setSource("test", "value" + id).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Exception e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-        startLatch.countDown();
-        final int numDocs = scaledRandomIntBetween(200, 2500);
-        logger.info("--> waiting for {} docs to be indexed ...", numDocs);
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                client().admin().indices().prepareRefresh().execute().actionGet();
-                return client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() >= numDocs;
-            }
-        });
-        logger.info("--> {} docs indexed", numDocs);
-
-        logger.info("--> starting relocations...");
-        for (int i = 0; i < numberOfRelocations; i++) {
-            int fromNode = (i % 2);
-            int toNode = fromNode == 0 ? 1 : 0;
-            logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
-            client().admin().cluster().prepareReroute()
-                    .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
-                    .execute().actionGet();
-            ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
-        }
-        logger.info("--> done relocations");
-
-        logger.info("--> marking and waiting for indexing threads to stop ...");
-        stop.set(true);
-        stopLatch.await();
-        logger.info("--> indexing threads stopped");
-
-        logger.info("--> refreshing the index");
-        client().admin().indices().prepareRefresh("test").execute().actionGet();
-        logger.info("--> searching the index");
-        boolean ranOnce = false;
-        for (int i = 0; i < 10; i++) {
-            try {
-                logger.info("--> START search test round {}", i + 1);
-                SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
-                ranOnce = true;
-                if (hits.totalHits() != indexCounter.get()) {
-                    int[] hitIds = new int[(int) indexCounter.get()];
-                    for (int hit = 0; hit < indexCounter.get(); hit++) {
-                        hitIds[hit] = hit + 1;
-                    }
-                    IntOpenHashSet set = IntOpenHashSet.from(hitIds);
-                    for (SearchHit hit : hits.hits()) {
-                        int id = Integer.parseInt(hit.id());
-                        if (!set.remove(id)) {
-                            logger.error("Extra id [{}]", id);
-                        }
-                    }
-                    set.forEach(new IntProcedure() {
-
-                        @Override
-                        public void apply(int value) {
-                            logger.error("Missing id [{}]", value);
-                        }
-
-                    });
-                }
-                assertThat(hits.totalHits(), equalTo(indexCounter.get()));
-                logger.info("--> DONE search test round {}", i + 1);
-            } catch (SearchPhaseExecutionException ex) {
-                // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
-                logger.warn("Got exception while searching.", ex);
-            }
-        }
-        if (!ranOnce) {
-            fail();
-        }
-    }
-
-    @Test
-    @Slow
-    public void testReplicaRelocationWhileIndexingRandom() throws Exception {
-        int numRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        int numWriters = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        boolean batch = getRandom().nextBoolean();
-        logger.info("testReplicaRelocationWhileIndexing(numRelocations={}, numWriters={}, batch={}", numRelocations, numWriters, batch);
-        testReplicaRelocationWhileIndexing(numRelocations, numWriters, batch);
-    }
-
-    private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
-        logger.info("--> starting [node1] ...");
-        String[] nodes = new String[3];
-        nodes[0] = cluster().startNode();
-
-        logger.info("--> creating test index ...");
-        client().admin().indices().prepareCreate("test")
-                .setSettings(settingsBuilder()
-                        .put("index.number_of_shards", 1)
-                        .put("index.number_of_replicas", 1)
-                ).execute().actionGet();
-
-        logger.info("--> starting [node2] ...");
-        nodes[1] = cluster().startNode();
-
-        ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
-        assertThat(healthResponse.isTimedOut(), equalTo(false));
-
-        logger.info("--> starting [node3] ...");
-        nodes[2] = cluster().startNode();
-
-        healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet();
-        assertThat(healthResponse.isTimedOut(), equalTo(false));
-
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[numberOfWriters];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final Client perThreadClient = client();
-            final int indexerId = i;
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-
-                    try {
-                        startLatch.await();
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            if (batch) {
-                                BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
-                                for (int i = 0; i < 100; i++) {
-                                    long id = idGenerator.incrementAndGet();
-                                    if (id % 1000 == 0) {
-                                        perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                    }
-                                    bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                            .setSource("test", "value" + id));
-                                }
-                                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                                    if (!bulkItemResponse.isFailed()) {
-                                        indexCounter.incrementAndGet();
-                                    } else {
-                                        logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailure().getMessage());
-                                    }
-                                }
-                            } else {
-                                long id = idGenerator.incrementAndGet();
-                                if (id % 1000 == 0) {
-                                    perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                }
-                                perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                        .setSource("test", "value" + id).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Exception e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-
-        startLatch.countDown();
-        final int numDocs = scaledRandomIntBetween(200, 2500);
-        logger.info("--> waiting for {} docs to be indexed ...", numDocs);
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                client().admin().indices().prepareRefresh().execute().actionGet();
-                return client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() >= numDocs;
-            }
-        });
-        logger.info("--> {} docs indexed", numDocs);
-
-        logger.info("--> starting relocations...");
-        for (int i = 0; i < numberOfRelocations; i++) {
-            int fromNode = (1 + (i % 2));
-            int toNode = fromNode == 1 ? 2 : 1;
-            logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
-            client().admin().cluster().prepareReroute()
-                    .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
-                    .execute().actionGet();
-            ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
-        }
-        logger.info("--> done relocations");
-
-        logger.info("--> marking and waiting for indexing threads to stop ...");
-        stop.set(true);
-        stopLatch.await();
-        logger.info("--> indexing threads stopped");
-
-        logger.info("--> refreshing the index");
-        client().admin().indices().prepareRefresh("test").execute().actionGet();
-        logger.info("--> searching the index");
-        boolean ranOnce = false;
-        for (int i = 0; i < 10; i++) {
-            try {
-                logger.info("--> START search test round {}", i + 1);
-                SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
-                ranOnce = true;
-                if (hits.totalHits() != indexCounter.get()) {
-                    int[] hitIds = new int[(int) indexCounter.get()];
-                    for (int hit = 0; hit < indexCounter.get(); hit++) {
-                        hitIds[hit] = hit + 1;
-                    }
-                    IntOpenHashSet set = IntOpenHashSet.from(hitIds);
-                    for (SearchHit hit : hits.hits()) {
-                        int id = Integer.parseInt(hit.id());
-                        if (!set.remove(id)) {
-                            logger.error("Extra id [{}]", id);
-                        }
-                    }
-                    set.forEach(new IntProcedure() {
-
-                        @Override
-                        public void apply(int value) {
-                            logger.error("Missing id [{}]", value);
-                        }
-                    });
-                }
-                assertThat(hits.totalHits(), equalTo(indexCounter.get()));
-                logger.info("--> DONE search test round {}", i + 1);
-            } catch (SearchPhaseExecutionException ex) {
-                // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
-                logger.warn("Got exception while searching.", ex);
-            }
-        }
-        if (!ranOnce) {
-            fail();
-        }
-    }
+        for (int i = 1; i < numberOfNodes; i++) {
+            logger.info("--> starting [node{}] ...", i + 1);
+            nodes[i] = cluster().startNode();
+            if (i != numberOfNodes - 1) {
+                ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
+                        .setWaitForNodes(Integer.toString(i + 1)).setWaitForGreenStatus().execute().actionGet();
+                assertThat(healthResponse.isTimedOut(), equalTo(false));
+            }
+        }
+
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client())) {
+            final int numDocs = scaledRandomIntBetween(200, 2500);
+            logger.info("--> waiting for {} docs to be indexed ...", numDocs);
+            waitForDocs(numDocs, indexer);
+            logger.info("--> {} docs indexed", numDocs);
+
+            logger.info("--> starting relocations...");
+            int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
+            for (int i = 0; i < numberOfRelocations; i++) {
+                int fromNode = (i % 2);
+                int toNode = fromNode == 0 ? 1 : 0;
+                fromNode += nodeShiftBased;
+                toNode += nodeShiftBased;
+                logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
+                client().admin().cluster().prepareReroute()
+                        .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
+                        .get();
+                if (rarely()) {
+                    logger.debug("--> flushing");
+                    client().admin().indices().prepareFlush().get();
+                }
+                ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+                assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+                clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
+                assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+                logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
+            }
+            logger.info("--> done relocations");
+            logger.info("--> waiting for indexing threads to stop ...");
+            indexer.stop();
+            logger.info("--> indexing threads stopped");
+
+            logger.info("--> refreshing the index");
+            client().admin().indices().prepareRefresh("test").execute().actionGet();
+            logger.info("--> searching the index");
+            boolean ranOnce = false;
+            for (int i = 0; i < 10; i++) {
+                try {
+                    logger.info("--> START search test round {}", i + 1);
+                    SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexer.totalIndexedDocs()).setNoFields().execute().actionGet().getHits();
+                    ranOnce = true;
+                    if (hits.totalHits() != indexer.totalIndexedDocs()) {
+                        int[] hitIds = new int[(int) indexer.totalIndexedDocs()];
+                        for (int hit = 0; hit < indexer.totalIndexedDocs(); hit++) {
+                            hitIds[hit] = hit + 1;
+                        }
+                        IntOpenHashSet set = IntOpenHashSet.from(hitIds);
+                        for (SearchHit hit : hits.hits()) {
+                            int id = Integer.parseInt(hit.id());
+                            if (!set.remove(id)) {
+                                logger.error("Extra id [{}]", id);
+                            }
+                        }
+                        set.forEach(new IntProcedure() {
+
+                            @Override
+                            public void apply(int value) {
+                                logger.error("Missing id [{}]", value);
+                            }
+
+                        });
+                    }
+                    assertThat(hits.totalHits(), equalTo(indexer.totalIndexedDocs()));
+                    logger.info("--> DONE search test round {}", i + 1);
+                } catch (SearchPhaseExecutionException ex) {
+                    // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
+                    logger.warn("Got exception while searching.", ex);
+                }
+            }
+            if (!ranOnce) {
+                fail();
+            }
+        }
+    }
 }
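Both refactored test classes now share the same indexer lifecycle: construct (writer threads start automatically), wait for a searchable doc count, run cluster operations under load, then stop and verify. A condensed usage sketch of the helper defined in the new file below; the index/type names and the doc count are illustrative:

```java
// Illustrative only: the construct/wait/stop pattern both tests follow.
try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client())) {
    waitForDocs(100, indexer);       // block until at least ~100 docs are visible to search
    indexer.assertNoFailures();      // fail fast if any writer thread has already died
    // ... relocate shards / add or remove nodes while writes continue ...
    indexer.stop();                  // joins the writer threads (up to 6m) and re-checks failures
    logger.info("indexed {} docs in total", indexer.totalIndexedDocs());
}
// close() also calls stop(); a second stop() is a no-op thanks to the stop flag.
```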
- logger.warn("Got exception while searching.", ex); + if (!ranOnce) { + fail(); } } - if (!ranOnce) { - fail(); - } } + } diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java new file mode 100644 index 00000000000..c4ccca940db --- /dev/null +++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java @@ -0,0 +1,147 @@ +package org.elasticsearch.test;/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests; +import org.junit.Assert; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; + +public class BackgroundIndexer implements AutoCloseable { + + private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class); + + final Thread[] writers; + final CountDownLatch stopLatch; + final CopyOnWriteArrayList failures; + final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicLong idGenerator = new AtomicLong(); + final AtomicLong indexCounter = new AtomicLong(); + final CountDownLatch startLatch = new CountDownLatch(1); + + public BackgroundIndexer(String index, String type, Client client) { + this(index, type, client, RandomizedTest.scaledRandomIntBetween(3, 10)); + } + + public BackgroundIndexer(String index, String type, Client client, int writerCount) { + this(index, type, client, writerCount, true); + } + + public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart) { + + failures = new CopyOnWriteArrayList<>(); + writers = new Thread[writerCount]; + stopLatch = new CountDownLatch(writers.length); + logger.info("--> starting {} indexing threads", writerCount); + for (int i = 0; i < writers.length; i++) { + final int indexerId = i; + final boolean batch = RandomizedTest.getRandom().nextBoolean(); + writers[i] = new Thread() { + @Override + public void run() { + long id = -1; + try { + startLatch.await(); + logger.info("**** starting indexing thread {}", indexerId); + while (!stop.get()) { + if (batch) { + int batchSize = 
RandomizedTest.getRandom().nextInt(20) + 1; + BulkRequestBuilder bulkRequest = client.prepareBulk(); + for (int i = 0; i < batchSize; i++) { + id = idGenerator.incrementAndGet(); + bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource("test", "value" + id)); + } + BulkResponse bulkResponse = bulkRequest.get(); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + if (!bulkItemResponse.isFailed()) { + indexCounter.incrementAndGet(); + } else { + throw new ElasticsearchException("bulk request failure, id: [" + + bulkItemResponse.getFailure().getId() + "] message: " + bulkItemResponse.getFailure().getMessage()); + } + } + + } else { + id = idGenerator.incrementAndGet(); + client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get(); + indexCounter.incrementAndGet(); + } + } + logger.info("**** done indexing thread {}", indexerId); + } catch (Throwable e) { + failures.add(e); + logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id); + } finally { + stopLatch.countDown(); + } + } + }; + writers[i].start(); + } + + if (autoStart) { + startLatch.countDown(); + } + } + + public void start() { + startLatch.countDown(); + } + + public void stop() throws InterruptedException { + if (stop.get()) { + return; + } + stop.set(true); + + Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true)); + assertNoFailures(); + } + + public long totalIndexedDocs() { + return indexCounter.get(); + } + + public Throwable[] getFailures() { + return failures.toArray(new Throwable[failures.size()]); + } + + public void assertNoFailures() { + Assert.assertThat(failures, emptyIterable()); + } + + @Override + public void close() throws Exception { + stop(); + } +}
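The five-argument constructor keeps the deferred-start option from the original inner class: with autoStart=false the writer threads are created but block on startLatch until start() is called, so a test can finish its setup before any load begins. A short sketch (the writer count and the surrounding steps are illustrative):

```java
// Deferred start: four writers are created parked on startLatch.
BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), 4, false);
try {
    // ... create indices, wait for green, etc., with no background writes yet ...
    indexer.start();   // releases all writer threads at once
    // ... exercise recovery/relocation under load ...
} finally {
    indexer.stop();    // waits up to 6 minutes for the threads, then asserts no failures
}
```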