diff --git a/src/test/java/org/elasticsearch/AbstractSharedClusterTest.java b/src/test/java/org/elasticsearch/AbstractSharedClusterTest.java index 9fed359a8b7..4025823c920 100644 --- a/src/test/java/org/elasticsearch/AbstractSharedClusterTest.java +++ b/src/test/java/org/elasticsearch/AbstractSharedClusterTest.java @@ -21,6 +21,7 @@ package org.elasticsearch; import com.google.common.base.Joiner; import com.google.common.collect.Iterators; import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ShardOperationFailedException; @@ -55,10 +56,13 @@ import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.rest.RestStatus; +import org.hamcrest.Matchers; import org.junit.*; import java.io.IOException; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -396,22 +400,79 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase { Random random = getRandom(); List list = Arrays.asList(builders); Collections.shuffle(list, random); - for (IndexRequestBuilder indexRequestBuilder : list) { - indexRequestBuilder.execute().actionGet(); - if (rarely()) { + final CopyOnWriteArrayList errors = new CopyOnWriteArrayList(); + List latches = new ArrayList(); + if (frequently()) { + logger.info("Index [{}] docs async: [{}]", list.size(), true); + final CountDownLatch latch = new CountDownLatch(list.size()); + latches.add(latch); + for (IndexRequestBuilder indexRequestBuilder : list) { + indexRequestBuilder.execute(new LatchedActionListener(latch, errors)); if (rarely()) { - client().admin().indices().prepareRefresh(index).execute().get(); - } else if (rarely()) { - client().admin().indices().prepareFlush(index).execute().get(); - } else if (rarely()) { - client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute().get(); + if (rarely()) { + client().admin().indices().prepareRefresh(index).execute(new LatchedActionListener(newLatch(latches), errors)); + } else if (rarely()) { + client().admin().indices().prepareFlush(index).execute(new LatchedActionListener(newLatch(latches), errors)); + } else if (rarely()) { + client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches), errors)); + } + } + } + + } else { + logger.info("Index [{}] docs async: [{}]", list.size(), false); + for (IndexRequestBuilder indexRequestBuilder : list) { + indexRequestBuilder.execute().actionGet(); + if (rarely()) { + if (rarely()) { + client().admin().indices().prepareRefresh(index).execute(new LatchedActionListener(newLatch(latches), errors)); + } else if (rarely()) { + client().admin().indices().prepareFlush(index).execute(new LatchedActionListener(newLatch(latches), errors)); + } else if (rarely()) { + client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches), errors)); + } } } } + for (CountDownLatch countDownLatch : latches) { + countDownLatch.await(); + } + assertThat(errors, Matchers.emptyIterable()); if (forceRefresh) { assertNoFailures(client().admin().indices().prepareRefresh(index).execute().get()); } } + + private static final CountDownLatch newLatch(List latches) { + CountDownLatch l = new CountDownLatch(1); + latches.add(l); + return l; + } + + private static class LatchedActionListener implements ActionListener { + private final CountDownLatch latch; + private final CopyOnWriteArrayList errors; + + public LatchedActionListener(CountDownLatch latch, CopyOnWriteArrayList errors) { + this.latch = latch; + this.errors = errors; + } + + @Override + public void onResponse(Response response) { + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + try { + errors.add(e); + } finally { + latch.countDown(); + } + } + + } public void clearScroll(String... scrollIds) { ClearScrollResponse clearResponse = client().prepareClearScroll()