[TEST] Remove busy waiting from BenchmarkIntegrationTest

I think Chuck Norris is required to fix this at this point until we have an API
that can for instance pause a Benchmark. We basically wait for a query to be executed
and that query syncs on a latch with the test in a script :)

This commit also adds some more testing for benchmarks that run into errors.
This commit is contained in:
Simon Willnauer 2014-05-14 14:37:14 +02:00
parent e0a95d9c19
commit fc2ab0909e
4 changed files with 168 additions and 75 deletions

View File

@ -348,12 +348,16 @@ public class BenchmarkExecutor {
}
public void onFailure(Throwable e) {
manage();
if (errorMessages.size() < 5) {
logger.error("Failed to execute benchmark [{}]", e.getMessage(), e);
e = ExceptionsHelper.unwrapCause(e);
errorMessages.add(e.getLocalizedMessage());
try {
if (errorMessages.size() < 5) {
logger.debug("Failed to execute benchmark [{}]", e.getMessage(), e);
e = ExceptionsHelper.unwrapCause(e);
errorMessages.add(e.getLocalizedMessage());
}
} finally {
manage(); // first add the msg then call the count down on the latch otherwise we might iss one error
}
}
}
@ -383,9 +387,13 @@ public class BenchmarkExecutor {
@Override
public void onFailure(Throwable e) {
timeBuckets[bucketId] = -1;
docBuckets[bucketId] = -1;
super.onFailure(e);
try {
timeBuckets[bucketId] = -1;
docBuckets[bucketId] = -1;
} finally {
super.onFailure(e);
}
}
}

View File

