diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
index c9bff451606..d084dd239f0 100644
--- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java
@@ -56,6 +56,95 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
     private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
 
+    public class BackgroundIndexer implements AutoCloseable {
+
+        final Thread[] writers;
+        final CountDownLatch stopLatch;
+        final CopyOnWriteArrayList<Throwable> failures;
+        final AtomicBoolean stop = new AtomicBoolean(false);
+        final AtomicLong idGenerator = new AtomicLong();
+        final AtomicLong indexCounter = new AtomicLong();
+        final CountDownLatch startLatch = new CountDownLatch(1);
+
+        public BackgroundIndexer() {
+            this(scaledRandomIntBetween(3, 10));
+        }
+
+        public BackgroundIndexer(int writerCount) {
+            this(writerCount, true);
+        }
+
+        public BackgroundIndexer(int writerCount, boolean autoStart) {
+
+            failures = new CopyOnWriteArrayList<>();
+            writers = new Thread[writerCount];
+            stopLatch = new CountDownLatch(writers.length);
+            logger.info("--> starting {} indexing threads", writerCount);
+            for (int i = 0; i < writers.length; i++) {
+                final int indexerId = i;
+                final Client client = client();
+                writers[i] = new Thread() {
+                    @Override
+                    public void run() {
+                        long id = -1;
+                        try {
+                            startLatch.await();
+                            logger.info("**** starting indexing thread {}", indexerId);
+                            while (!stop.get()) {
+                                id = idGenerator.incrementAndGet();
+                                client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
+                                        .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+                                indexCounter.incrementAndGet();
+                            }
+                            logger.info("**** done indexing thread {}", indexerId);
+                        } catch (Throwable e) {
+                            failures.add(e);
+                            logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
+                        } finally {
+                            stopLatch.countDown();
+                        }
+                    }
+                };
+                writers[i].start();
+            }
+
+            if (autoStart) {
+                startLatch.countDown();
+            }
+        }
+
+        public void start() {
+            startLatch.countDown();
+        }
+
+        public void stop() throws InterruptedException {
+            if (stop.get()) {
+                return;
+            }
+            stop.set(true);
+
+            assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
+            assertNoFailures();
+        }
+
+        public long totalIndexedDocs() {
+            return indexCounter.get();
+        }
+
+        public Throwable[] getFailures() {
+            return failures.toArray(new Throwable[failures.size()]);
+        }
+
+        public void assertNoFailures() {
+            assertThat(failures, emptyIterable());
+        }
+
+        @Override
+        public void close() throws Exception {
+            stop();
+        }
+    }
+
     @Test
     @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
     @Slow
@@ -64,54 +153,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[5];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final int indexerId = i;
-            final Client client = client();
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            long id = idGenerator.incrementAndGet();
-                            if (id % 1000 == 0) {
-                                client.admin().indices().prepareFlush().execute().actionGet();
-                            }
-                            client.prepareIndex("test", "type1", Long.toString(id))
-                                    .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                            indexCounter.incrementAndGet();
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Throwable e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-        try {
-            final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-            int waitFor = totalNumDocs / 3;
+        final int totalNumDocs = scaledRandomIntBetween(200, 20000);
+        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+            int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
 
-            waitFor += totalNumDocs / 3;
+            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> allow 2 nodes for index [test] ...");
@@ -124,23 +181,18 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            stop.set(true);
-            stopLatch.await();
+            indexer.stop();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> refreshing the index");
             refreshAndAssert();
             logger.info("--> verifying indexed content");
-            iterateAssertCount(numberOfShards, indexCounter.get(), 10);
-        } finally {
-            // verify the workers are shut down
-            stop.set(true);
-            stopLatch.await();
+            iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
         }
-
     }
 
     @Test
