From d88ac0a95a6b9f25927c967fd5511a45b7695a26 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 11 Mar 2014 21:07:29 -0400 Subject: [PATCH] Make indexRandom handle many documents better * Index one at a time only rarely if doing more than 300. * When launching async actions, take some care to make sure you don't already have more than 150 other async actions in flight. * When indexing in bulk split into chunks of 1000 documents. --- .../search/suggest/SuggestSearchTests.java | 2 +- .../test/ElasticsearchIntegrationTest.java | 94 ++++++++++++------- 2 files changed, 59 insertions(+), 37 deletions(-) diff --git a/src/test/java/org/elasticsearch/search/suggest/SuggestSearchTests.java b/src/test/java/org/elasticsearch/search/suggest/SuggestSearchTests.java index 398f0f8d745..eb5ddd8561a 100644 --- a/src/test/java/org/elasticsearch/search/suggest/SuggestSearchTests.java +++ b/src/test/java/org/elasticsearch/search/suggest/SuggestSearchTests.java @@ -930,7 +930,7 @@ public class SuggestSearchTests extends ElasticsearchIntegrationTest { .size(1)); assertSuggestion(searchSuggest, 0, 0, "simple_phrase", "nobel prize"); } - + protected Suggest searchSuggest(SuggestionBuilder... 
suggestion) { return searchSuggest(null, suggestion); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 7afd730f23e..34f852b0e13 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.ObjectArrayList; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.SeedUtils; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.lucene.util.AbstractRandomizedTest; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ExceptionsHelper; @@ -166,6 +167,19 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ public static final String INDEX_SEED_SETTING = "index.tests.seed"; + /** + * Threshold at which indexing switches from frequently async to frequently bulk. + */ + private static final int FREQUENT_BULK_THRESHOLD = 300; + /** + * Maximum number of async operations that indexRandom will kick off at one time. + */ + private static final int MAX_IN_FLIGHT_ASYNC_INDEXES = 150; + /** + * Maximum number of documents in a single bulk index request. + */ + private static final int MAX_BULK_INDEX_REQUEST_SIZE = 1000; + /** * The current cluster depending on the configured {@link Scope}. 
* By default if no {@link ClusterScope} is configured this will hold a reference to the global cluster carried @@ -790,49 +804,35 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase final String[] indices = indicesSet.toArray(new String[indicesSet.size()]); Collections.shuffle(builders, random); final CopyOnWriteArrayList> errors = new CopyOnWriteArrayList>(); - List latches = new ArrayList(); - if (frequently()) { - logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false); - final CountDownLatch latch = new CountDownLatch(builders.size()); - latches.add(latch); - for (IndexRequestBuilder indexRequestBuilder : builders) { - indexRequestBuilder.execute(new PayloadLatchedActionListener(indexRequestBuilder, latch, errors)); - if (rarely()) { - if (rarely()) { - client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener(newLatch(latches))); - } else if (rarely()) { - client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener(newLatch(latches))); - } else if (rarely()) { - client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches))); - } + List inFlightAsyncOperations = new ArrayList(); + // If you are indexing just a few documents then frequently do it one at a time. If many then frequently in bulk. + if (builders.size() < FREQUENT_BULK_THRESHOLD ? 
frequently() : rarely()) { + if (frequently()) { + logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false); + for (IndexRequestBuilder indexRequestBuilder : builders) { + indexRequestBuilder.execute(new PayloadLatchedActionListener(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors)); + postIndexAsyncActions(indices, inFlightAsyncOperations); } - } - - } else if (randomBoolean()) { - logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false); - for (IndexRequestBuilder indexRequestBuilder : builders) { - indexRequestBuilder.execute().actionGet(); - if (rarely()) { - if (rarely()) { - client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener(newLatch(latches))); - } else if (rarely()) { - client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute(new LatchedActionListener(newLatch(latches))); - } else if (rarely()) { - client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches))); - } + } else { + logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false); + for (IndexRequestBuilder indexRequestBuilder : builders) { + indexRequestBuilder.execute().actionGet(); + postIndexAsyncActions(indices, inFlightAsyncOperations); } } } else { logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, true); - BulkRequestBuilder bulkBuilder = client().prepareBulk(); - for (IndexRequestBuilder indexRequestBuilder : builders) { - bulkBuilder.add(indexRequestBuilder); + for (List segmented : Lists.partition(builders, between(MAX_BULK_INDEX_REQUEST_SIZE / 2, MAX_BULK_INDEX_REQUEST_SIZE))) { + BulkRequestBuilder bulkBuilder = client().prepareBulk(); + for (IndexRequestBuilder indexRequestBuilder : segmented) { + 
bulkBuilder.add(indexRequestBuilder); + } + BulkResponse actionGet = bulkBuilder.execute().actionGet(); + assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false)); } - BulkResponse actionGet = bulkBuilder.execute().actionGet(); - assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false)); } - for (CountDownLatch countDownLatch : latches) { - countDownLatch.await(); + for (CountDownLatch operation: inFlightAsyncOperations) { + operation.await(); } final List actualErrors = new ArrayList(); for (Tuple tuple : errors) { @@ -854,6 +854,28 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return l; } + /** + * Maybe refresh, optimize, or flush then always make sure there aren't too many in flight async operations. + */ + private void postIndexAsyncActions(String[] indices, List inFlightAsyncOperations) throws InterruptedException { + if (rarely()) { + if (rarely()) { + client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenient()).execute( + new LatchedActionListener(newLatch(inFlightAsyncOperations))); + } else if (rarely()) { + client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenient()).execute( + new LatchedActionListener(newLatch(inFlightAsyncOperations))); + } else if (rarely()) { + client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenient()).setMaxNumSegments(between(1, 10)).setFlush(randomBoolean()).execute( + new LatchedActionListener(newLatch(inFlightAsyncOperations))); + } + } + while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) { + int waitFor = between(0, inFlightAsyncOperations.size() - 1); + inFlightAsyncOperations.remove(waitFor).await(); + } + } + private class LatchedActionListener implements ActionListener { private final CountDownLatch latch;