Improve assertion and busy waiting for RecoveryWhileUnterLoadTests

This commit is contained in:
Simon Willnauer 2013-08-23 11:02:14 +02:00
parent 6c24a0af3e
commit a943135ef6
1 changed files with 34 additions and 42 deletions

View File

@ -19,8 +19,10 @@
package org.elasticsearch.test.integration.recovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.MapBuilder;
@ -30,12 +32,14 @@ import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/**
*
@ -75,7 +79,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
indexCounter.incrementAndGet();
}
logger.info("**** done indexing thread {}", indexerId);
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("**** failed indexing thread {}", e, indexerId);
} finally {
stopLatch.countDown();
@ -86,10 +90,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
}
logger.info("--> waiting for 2000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(2000);
logger.info("--> 2000 docs indexed");
logger.info("--> flushing the index ....");
@ -98,10 +99,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> waiting for 4000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 4000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(4000);
logger.info("--> 4000 docs indexed");
logger.info("--> allow 2 nodes for index [test] ...");
@ -113,10 +111,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=2").execute().actionGet().isTimedOut(), equalTo(false));
logger.info("--> waiting for 10000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 10000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(15000);
logger.info("--> 10000 docs indexed");
logger.info("--> marking and waiting for indexing threads to stop ...");
@ -128,7 +123,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
refreshAndAssert();
logger.info("--> verifying indexed content");
for (int i = 0; i < 10; i++) {
assertThat("iteration: " + i + " failed", client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(indexCounter.get()));
CountResponse actionGet = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
assertNoFailures(actionGet);
assertThat("iteration: " + i + " failed", actionGet.getCount(), equalTo(indexCounter.get()));
}
}
@ -159,7 +156,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
indexCounter.incrementAndGet();
}
logger.info("**** done indexing thread {}", indexerId);
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("**** failed indexing thread {}", e, indexerId);
} finally {
stopLatch.countDown();
@ -170,10 +167,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
}
logger.info("--> waiting for 2000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(2000);
logger.info("--> 2000 docs indexed");
logger.info("--> flushing the index ....");
@ -182,10 +176,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> waiting for 4000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 4000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(4000);
logger.info("--> 4000 docs indexed");
logger.info("--> allow 4 nodes for index [test] ...");
allowNodes("test", 4);
@ -195,10 +186,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> waiting for 15000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 15000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(15000);
logger.info("--> 15000 docs indexed");
stop.set(true);
@ -213,7 +201,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
refreshAndAssert();
logger.info("--> verifying indexed content");
for (int i = 0; i < 10; i++) {
assertThat("iteration: " + i + " failed", client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(indexCounter.get()));
CountResponse actionGet = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
assertNoFailures(actionGet);
assertThat("iteration: " + i + " failed", actionGet.getCount(), equalTo(indexCounter.get()));
}
}
@ -244,7 +234,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
indexCounter.incrementAndGet();
}
logger.info("**** done indexing thread {}", indexerId);
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("**** failed indexing thread {}", e, indexerId);
} finally {
stopLatch.countDown();
@ -255,10 +245,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
}
logger.info("--> waiting for 2000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 2000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(2000);
logger.info("--> 2000 docs indexed");
logger.info("--> flushing the index ....");
@ -267,10 +254,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> waiting for 4000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 4000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(4000);
logger.info("--> 4000 docs indexed");
// now start more nodes, while we index
@ -282,10 +266,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> waiting for 10000 docs to be indexed ...");
while (client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() < 10000) {
Thread.sleep(100);
client().admin().indices().prepareRefresh().execute().actionGet();
}
waitForDocs(15000);
logger.info("--> 10000 docs indexed");
// now, shutdown nodes
@ -316,7 +297,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
logger.info("--> verifying indexed content");
refreshAndAssert();
for (int i = 0; i < 10; i++) {
assertThat("iteration: " + i + " failed", client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(indexCounter.get()));
CountResponse actionGet = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
assertNoFailures(actionGet);
assertThat("iteration: " + i + " failed", actionGet.getCount(), equalTo(indexCounter.get()));
}
}
@ -325,4 +308,13 @@ public class RecoveryWhileUnderLoadTests extends AbstractSharedClusterTest {
assertNoFailures(actionGet);
return actionGet;
}
private void waitForDocs(final long numDocs) throws InterruptedException {
awaitBusy(new Predicate<Object>() {
public boolean apply(Object o) {
return client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() > numDocs;
}
}, 5, TimeUnit.MINUTES); // not really relevant here we just have to wait some time
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), greaterThan(numDocs));
}
}