[Test] Added better control over the number of documents indexed by BackgroundIndexer

Used the new controls to reduce indexing activity in RelocationTests and RecoveryWhileUnderLoadTests

Closes #5696
Boaz Leskes, 2014-04-06 00:41:24 +02:00
commit 7b9df39800 (parent d26a956231)
4 changed files with 149 additions and 28 deletions
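In short, BackgroundIndexer now takes a document budget: writer threads pause once the budget is spent, and a test can top the budget up with continueIndexing() or drain it with pauseIndexing(). A minimal usage sketch of the new controls (the index name, type and doc counts are illustrative, not taken from the commit):

// Sketch: start with a budget of 100 docs, then extend it while the cluster changes.
try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), 100)) {
    waitForDocs(100, indexer);     // returns once the writers have spent the initial budget
    indexer.continueIndexing(50);  // release 50 more docs while, e.g., a shard relocates
    waitForDocs(150, indexer);
    indexer.pauseIndexing();       // drain unused permits; writers idle until continued
}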

File: RecoveryWhileUnderLoadTests.java

@@ -58,23 +58,29 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
-            int waitFor = totalNumDocs / 10;
+        int waitFor = totalNumDocs / 10;
+        int extraDocs = waitFor;
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
+            extraDocs = totalNumDocs / 10;
+            waitFor += extraDocs;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
-            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
+            extraDocs = totalNumDocs - waitFor;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> allow 2 nodes for index [test] ...");
             // now start another node, while we index
             allowNodes("test", 2);
@@ -108,22 +114,28 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
-            int waitFor = totalNumDocs / 10;
+        int waitFor = totalNumDocs / 10;
+        int extraDocs = waitFor;
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
+            extraDocs = totalNumDocs / 10;
+            waitFor += extraDocs;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
-            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
+            extraDocs = totalNumDocs - waitFor;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> allow 4 nodes for index [test] ...");
             allowNodes("test", 4);
@@ -156,24 +168,29 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
-            int waitFor = totalNumDocs / 10;
+        int waitFor = totalNumDocs / 10;
+        int extraDocs = waitFor;
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
+            extraDocs = totalNumDocs / 10;
+            waitFor += extraDocs;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> flushing the index ....");
             // now flush, just to make sure we have some data in the index, not just translog
             client().admin().indices().prepareFlush().execute().actionGet();
-            waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
             // now start more nodes, while we index
+            extraDocs = totalNumDocs - waitFor;
+            indexer.continueIndexing(extraDocs);
             logger.info("--> allow 4 nodes for index [test] ...");
             allowNodes("test", 4);
@@ -227,7 +244,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         final int numDocs = scaledRandomIntBetween(200, 50000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), numDocs)) {
             for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
                 indexer.assertNoFailures();
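Taken together, the tests above replace unbounded background indexing with a staged budget: totalNumDocs / 10 documents before the flush, another totalNumDocs / 10 after it, and the remainder (totalNumDocs - waitFor) only once the extra nodes are allowed. Recovery therefore runs against a known, bounded indexing load, and since the released permits sum to exactly totalNumDocs, each test still finishes with the intended document count.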

File: RelocationTests.java

@@ -21,7 +21,6 @@ package org.elasticsearch.recovery;
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.carrotsearch.hppc.procedures.IntProcedure;
-import com.carrotsearch.randomizedtesting.RandomizedTest;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
@@ -129,8 +128,8 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
             }
         }
-        final int numDocs = scaledRandomIntBetween(200, 2500);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), scaledRandomIntBetween(2, 5), true, numDocs * 2)) {
+        int numDocs = scaledRandomIntBetween(200, 2500);
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), numDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", numDocs);
             waitForDocs(numDocs, indexer);
             logger.info("--> {} docs indexed", numDocs);
@@ -142,6 +141,9 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
                 int toNode = fromNode == 0 ? 1 : 0;
                 fromNode += nodeShiftBased;
                 toNode += nodeShiftBased;
+                numDocs = scaledRandomIntBetween(200, 1000);
+                logger.debug("--> Allow indexer to index [{}] documents", numDocs);
+                indexer.continueIndexing(numDocs);
                 logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
                 client().admin().cluster().prepareReroute()
                         .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
@@ -154,6 +156,7 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
                 assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
                 clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
                 assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+                indexer.pauseIndexing();
                 logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
             }
             logger.info("--> done relocations");
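The relocation loop above settles into a release/relocate/pause rhythm; schematically (a sketch, with the reroute and health checks elided):

indexer.continueIndexing(numDocs);  // budget for this relocation round
// ... move the shard and wait for the relocation to complete ...
indexer.pauseIndexing();            // drain leftover permits before asserting on counts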

File: BackgroundIndexer.java

@@ -30,6 +30,7 @@ import org.junit.Assert;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,21 +49,64 @@ public class BackgroundIndexer implements AutoCloseable {
     final AtomicLong idGenerator = new AtomicLong();
     final AtomicLong indexCounter = new AtomicLong();
     final CountDownLatch startLatch = new CountDownLatch(1);
+    final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
+    final Semaphore availableBudget = new Semaphore(0);
+
+    /**
+     * Start indexing in the background using a random number of threads.
+     *
+     * @param index  index name to index into
+     * @param type   document type
+     * @param client client to use
+     */
     public BackgroundIndexer(String index, String type, Client client) {
-        this(index, type, client, RandomizedTest.scaledRandomIntBetween(2, 5));
+        this(index, type, client, -1);
     }

-    public BackgroundIndexer(String index, String type, Client client, int writerCount) {
-        this(index, type, client, writerCount, true, Integer.MAX_VALUE);
+    /**
+     * Start indexing in the background using a random number of threads. Indexing will be paused after numOfDocs docs
+     * have been indexed.
+     *
+     * @param index     index name to index into
+     * @param type      document type
+     * @param client    client to use
+     * @param numOfDocs number of documents to index before pausing. Set to -1 to have no limit.
+     */
+    public BackgroundIndexer(String index, String type, Client client, int numOfDocs) {
+        this(index, type, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5));
     }

-    public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart, final int maxNumDocs) {
+    /**
+     * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs
+     * have been indexed.
+     *
+     * @param index       index name to index into
+     * @param type        document type
+     * @param client      client to use
+     * @param numOfDocs   number of documents to index before pausing. Set to -1 to have no limit.
+     * @param writerCount number of indexing threads to use
+     */
+    public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) {
+        this(index, type, client, numOfDocs, writerCount, true);
+    }
+
+    /**
+     * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs
+     * have been indexed.
+     *
+     * @param index       index name to index into
+     * @param type        document type
+     * @param client      client to use
+     * @param numOfDocs   number of documents to index before pausing. Set to -1 to have no limit.
+     * @param writerCount number of indexing threads to use
+     * @param autoStart   set to true to start indexing as soon as all threads have been created.
+     */
+    public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount, boolean autoStart) {
         failures = new CopyOnWriteArrayList<>();
         writers = new Thread[writerCount];
         stopLatch = new CountDownLatch(writers.length);
-        logger.info("--> starting {} indexing threads", writerCount);
+        logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
         for (int i = 0; i < writers.length; i++) {
             final int indexerId = i;
             final boolean batch = RandomizedTest.getRandom().nextBoolean();
@@ -73,9 +117,17 @@ public class BackgroundIndexer implements AutoCloseable {
                     try {
                         startLatch.await();
                         logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get() && indexCounter.get() < maxNumDocs) { // step out once we reach the hard limit
+                        while (!stop.get()) {
                             if (batch) {
                                 int batchSize = RandomizedTest.getRandom().nextInt(20) + 1;
+                                if (hasBudget.get()) {
+                                    batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1); // always try to get at least one
+                                    if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) {
+                                        // time out -> check if we have to stop.
+                                        continue;
+                                    }
+                                }
                                 BulkRequestBuilder bulkRequest = client.prepareBulk();
                                 for (int i = 0; i < batchSize; i++) {
                                     id = idGenerator.incrementAndGet();
@@ -92,12 +144,17 @@ public class BackgroundIndexer implements AutoCloseable {
                                 }
                             } else {
+                                if (hasBudget.get() && !availableBudget.tryAcquire(250, TimeUnit.MILLISECONDS)) {
+                                    // time out -> check if we have to stop.
+                                    continue;
+                                }
                                 id = idGenerator.incrementAndGet();
                                 client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get();
                                 indexCounter.incrementAndGet();
                             }
                         }
-                        logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {} maxNumDocs: {}", indexerId, stop.get(), indexCounter.get(), maxNumDocs);
+                        logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), indexCounter.get());
                     } catch (Throwable e) {
                         failures.add(e);
                         logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
@@ -110,20 +167,63 @@ public class BackgroundIndexer implements AutoCloseable {
         }

         if (autoStart) {
-            startLatch.countDown();
+            start(numOfDocs);
         }
     }

+    private void setBudget(int numOfDocs) {
+        logger.debug("updating budget to [{}]", numOfDocs);
+        if (numOfDocs >= 0) {
+            hasBudget.set(true);
+            availableBudget.release(numOfDocs);
+        } else {
+            hasBudget.set(false);
+        }
+    }
+
+    /** Start indexing with no limit to the number of documents */
     public void start() {
+        start(-1);
+    }
+
+    /**
+     * Start indexing
+     *
+     * @param numOfDocs number of documents to index before pausing. Set to -1 to have no limit.
+     */
+    public void start(int numOfDocs) {
+        assert !stop.get() : "background indexer can not be started after it has stopped";
+        setBudget(numOfDocs);
         startLatch.countDown();
     }

+    /** Pause indexing by setting the current document limit to 0 */
+    public void pauseIndexing() {
+        availableBudget.drainPermits();
+        setBudget(0);
+    }
+
+    /** Continue indexing after it has been paused. No new document limit will be set */
+    public void continueIndexing() {
+        continueIndexing(-1);
+    }
+
+    /**
+     * Continue indexing after it has been paused.
+     *
+     * @param numOfDocs number of documents to index before pausing. Set to -1 to have no limit.
+     */
+    public void continueIndexing(int numOfDocs) {
+        setBudget(numOfDocs);
+    }
+
+    /** Stop all background threads */
     public void stop() throws InterruptedException {
         if (stop.get()) {
             return;
         }
         stop.set(true);
         Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
         assertNoFailures();
     }
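The budget mechanism added here reduces to a flag plus a semaphore, with a bounded tryAcquire so writers still notice stop requests while paused. A self-contained sketch of the same pattern (the class and method bodies are mine, for illustration only):

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class BudgetedWriter {
    final AtomicBoolean stop = new AtomicBoolean();
    final AtomicBoolean hasBudget = new AtomicBoolean(); // false => no limit
    final Semaphore availableBudget = new Semaphore(0);

    void writerLoop() throws InterruptedException {
        while (!stop.get()) {
            // With a budget in force, wait (bounded) for a permit so the loop
            // can still observe stop() within 250ms even while paused.
            if (hasBudget.get() && !availableBudget.tryAcquire(250, TimeUnit.MILLISECONDS)) {
                continue; // timed out: re-check stop and budget
            }
            // ... index one document ...
        }
    }

    void continueIndexing(int numOfDocs) {
        if (numOfDocs >= 0) {
            hasBudget.set(true);
            availableBudget.release(numOfDocs); // permits for numOfDocs more documents
        } else {
            hasBudget.set(false);               // no limit: writers skip the semaphore
        }
    }

    void pauseIndexing() {
        availableBudget.drainPermits(); // discard any unused budget
        hasBudget.set(true);            // zero permits left: writers now block
    }
}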

File: ElasticsearchIntegrationTest.java

@@ -207,7 +207,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
             }
             try {
                 transportAddresses[i++] = new InetSocketTransportAddress(split[0], Integer.valueOf(split[1]));
-            } catch(NumberFormatException e) {
+            } catch (NumberFormatException e) {
                 throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
             }
         }
@@ -319,7 +319,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
     }

     public static TestCluster cluster() {
         if (!(currentCluster instanceof TestCluster)) {
             throw new UnsupportedOperationException("current test cluster is immutable");
         }
         return (TestCluster) currentCluster;
@@ -558,7 +558,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         Predicate<Object> testDocs = new Predicate<Object>() {
             public boolean apply(Object o) {
                 lastKnownCount[0] = indexer.totalIndexedDocs();
-                if (lastKnownCount[0] > numDocs) {
+                if (lastKnownCount[0] >= numDocs) {
                     long count = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
                     if (count == lastKnownCount[0]) {
                         // no progress - try to refresh for the next time
@@ -569,7 +569,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
                 } else {
                     logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs);
                 }
-                return lastKnownCount[0] > numDocs;
+                return lastKnownCount[0] >= numDocs;
             }
         };
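This comparison fix pairs with the exact budgets above: when the indexer is capped at numDocs, totalIndexedDocs() stops at exactly numDocs, so the old strict > predicate could never become true and waitForDocs would block until timeout; with >= the wait completes as soon as the target is reached.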
@@ -1088,7 +1088,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
     @Before
     public final void before() throws IOException {
         if (runTestScopeLifecycle()) {
             beforeInternal();
         }
     }
@@ -1139,7 +1139,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
      *
      * @see SuiteScopeTest
      */
-    protected void setupSuiteScopeCluster() throws Exception {}
+    protected void setupSuiteScopeCluster() throws Exception {
+    }

     private static boolean isSuiteScope(Class<?> clazz) {
         if (clazz == Object.class || clazz == ElasticsearchIntegrationTest.class) {