[TEST] Make flush in #indexRandom optinal

Some tests like CorruptedTranslogTests rely on the fact that we
are recovering from translog. In those cases we need to prevent
flushes from happening during indexing. This change adds an optional
flag on the #indexRandom utility to disable flushes.
This commit is contained in:
Simon Willnauer 2014-09-16 10:41:28 +02:00
parent 38f5aa2248
commit a7dde8dd80
2 changed files with 25 additions and 7 deletions

View File

@ -83,7 +83,7 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest {
builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar");
} }
disableTranslogFlush("test"); disableTranslogFlush("test");
indexRandom(false, false, builders); indexRandom(false, false, false, Arrays.asList(builders)); // this one
// Corrupt the translog file(s) // Corrupt the translog file(s)
corruptRandomTranslogFiles(); corruptRandomTranslogFiles();

View File

@ -1243,7 +1243,25 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
* @param builders the documents to index. * @param builders the documents to index.
*/ */
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException { public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
Random random = getRandom(); indexRandom(forceRefresh, dummyDocuments, true, builders);
}
/**
* Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
* indexes they in a blocking or async fashion. This is very useful to catch problems that relate to internal document
* ids or index segment creations. Some features might have bug when a given document is the first or the last in a
* segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
* layout.
*
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
* @param dummyDocuments if <tt>true</tt> some empty dummy documents may be randomly inserted into the document list and deleted once
* all documents are indexed. This is useful to produce deleted documents on the server side.
* @param maybeFlush if <tt>true</tt> this method may randomly execute full flushes after index operations.
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders) throws InterruptedException, ExecutionException {
Random random = getRandom();
Set<String> indicesSet = new HashSet<>(); Set<String> indicesSet = new HashSet<>();
for (IndexRequestBuilder builder : builders) { for (IndexRequestBuilder builder : builders) {
indicesSet.add(builder.request().index()); indicesSet.add(builder.request().index());
@ -1272,13 +1290,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false); logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), true, false);
for (IndexRequestBuilder indexRequestBuilder : builders) { for (IndexRequestBuilder indexRequestBuilder : builders) {
indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors)); indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, newLatch(inFlightAsyncOperations), errors));
postIndexAsyncActions(indices, inFlightAsyncOperations); postIndexAsyncActions(indices, inFlightAsyncOperations, maybeFlush);
} }
} else { } else {
logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false); logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false);
for (IndexRequestBuilder indexRequestBuilder : builders) { for (IndexRequestBuilder indexRequestBuilder : builders) {
indexRequestBuilder.execute().actionGet(); indexRequestBuilder.execute().actionGet();
postIndexAsyncActions(indices, inFlightAsyncOperations); postIndexAsyncActions(indices, inFlightAsyncOperations, maybeFlush);
} }
} }
} else { } else {
@ -1339,16 +1357,16 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
/** /**
* Maybe refresh, optimize, or flush then always make sure there aren't too many in flight async operations. * Maybe refresh, optimize, or flush then always make sure there aren't too many in flight async operations.
*/ */
private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations) throws InterruptedException { private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush) throws InterruptedException {
if (rarely()) { if (rarely()) {
if (rarely()) { if (rarely()) {
client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( client().admin().indices().prepareRefresh(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<RefreshResponse>(newLatch(inFlightAsyncOperations))); new LatchedActionListener<RefreshResponse>(newLatch(inFlightAsyncOperations)));
} else if (rarely()) { } else if (maybeFlush && rarely()) {
client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute( client().admin().indices().prepareFlush(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute(
new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations))); new LatchedActionListener<FlushResponse>(newLatch(inFlightAsyncOperations)));
} else if (rarely()) { } else if (rarely()) {
client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(randomBoolean()).execute( client().admin().indices().prepareOptimize(indices).setIndicesOptions(IndicesOptions.lenientExpandOpen()).setMaxNumSegments(between(1, 10)).setFlush(maybeFlush && randomBoolean()).execute(
new LatchedActionListener<OptimizeResponse>(newLatch(inFlightAsyncOperations))); new LatchedActionListener<OptimizeResponse>(newLatch(inFlightAsyncOperations)));
} }
} }