@@ -151,50 +203,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[5];
-        logger.info("--> starting {} indexing threads", writers.length);
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final int indexerId = i;
-            final Client client = client();
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            long id = idGenerator.incrementAndGet();
-                            client.prepareIndex("test", "type1", Long.toString(id))
-                                    .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                            indexCounter.incrementAndGet();
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Throwable e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        int waitFor = totalNumDocs / 3;
-        try {
+        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+            int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
 
-            waitFor += totalNumDocs / 3;
+            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> allow 4 nodes for index [test] ...");
             allowNodes("test", 4);
@@ -205,24 +229,17 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
-            stop.set(true);
-            stopLatch.await();
-
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            stop.set(true);
-            stopLatch.await();
+            indexer.stop();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> refreshing the index");
             refreshAndAssert();
             logger.info("--> verifying indexed content");
-            iterateAssertCount(numberOfShards, indexCounter.get(), 10);
-        } finally {
-            // make sure the workers are stopped in case of an error
-            stop.set(true);
-            stopLatch.await();
+            iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
         }
     }
 
@@ -234,53 +251,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         int numberOfShards = numberOfShards();
         assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[5];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final int indexerId = i;
-            final Client client = client();
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            long id = idGenerator.incrementAndGet();
-                            client.prepareIndex("test", "type1", Long.toString(id))
-                                    .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                            long count = indexCounter.incrementAndGet();
-                            if (count % 1000 == 0) {
-                                logger.debug("{} documents indexed", count);
-                            }
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Throwable e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        int waitFor = totalNumDocs / 3;
-        try {
+        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+            int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
 
-            waitFor += totalNumDocs / 3;
+            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor);
+            indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
             // now start more nodes, while we index
@@ -293,6 +279,8 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
             waitForDocs(totalNumDocs);
+            indexer.assertNoFailures();
+
             logger.info("--> {} docs indexed", totalNumDocs);
 
             // now, shutdown nodes
             logger.info("--> allow 3 nodes for index [test] ...");
@@ -311,8 +299,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForNodes(">=1").execute().actionGet().isTimedOut(), equalTo(false));
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            stop.set(true);
-            stopLatch.await();
+            indexer.stop();
             logger.info("--> indexing threads stopped");
 
             assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForNodes(">=1").execute().actionGet().isTimedOut(), equalTo(false));
@@ -320,11 +307,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> refreshing the index");
             refreshAndAssert();
             logger.info("--> verifying indexed content");
-            iterateAssertCount(numberOfShards, indexCounter.get(), 10);
-        } finally {
-            // make sure the workers are stopped in case of an error
-            stop.set(true);
-            stopLatch.await();
+            iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
         }
     }
 
@@ -338,48 +321,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         int allowNodes = 2;
         assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas)));
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[scaledRandomIntBetween(3, 10)];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        logger.info("--> starting {} indexing threads", writers.length);
-        final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        for (int i = 0; i < writers.length; i++) {
-            final int indexerId = i;
-            final Client client = client();
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    long id = -1;
-                    try {
-                        startLatch.await();
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            id = idGenerator.incrementAndGet();
-                            client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
-                                    .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                            indexCounter.incrementAndGet();
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Throwable e) {
-                        failures.add(e);
-                        logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-        try {
+        final int numDocs = scaledRandomIntBetween(200, 50000);
+
+        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
 
-            final int numDocs = scaledRandomIntBetween(200, 50000);
-
             logger.info("--> indexing {} docs in total ...", numDocs);
-            startLatch.countDown();
             for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
-                assertThat(failures, emptyIterable());
+                indexer.assertNoFailures();
                 logger.info("--> waiting for {} docs to be indexed ...", i);
                 waitForDocs(i);
                 logger.info("--> {} docs indexed", i);
@@ -390,9 +337,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             }
 
             logger.info("--> marking and waiting for indexing threads to stop ...");
-            stop.set(true);
-            stopLatch.await();
-            assertThat(failures, emptyIterable());
+            indexer.stop();
             logger.info("--> indexing threads stopped");
 
             logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index");
@@ -403,11 +348,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             logger.info("--> refreshing the index");
             refreshAndAssert();
             logger.info("--> verifying indexed content");
-            iterateAssertCount(numShards, indexCounter.get(), 10);
-        } finally {
-            // make sure the workers are stopped in case of an error
-            stop.set(true);
-            stopLatch.await();
+            iterateAssertCount(numShards, indexer.totalIndexedDocs(), 10);
         }
     }