diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index 1980cfa7c64..537c4ef6d82 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -58,23 +58,29 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); + logger.info("--> allow 2 nodes for index [test] ..."); // now start another node, while we index allowNodes("test", 2); @@ -108,22 +114,28 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); logger.info("--> allow 4 nodes for index [test] ..."); allowNodes("test", 4); @@ -156,24 +168,29 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); // now start more nodes, while we index + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); logger.info("--> allow 4 nodes for index [test] ..."); allowNodes("test", 4); @@ -227,7 +244,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { final int numDocs = scaledRandomIntBetween(200, 50000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), numDocs)) { for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) { indexer.assertNoFailures(); diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java index dc3fa203488..3f739353be1 100644 --- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java +++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.recovery; import com.carrotsearch.hppc.IntOpenHashSet; import com.carrotsearch.hppc.procedures.IntProcedure; -import com.carrotsearch.randomizedtesting.RandomizedTest; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -129,8 +128,8 @@ public class RelocationTests extends ElasticsearchIntegrationTest { } } - final int numDocs = scaledRandomIntBetween(200, 2500); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), scaledRandomIntBetween(2, 5), true, numDocs * 2)) { + int numDocs = scaledRandomIntBetween(200, 2500); + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), numDocs)) { logger.info("--> waiting for {} docs to be indexed ...", numDocs); waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); @@ -142,6 +141,9 @@ public class RelocationTests extends ElasticsearchIntegrationTest { int toNode = fromNode == 0 ? 1 : 0; fromNode += nodeShiftBased; toNode += nodeShiftBased; + numDocs = scaledRandomIntBetween(200, 1000); + logger.debug("--> Allow indexer to index [{}] documents", numDocs); + indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); client().admin().cluster().prepareReroute() .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode])) @@ -154,6 +156,7 @@ public class RelocationTests extends ElasticsearchIntegrationTest { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } logger.info("--> done relocations"); diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java index fa432ce7042..3326cb8d167 100644 --- a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java @@ -30,6 +30,7 @@ import org.junit.Assert; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,21 +49,64 @@ public class BackgroundIndexer implements AutoCloseable { final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); final CountDownLatch startLatch = new CountDownLatch(1); + final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore + final Semaphore availableBudget = new Semaphore(0); + /** + * Start indexing in the background using a random number of threads. + * + * @param index index name to index into + * @param type document type + * @param client client to use + */ public BackgroundIndexer(String index, String type, Client client) { - this(index, type, client, RandomizedTest.scaledRandomIntBetween(2, 5)); + this(index, type, client, -1); } - public BackgroundIndexer(String index, String type, Client client, int writerCount) { - this(index, type, client, writerCount, true, Integer.MAX_VALUE); + /** + * Start indexing in the background using a random number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + */ + public BackgroundIndexer(String index, String type, Client client, int numOfDocs) { + this(index, type, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5)); } - public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart, final int maxNumDocs) { + /** + * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + * @param writerCount number of indexing threads to use + */ + public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) { + this(index, type, client, numOfDocs, writerCount, true); + } + + /** + * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + * @param writerCount number of indexing threads to use + * @param autoStart set to true to start indexing as soon as all threads have been created. + */ + public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount, boolean autoStart) { failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; stopLatch = new CountDownLatch(writers.length); - logger.info("--> starting {} indexing threads", writerCount); + logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs); for (int i = 0; i < writers.length; i++) { final int indexerId = i; final boolean batch = RandomizedTest.getRandom().nextBoolean(); @@ -73,9 +117,17 @@ public class BackgroundIndexer implements AutoCloseable { try { startLatch.await(); logger.info("**** starting indexing thread {}", indexerId); - while (!stop.get() && indexCounter.get() < maxNumDocs) { // step out once we reach the hard limit + while (!stop.get()) { if (batch) { int batchSize = RandomizedTest.getRandom().nextInt(20) + 1; + if (hasBudget.get()) { + batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1);// always try to get at least one + if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) { + // time out -> check if we have to stop. + continue; + } + + } BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < batchSize; i++) { id = idGenerator.incrementAndGet(); @@ -92,12 +144,17 @@ public class BackgroundIndexer implements AutoCloseable { } } else { + + if (hasBudget.get() && !availableBudget.tryAcquire(250, TimeUnit.MILLISECONDS)) { + // time out -> check if we have to stop. + continue; + } id = idGenerator.incrementAndGet(); client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get(); indexCounter.incrementAndGet(); } } - logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {} maxNumDocs: {}", indexerId, stop.get(), indexCounter.get(), maxNumDocs); + logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), indexCounter.get()); } catch (Throwable e) { failures.add(e); logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id); @@ -110,20 +167,63 @@ public class BackgroundIndexer implements AutoCloseable { } if (autoStart) { - startLatch.countDown(); + start(numOfDocs); } } + private void setBudget(int numOfDocs) { + logger.debug("updating budget to [{}]", numOfDocs); + if (numOfDocs >= 0) { + hasBudget.set(true); + availableBudget.release(numOfDocs); + } else { + hasBudget.set(false); + } + + } + + /** Start indexing with no limit to the number of documents */ public void start() { + start(-1); + } + + /** + * Start indexing + * + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + */ + public void start(int numOfDocs) { + assert !stop.get() : "background indexer can not be started after it has stopped"; + setBudget(numOfDocs); startLatch.countDown(); } + /** Pausing indexing by setting current document limit to 0 */ + public void pauseIndexing() { + availableBudget.drainPermits(); + setBudget(0); + } + + /** Continue indexing after it has paused. No new document limit will be set */ + public void continueIndexing() { + continueIndexing(-1); + } + + /** + * Continue indexing after it has paused. + * + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + */ + public void continueIndexing(int numOfDocs) { + setBudget(numOfDocs); + } + + /** Stop all background threads **/ public void stop() throws InterruptedException { if (stop.get()) { return; } stop.set(true); - Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true)); assertNoFailures(); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 80eddf841a3..e3acdaf6b0b 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -207,7 +207,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } try { transportAddresses[i++] = new InetSocketTransportAddress(split[0], Integer.valueOf(split[1])); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]"); } } @@ -319,7 +319,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } public static TestCluster cluster() { - if (!(currentCluster instanceof TestCluster)) { + if (!(currentCluster instanceof TestCluster)) { throw new UnsupportedOperationException("current test cluster is immutable"); } return (TestCluster) currentCluster; @@ -558,7 +558,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase Predicate testDocs = new Predicate() { public boolean apply(Object o) { lastKnownCount[0] = indexer.totalIndexedDocs(); - if (lastKnownCount[0] > numDocs) { + if (lastKnownCount[0] >= numDocs) { long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(); if (count == lastKnownCount[0]) { // no progress - try to refresh for the next time @@ -569,7 +569,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } else { logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs); } - return lastKnownCount[0] > numDocs; + return lastKnownCount[0] >= numDocs; } }; @@ -1088,7 +1088,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase @Before public final void before() throws IOException { if (runTestScopeLifecycle()) { - beforeInternal(); + beforeInternal(); } } @@ -1139,7 +1139,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * * @see SuiteScopeTest */ - protected void setupSuiteScopeCluster() throws Exception {} + protected void setupSuiteScopeCluster() throws Exception { + } private static boolean isSuiteScope(Class clazz) { if (clazz == Object.class || clazz == ElasticsearchIntegrationTest.class) {