[TEST] Fix NPE in ElasticsearchIntegrationTest if no indexer is provided

Closes #9907
This commit is contained in:
Simon Willnauer 2015-02-26 21:13:39 +01:00
parent b05f5ebee8
commit 261e21a386

View File

@ -129,6 +129,7 @@ import java.nio.file.Paths;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -991,35 +992,37 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/ */
public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final @Nullable BackgroundIndexer indexer) public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final @Nullable BackgroundIndexer indexer)
throws InterruptedException { throws InterruptedException {
final long[] lastKnownCount = {-1}; final AtomicLong lastKnownCount = new AtomicLong(-1);
long lastStartCount = -1; long lastStartCount = -1;
Predicate<Object> testDocs = new Predicate<Object>() { Predicate<Object> testDocs = new Predicate<Object>() {
@Override @Override
public boolean apply(Object o) { public boolean apply(Object o) {
lastKnownCount[0] = indexer.totalIndexedDocs(); if (indexer != null) {
if (lastKnownCount[0] >= numDocs) { lastKnownCount.set(indexer.totalIndexedDocs());
}
if (lastKnownCount.get() >= numDocs) {
long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(); long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
if (count == lastKnownCount[0]) { if (count == lastKnownCount.get()) {
// no progress - try to refresh for the next time // no progress - try to refresh for the next time
client().admin().indices().prepareRefresh().get(); client().admin().indices().prepareRefresh().get();
} }
lastKnownCount[0] = count; lastKnownCount.set(count);
logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs); logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs);
} else { } else {
logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs); logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs);
} }
return lastKnownCount[0] >= numDocs; return lastKnownCount.get() >= numDocs;
} }
}; };
while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) {
if (lastStartCount == lastKnownCount[0]) { if (lastStartCount == lastKnownCount.get()) {
// we didn't make any progress // we didn't make any progress
fail("failed to reach " + numDocs + "docs"); fail("failed to reach " + numDocs + "docs");
} }
lastStartCount = lastKnownCount[0]; lastStartCount = lastKnownCount.get();
} }
return lastKnownCount[0]; return lastKnownCount.get();
} }