Evilize RecoveryWhileUnderLoadTests a bit more

This commit is contained in:
Simon Willnauer 2013-10-31 14:25:10 +01:00
parent 5f2b7dc266
commit 59588913b3
1 changed files with 13 additions and 6 deletions

View File

@ -38,6 +38,7 @@ import org.elasticsearch.test.AbstractIntegrationTest;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -47,6 +48,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
/**
@ -303,36 +305,38 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
@TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE,action.index:TRACE,action.support.replication:TRACE,cluster.service:DEBUG")
@Slow
public void recoverWhileRelocating() throws Exception {
final int numShards = between(5, 10);
final int numShards = between(2, 10);
final int numReplicas = 0;
cluster().ensureAtLeastNumNodes(3);
logger.info("--> creating test index ...");
int allowNodes = 2;
assertAcked(prepareCreate("test").setSettings(randomSettingsBuilder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build()));
final AtomicLong idGenerator = new AtomicLong();
final AtomicLong indexCounter = new AtomicLong();
final AtomicBoolean stop = new AtomicBoolean(false);
Thread[] writers = new Thread[5];
Thread[] writers = new Thread[atLeast(3)];
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
logger.info("--> starting {} indexing threads", writers.length);
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (int i = 0; i < writers.length; i++) {
final int indexerId = i;
final Client client = client();
writers[i] = new Thread() {
@Override
public void run() {
long id = -1;
try {
logger.info("**** starting indexing thread {}", indexerId);
while (!stop.get()) {
long id = idGenerator.incrementAndGet();
id = idGenerator.incrementAndGet();
client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
indexCounter.incrementAndGet();
}
logger.info("**** done indexing thread {}", indexerId);
} catch (Throwable e) {
logger.warn("**** failed indexing thread {}", e, indexerId);
failures.add(e);
logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
} finally {
stopLatch.countDown();
}
@ -341,7 +345,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
writers[i].start();
}
for (int i = 0; i < 100000; i += 1000) {
final int numDocs = between(10000, 50000);
for (int i = 0; i < numDocs; i += between(100, 1000)) {
assertThat(failures, emptyIterable());
logger.info("--> waiting for {} docs to be indexed ...", i);
waitForDocs(i);
logger.info("--> {} docs indexed", i);
@ -353,6 +359,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
logger.info("--> marking and waiting for indexing threads to stop ...");
stop.set(true);
assertThat(failures, emptyIterable());
stopLatch.await();
logger.info("--> indexing threads stopped");