diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index 825c8526c99..c9bff451606 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -99,18 +99,20 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { writers[i].start(); } try { - logger.info("--> waiting for 2000 docs to be indexed ..."); - waitForDocs(2000); - logger.info("--> 2000 docs indexed"); + final int totalNumDocs = scaledRandomIntBetween(200, 20000); + int waitFor = totalNumDocs / 3; + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - - logger.info("--> waiting for 4000 docs to be indexed ..."); - waitForDocs(4000); - logger.info("--> 4000 docs indexed"); + waitFor += totalNumDocs / 3; + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); logger.info("--> allow 2 nodes for index [test] ..."); // now start another node, while we index @@ -120,9 +122,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { // make sure the cluster state is green, and all has been recovered assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=2").execute().actionGet().isTimedOut(), equalTo(false)); - logger.info("--> waiting for 15000 docs to be indexed ..."); - waitForDocs(15000); - logger.info("--> 15000 docs indexed"); + logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); + waitForDocs(totalNumDocs); + logger.info("--> {} docs indexed", totalNumDocs); logger.info("--> marking and waiting for indexing threads to stop ..."); stop.set(true); @@ -179,20 +181,21 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { }; writers[i].start(); } - + final int totalNumDocs = scaledRandomIntBetween(200, 20000); + int waitFor = totalNumDocs / 3; try { - logger.info("--> waiting for 2000 docs to be indexed ..."); - waitForDocs(2000); - logger.info("--> 2000 docs indexed"); + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - - logger.info("--> waiting for 4000 docs to be indexed ..."); - waitForDocs(4000); - logger.info("--> 4000 docs indexed"); + waitFor += totalNumDocs / 3; + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); logger.info("--> allow 4 nodes for index [test] ..."); allowNodes("test", 4); @@ -200,9 +203,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=4").execute().actionGet().isTimedOut(), equalTo(false)); - logger.info("--> waiting for 15000 docs to be indexed ..."); - waitForDocs(15000); - logger.info("--> 15000 docs indexed"); + logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); + waitForDocs(totalNumDocs); + logger.info("--> {} docs indexed", totalNumDocs); stop.set(true); stopLatch.await(); @@ -264,19 +267,21 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { }; writers[i].start(); } + final int totalNumDocs = scaledRandomIntBetween(200, 20000); + int waitFor = totalNumDocs / 3; try { - logger.info("--> waiting for 2000 docs to be indexed ..."); - waitForDocs(2000); - logger.info("--> 2000 docs indexed"); + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - - logger.info("--> waiting for 4000 docs to be indexed ..."); - waitForDocs(4000); - logger.info("--> 4000 docs indexed"); + waitFor += totalNumDocs / 3; + logger.info("--> waiting for {} docs to be indexed ...", waitFor); + waitForDocs(waitFor); + logger.info("--> {} docs indexed", waitFor); // now start more nodes, while we index logger.info("--> allow 4 nodes for index [test] ..."); @@ -286,10 +291,9 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=4").execute().actionGet().isTimedOut(), equalTo(false)); - logger.info("--> waiting for 15000 docs to be indexed ..."); - waitForDocs(15000); - logger.info("--> 15000 docs indexed"); - + logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs); + waitForDocs(totalNumDocs); + logger.info("--> {} docs indexed", totalNumDocs); // now, shutdown nodes logger.info("--> allow 3 nodes for index [test] ..."); allowNodes("test", 3); @@ -341,6 +345,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { final CountDownLatch stopLatch = new CountDownLatch(writers.length); logger.info("--> starting {} indexing threads", writers.length); final CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); + final CountDownLatch startLatch = new CountDownLatch(1); for (int i = 0; i < writers.length; i++) { final int indexerId = i; final Client client = client(); @@ -349,6 +354,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { public void run() { long id = -1; try { + startLatch.await(); logger.info("**** starting indexing thread {}", indexerId); while (!stop.get()) { id = idGenerator.incrementAndGet(); @@ -369,8 +375,10 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { } try { - final int numDocs = between(10000, 50000); - for (int i = 0; i < numDocs; i += between(100, 1000)) { + final int numDocs = scaledRandomIntBetween(200, 50000); + logger.info("--> indexing {} docs in total ...", numDocs); + startLatch.countDown(); + for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) { assertThat(failures, emptyIterable()); logger.info("--> waiting for {} docs to be indexed ...", i); waitForDocs(i);