diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index c40cbb32f61..49209390897 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.test.AbstractIntegrationTest; import org.junit.Test; import java.util.Arrays; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,6 +48,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; /** @@ -303,36 +305,38 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest { @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE,action.index:TRACE,action.support.replication:TRACE,cluster.service:DEBUG") @Slow public void recoverWhileRelocating() throws Exception { - final int numShards = between(5, 10); + final int numShards = between(2, 10); final int numReplicas = 0; cluster().ensureAtLeastNumNodes(3); logger.info("--> creating test index ..."); int allowNodes = 2; assertAcked(prepareCreate("test").setSettings(randomSettingsBuilder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build())); - final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); final AtomicBoolean stop = new AtomicBoolean(false); - Thread[] writers = new Thread[5]; + Thread[] writers = new Thread[atLeast(3)]; final CountDownLatch stopLatch = new CountDownLatch(writers.length); logger.info("--> starting {} indexing threads", writers.length); + final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); for (int i = 0; i < writers.length; i++) { final int indexerId = i; final Client client = client(); writers[i] = new Thread() { @Override public void run() { + long id = -1; try { logger.info("**** starting indexing thread {}", indexerId); while (!stop.get()) { - long id = idGenerator.incrementAndGet(); + id = idGenerator.incrementAndGet(); client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId) .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); indexCounter.incrementAndGet(); } logger.info("**** done indexing thread {}", indexerId); } catch (Throwable e) { - logger.warn("**** failed indexing thread {}", e, indexerId); + failures.add(e); + logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id); } finally { stopLatch.countDown(); } @@ -341,7 +345,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest { writers[i].start(); } - for (int i = 0; i < 100000; i += 1000) { + final int numDocs = between(10000, 50000); + for (int i = 0; i < numDocs; i += between(100, 1000)) { + assertThat(failures, emptyIterable()); logger.info("--> waiting for {} docs to be indexed ...", i); waitForDocs(i); logger.info("--> {} docs indexed", i); @@ -353,6 +359,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest { logger.info("--> marking and waiting for indexing threads to stop ..."); stop.set(true); + assertThat(failures, emptyIterable()); stopLatch.await(); logger.info("--> indexing threads stopped");