@ -45,8 +45,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Service component for running benchmarks
@ -103,9 +106,8 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
} else {
BenchmarkStatusAsyncHandler async = new BenchmarkStatusAsyncHandler(nodes.size(), request, listener);
for (DiscoveryNode node : nodes) {
if (isBenchmarkNode(node)) {
transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async);
}
assert isBenchmarkNode(node);
transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async);
}
}
}
@ -494,6 +496,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
BenchmarkResponse response = new BenchmarkResponse();
// Merge node responses into a single consolidated response
List<String> errors = new ArrayList<>();
for (BenchmarkResponse r : responses) {
for (Map.Entry<String, CompetitionResult> entry : r.competitionResults.entrySet()) {
if (!response.competitionResults.containsKey(entry.getKey())) {
@ -505,12 +508,21 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
CompetitionResult cr = response.competitionResults.get(entry.getKey());
cr.nodeResults().addAll(entry.getValue().nodeResults());
}
if (r.hasErrors()) {
for (String error : r.errors()) {
errors.add(error);
}
}
if (response.benchmarkName() == null) {
response.benchmarkName(r.benchmarkName());
}
assert response.benchmarkName().equals(r.benchmarkName());
if (!errors.isEmpty()) {
response.errors(errors.toArray(new String[errors.size()]));
}
response.mergeState(r.state());
assert errors.isEmpty() || response.state() != BenchmarkResponse.State.COMPLETE : "Response can't be complete since it has errors";
}
return response;

View File

@ -18,21 +18,27 @@
*/
package org.elasticsearch.action.bench;
import com.google.common.base.Predicate;
import org.apache.lucene.util.English;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.searchRequest;
import static org.elasticsearch.index.query.QueryBuilders.functionScoreQuery;
import static org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders.scriptFunction;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.*;
/**
@ -46,32 +52,38 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
private static final String INDEX_PREFIX = "test_index_";
private static final String INDEX_TYPE = "test_type";
private static final int MIN_DOC_COUNT = 1;
private static final int MAX_DOC_COUNT = 1000;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final long TIMEOUT = 20;
private int numExecutorNodes = 0;
private Map<String, BenchmarkSettings> competitionSettingsMap;
private String[] indices = Strings.EMPTY_ARRAY;
private HashMap<Integer, Boolean> benchNodes = new HashMap<>();
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder().put("node.bench", true).build();
}
private final Predicate<Object> statusPredicate = new Predicate<Object>() {
@Override
public boolean apply(Object input) {
final BenchmarkStatusResponse status = client().prepareBenchStatus().execute().actionGet();
// We expect to have one active benchmark on each node
return (status.totalActiveBenchmarks() == numExecutorNodes);
protected synchronized Settings nodeSettings(int nodeOrdinal) {
if (nodeOrdinal == 0) { // at least one
return ImmutableSettings.builder().put("node.bench", true).build();
} else {
if (benchNodes.containsKey(nodeOrdinal)) {
return ImmutableSettings.builder().put("node.bench", benchNodes.get(nodeOrdinal)).build();
} else {
boolean b = randomBoolean();
benchNodes.put(nodeOrdinal, b);
return ImmutableSettings.builder().put("node.bench", b).build();
}
}
};
}
@Before
public void beforeBenchmarkIntegrationTests() throws Exception {
numExecutorNodes = cluster().size();
waitForTestLatch = null;
waitForQuery = null;
numExecutorNodes = 1; // node 0 is always an executor
for (Boolean benchExecutor : benchNodes.values()) {
if (benchExecutor) {
numExecutorNodes++;
}
}
competitionSettingsMap = new HashMap<>();
logger.info("--> indexing random data");
indices = randomData();
@ -100,23 +112,20 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
@Test
public void testListBenchmarks() throws Exception {
SearchRequest searchRequest = prepareBlockingScriptQuery();
final BenchmarkRequest request =
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap,
BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL);
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest);
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
request.settings().iterations());
final ActionFuture<BenchmarkResponse> future = client().bench(request);
final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT);
assertTrue(ret);
assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS));
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
assertThat(statusResponse.benchmarkResponses().size(), greaterThanOrEqualTo(0));
waitForTestLatch.countDown();
assertThat(statusResponse.benchmarkResponses().size(), greaterThan(0));
for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) {
assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING));
assertFalse(benchmarkResponse.hasErrors());
@ -134,23 +143,88 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
assertThat(future.get().state(), isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE));
}
public static CountDownLatch waitForTestLatch;
public static CountDownLatch waitForQuery;
private SearchRequest prepareBlockingScriptQuery() {
/* Chuck Norris back in the house!! - this is super evil but the only way at this
point to ensure we actually call abort / list while a benchmark is executing
without doing busy waiting etc. This Script calls the two static latches above and this test
will not work if somebody messes around with them but it's much faster and less resource intensive / hardware
dependent to run massive benchmarks and do busy waiting. */
cluster(); // mark that we need a JVM local cluster!
waitForQuery = new CountDownLatch(1);
waitForTestLatch = new CountDownLatch(1);
String className = "BenchmarkIntegrationTest";
ScriptScoreFunctionBuilder scriptFunction = scriptFunction("import " + this.getClass().getName() + "; \n" +
className + ".waitForQuery.countDown(); \n" + className + ".waitForTestLatch.await(); \n return 1.0;");
SearchRequest searchRequest = searchRequest().source(
searchSource()
.query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction)));
return searchRequest;
}
@Test
public void testAbortBenchmark() throws Exception {
public void testBenchmarkWithErrors() {
List<SearchRequest> reqList = new ArrayList<>();
int numQueries = scaledRandomIntBetween(20, 100);
int numErrors = scaledRandomIntBetween(1, numQueries);
final boolean containsFatal = randomBoolean();
if (containsFatal) {
ScriptScoreFunctionBuilder scriptFunction = scriptFunction("DOES NOT COMPILE - fails on any shard");
SearchRequest searchRequest = searchRequest().source(
searchSource()
.query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction)));
reqList.add(searchRequest);
}
for (int i = 0; reqList.size() < numErrors; i++) {
ScriptScoreFunctionBuilder scriptFunction = scriptFunction("throw new RuntimeException();");
SearchRequest searchRequest = searchRequest().source(
searchSource()
.query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction)));
reqList.add(searchRequest);
}
logger.info("--> run with [{}] errors ", numErrors);
for (int i = 0; reqList.size() < numQueries; i++) {
reqList.add(BenchmarkTestUtil.randomSearch(client(), indices));
}
Collections.shuffle(reqList, getRandom());
final BenchmarkRequest request =
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap,
BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL);
BenchmarkTestUtil.randomRequest(client(),indices, numExecutorNodes, competitionSettingsMap, reqList.toArray(new SearchRequest[0]));
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
request.settings().iterations());
final BenchmarkResponse response = client().bench(request).actionGet();
assertThat(response, notNullValue());
if (response.hasErrors() || containsFatal) {
assertThat(response.state(), equalTo(BenchmarkResponse.State.FAILED));
} else {
assertThat(response.state(), equalTo(BenchmarkResponse.State.COMPLETE));
for (CompetitionResult result : response.competitionResults().values()) {
assertThat(result.nodeResults().size(), equalTo(numExecutorNodes));
validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), true);
}
}
assertThat(response.benchmarkName(), equalTo(BENCHMARK_NAME));
}
@Test
public void testAbortBenchmark() throws Exception {
SearchRequest searchRequest = prepareBlockingScriptQuery();
final BenchmarkRequest request =
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest);
request.settings().iterations(Integer.MAX_VALUE, true); // massive amount of iterations
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}] [{}]", request.competitors().size(),
request.settings().iterations(), request);
final ActionFuture<BenchmarkResponse> benchmarkResponse = client().bench(request);
final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT);
assertTrue(ret);
assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS));
final AbortBenchmarkResponse abortResponse =
client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet();
waitForTestLatch.countDown();
// Confirm that the benchmark was actually aborted and did not finish on its own
assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes));
assertThat(abortResponse.getBenchmarkName(), equalTo(BENCHMARK_NAME));
@ -166,7 +240,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0));
// Confirm that benchmark was indeed aborted
assertThat(benchmarkResponse.get().state(),isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE));
assertThat(benchmarkResponse.get().state(), is(BenchmarkResponse.State.ABORTED));
}
@Test(expected = BenchmarkMissingException.class)
@ -175,7 +249,6 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
}
private void validateCompetitionResult(CompetitionResult result, BenchmarkSettings requestedSettings, boolean strict) {
// Validate settings
assertTrue(result.competitionName().startsWith(COMPETITOR_PREFIX));
assertThat(result.concurrency(), equalTo(requestedSettings.concurrency()));
@ -247,12 +320,12 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
private String[] randomData() throws Exception {
final int numIndices = between(BenchmarkTestUtil.MIN_SMALL_INTERVAL, BenchmarkTestUtil.MAX_SMALL_INTERVAL);
final int numIndices = scaledRandomIntBetween(1, 5);
final String[] indices = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indices[i] = INDEX_PREFIX + i;
final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
final int numDocs = scaledRandomIntBetween(1, 100);
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int j = 0; j < numDocs; j++) {

View File

@ -38,12 +38,6 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomAsciiOfL
*/
public class BenchmarkTestUtil {
public static final int MIN_MULTIPLIER = 10;
public static final int MAX_MULTIPLIER = 500;
public static final int MIN_SMALL_INTERVAL = 1;
public static final int MAX_SMALL_INTERVAL = 3;
public static final int MIN_LARGE_INTERVAL = 1;
public static final int MAX_LARGE_INTERVAL = 7;
public static final String BENCHMARK_NAME = "test_benchmark";
public static final String COMPETITOR_PREFIX = "competitor_";
@ -83,21 +77,21 @@ public class BenchmarkTestUtil {
@Override
QueryBuilder getQuery() {
return QueryBuilders.matchQuery(TestIndexField.STRING_FIELD.toString(),
randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL));
randomAsciiOfLengthBetween(1, 3));
}
},
TERM {
@Override
QueryBuilder getQuery() {
return QueryBuilders.termQuery(TestIndexField.STRING_FIELD.toString(),
randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL));
randomAsciiOfLengthBetween(1, 3));
}
},
QUERY_STRING {
@Override
QueryBuilder getQuery() {
return QueryBuilders.queryString(
randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL));
randomAsciiOfLengthBetween(1, 3));
}
},
WILDCARD {
@ -113,7 +107,7 @@ public class BenchmarkTestUtil {
public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes,
Map<String, BenchmarkSettings> competitionSettingsMap,
int lowRandomIntervalBound, int highRandomIntervalBound) {
int lowRandomIntervalBound, int highRandomIntervalBound, SearchRequest... requests) {
final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices);
final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound);
@ -128,7 +122,7 @@ public class BenchmarkTestUtil {
final int numCompetitors = between(lowRandomIntervalBound, highRandomIntervalBound);
for (int i = 0; i < numCompetitors; i++) {
builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices,
competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound));
competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound, requests));
}
final BenchmarkRequest request = builder.request();
@ -140,10 +134,10 @@ public class BenchmarkTestUtil {
}
public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes,
Map<String, BenchmarkSettings> competitionSettingsMap) {
Map<String, BenchmarkSettings> competitionSettingsMap, SearchRequest... requests) {
return randomRequest(client, indices, numExecutorNodes,
competitionSettingsMap, MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL);
competitionSettingsMap, 1, 3, requests);
}
public static SearchRequest randomSearch(Client client, String[] indices) {
@ -156,8 +150,8 @@ public class BenchmarkTestUtil {
}
public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices,
Map<String, BenchmarkSettings> competitionSettingsMap,
int lowRandomIntervalBound, int highRandomIntervalBound) {
Map<String, BenchmarkSettings> competitionSettingsMap,
int lowRandomIntervalBound, int highRandomIntervalBound, SearchRequest... requests) {
final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder();
final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound);
@ -169,12 +163,18 @@ public class BenchmarkTestUtil {
builder.setSearchType(settings.searchType());
builder.setWarmup(settings.warmup());
builder.setName(name);
final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound);
for (int i = 0; i < numSearches; i++) {
final SearchRequest searchRequest = randomSearch(client, indices);
builder.addSearchRequest(searchRequest);
settings.addSearchRequest(searchRequest);
if (requests != null && requests.length != 0) {
for (int i = 0; i < requests.length; i++) {
builder.addSearchRequest(requests[i]);
settings.addSearchRequest(requests[i]);
}
} else {
final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound);
for (int i = 0; i < numSearches; i++) {
final SearchRequest searchRequest = randomSearch(client, indices);
builder.addSearchRequest(searchRequest);
settings.addSearchRequest(searchRequest);
}
}
if (competitionSettingsMap != null) {
@ -211,7 +211,7 @@ public class BenchmarkTestUtil {
settings.concurrency(between(lowRandomIntervalBound, highRandomIntervalBound), true);
settings.iterations(between(lowRandomIntervalBound, highRandomIntervalBound), true);
settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true);
settings.multiplier(between(1, 50), true);
settings.warmup(randomBoolean(), true);
settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true);