diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java index 352ed526083..b40e6f52e7f 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java @@ -348,12 +348,16 @@ public class BenchmarkExecutor { } public void onFailure(Throwable e) { - manage(); - if (errorMessages.size() < 5) { - logger.error("Failed to execute benchmark [{}]", e.getMessage(), e); - e = ExceptionsHelper.unwrapCause(e); - errorMessages.add(e.getLocalizedMessage()); + try { + if (errorMessages.size() < 5) { + logger.debug("Failed to execute benchmark [{}]", e.getMessage(), e); + e = ExceptionsHelper.unwrapCause(e); + errorMessages.add(e.getLocalizedMessage()); + } + } finally { + manage(); // first add the msg then call the count down on the latch otherwise we might iss one error } + } } @@ -383,9 +387,13 @@ public class BenchmarkExecutor { @Override public void onFailure(Throwable e) { - timeBuckets[bucketId] = -1; - docBuckets[bucketId] = -1; - super.onFailure(e); + try { + timeBuckets[bucketId] = -1; + docBuckets[bucketId] = -1; + } finally { + super.onFailure(e); + } + } } diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java index 0ce7de26554..9b644c5dc07 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java @@ -45,8 +45,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; /** * Service component for running benchmarks @@ -103,9 +106,8 @@ public class BenchmarkService extends AbstractLifecycleComponent errors = new ArrayList<>(); for (BenchmarkResponse r : responses) { for (Map.Entry entry : r.competitionResults.entrySet()) { if (!response.competitionResults.containsKey(entry.getKey())) { @@ -505,12 +508,21 @@ public class BenchmarkService extends AbstractLifecycleComponent competitionSettingsMap; private String[] indices = Strings.EMPTY_ARRAY; + private HashMap benchNodes = new HashMap<>(); - protected Settings nodeSettings(int nodeOrdinal) { - return ImmutableSettings.builder().put("node.bench", true).build(); - } - private final Predicate statusPredicate = new Predicate() { - @Override - public boolean apply(Object input) { - final BenchmarkStatusResponse status = client().prepareBenchStatus().execute().actionGet(); - // We expect to have one active benchmark on each node - return (status.totalActiveBenchmarks() == numExecutorNodes); + + protected synchronized Settings nodeSettings(int nodeOrdinal) { + if (nodeOrdinal == 0) { // at least one + return ImmutableSettings.builder().put("node.bench", true).build(); + } else { + if (benchNodes.containsKey(nodeOrdinal)) { + return ImmutableSettings.builder().put("node.bench", benchNodes.get(nodeOrdinal)).build(); + } else { + boolean b = randomBoolean(); + benchNodes.put(nodeOrdinal, b); + return ImmutableSettings.builder().put("node.bench", b).build(); + } + } - }; + } @Before public void beforeBenchmarkIntegrationTests() throws Exception { - numExecutorNodes = cluster().size(); + waitForTestLatch = null; + waitForQuery = null; + numExecutorNodes = 1; // node 0 is always an executor + for (Boolean benchExecutor : benchNodes.values()) { + if (benchExecutor) { + numExecutorNodes++; + } + } competitionSettingsMap = new HashMap<>(); logger.info("--> indexing random data"); indices = randomData(); @@ -100,23 +112,20 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { @Test public void testListBenchmarks() throws Exception { - + SearchRequest searchRequest = prepareBlockingScriptQuery(); final BenchmarkRequest request = - BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, - BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL); + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest); logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), request.settings().iterations()); final ActionFuture future = client().bench(request); - final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT); - assertTrue(ret); - + assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS)); final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); - assertThat(statusResponse.benchmarkResponses().size(), greaterThanOrEqualTo(0)); + waitForTestLatch.countDown(); + assertThat(statusResponse.benchmarkResponses().size(), greaterThan(0)); for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) { - assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING)); assertFalse(benchmarkResponse.hasErrors()); @@ -134,23 +143,88 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { assertThat(future.get().state(), isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE)); } + public static CountDownLatch waitForTestLatch; + public static CountDownLatch waitForQuery; + + private SearchRequest prepareBlockingScriptQuery() { + /* Chuck Norris back in the house!! - this is super evil but the only way at this + point to ensure we actually call abort / list while a benchmark is executing + without doing busy waiting etc. This Script calls the two static latches above and this test + will not work if somebody messes around with them but it's much faster and less resource intensive / hardware + dependent to run massive benchmarks and do busy waiting. */ + cluster(); // mark that we need a JVM local cluster! + waitForQuery = new CountDownLatch(1); + waitForTestLatch = new CountDownLatch(1); + String className = "BenchmarkIntegrationTest"; + ScriptScoreFunctionBuilder scriptFunction = scriptFunction("import " + this.getClass().getName() + "; \n" + + className + ".waitForQuery.countDown(); \n" + className + ".waitForTestLatch.await(); \n return 1.0;"); + SearchRequest searchRequest = searchRequest().source( + searchSource() + .query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction))); + return searchRequest; + } + @Test - public void testAbortBenchmark() throws Exception { + public void testBenchmarkWithErrors() { + List reqList = new ArrayList<>(); + int numQueries = scaledRandomIntBetween(20, 100); + int numErrors = scaledRandomIntBetween(1, numQueries); + final boolean containsFatal = randomBoolean(); + if (containsFatal) { + ScriptScoreFunctionBuilder scriptFunction = scriptFunction("DOES NOT COMPILE - fails on any shard"); + SearchRequest searchRequest = searchRequest().source( + searchSource() + .query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction))); + reqList.add(searchRequest); + + } + for (int i = 0; reqList.size() < numErrors; i++) { + ScriptScoreFunctionBuilder scriptFunction = scriptFunction("throw new RuntimeException();"); + SearchRequest searchRequest = searchRequest().source( + searchSource() + .query(functionScoreQuery(FilterBuilders.matchAllFilter(), scriptFunction))); + reqList.add(searchRequest); + } + logger.info("--> run with [{}] errors ", numErrors); + for (int i = 0; reqList.size() < numQueries; i++) { + + reqList.add(BenchmarkTestUtil.randomSearch(client(), indices)); + } + Collections.shuffle(reqList, getRandom()); final BenchmarkRequest request = - BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, - BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL); + BenchmarkTestUtil.randomRequest(client(),indices, numExecutorNodes, competitionSettingsMap, reqList.toArray(new SearchRequest[0])); logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), request.settings().iterations()); + final BenchmarkResponse response = client().bench(request).actionGet(); + assertThat(response, notNullValue()); + if (response.hasErrors() || containsFatal) { + assertThat(response.state(), equalTo(BenchmarkResponse.State.FAILED)); + } else { + assertThat(response.state(), equalTo(BenchmarkResponse.State.COMPLETE)); + for (CompetitionResult result : response.competitionResults().values()) { + assertThat(result.nodeResults().size(), equalTo(numExecutorNodes)); + validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), true); + } + } + assertThat(response.benchmarkName(), equalTo(BENCHMARK_NAME)); + + } + + @Test + public void testAbortBenchmark() throws Exception { + SearchRequest searchRequest = prepareBlockingScriptQuery(); + final BenchmarkRequest request = + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest); + request.settings().iterations(Integer.MAX_VALUE, true); // massive amount of iterations + logger.info("--> Submitting benchmark - competitors [{}] iterations [{}] [{}]", request.competitors().size(), + request.settings().iterations(), request); final ActionFuture benchmarkResponse = client().bench(request); - - final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT); - assertTrue(ret); - + assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS)); final AbortBenchmarkResponse abortResponse = client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); - + waitForTestLatch.countDown(); // Confirm that the benchmark was actually aborted and did not finish on its own assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes)); assertThat(abortResponse.getBenchmarkName(), equalTo(BENCHMARK_NAME)); @@ -166,7 +240,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0)); // Confirm that benchmark was indeed aborted - assertThat(benchmarkResponse.get().state(),isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE)); + assertThat(benchmarkResponse.get().state(), is(BenchmarkResponse.State.ABORTED)); } @Test(expected = BenchmarkMissingException.class) @@ -175,7 +249,6 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { } private void validateCompetitionResult(CompetitionResult result, BenchmarkSettings requestedSettings, boolean strict) { - // Validate settings assertTrue(result.competitionName().startsWith(COMPETITOR_PREFIX)); assertThat(result.concurrency(), equalTo(requestedSettings.concurrency())); @@ -247,12 +320,12 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { private String[] randomData() throws Exception { - final int numIndices = between(BenchmarkTestUtil.MIN_SMALL_INTERVAL, BenchmarkTestUtil.MAX_SMALL_INTERVAL); + final int numIndices = scaledRandomIntBetween(1, 5); final String[] indices = new String[numIndices]; for (int i = 0; i < numIndices; i++) { indices[i] = INDEX_PREFIX + i; - final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT); + final int numDocs = scaledRandomIntBetween(1, 100); final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; for (int j = 0; j < numDocs; j++) { diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java index 09598ae22c6..cae64dffe19 100644 --- a/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java @@ -38,12 +38,6 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomAsciiOfL */ public class BenchmarkTestUtil { - public static final int MIN_MULTIPLIER = 10; - public static final int MAX_MULTIPLIER = 500; - public static final int MIN_SMALL_INTERVAL = 1; - public static final int MAX_SMALL_INTERVAL = 3; - public static final int MIN_LARGE_INTERVAL = 1; - public static final int MAX_LARGE_INTERVAL = 7; public static final String BENCHMARK_NAME = "test_benchmark"; public static final String COMPETITOR_PREFIX = "competitor_"; @@ -83,21 +77,21 @@ public class BenchmarkTestUtil { @Override QueryBuilder getQuery() { return QueryBuilders.matchQuery(TestIndexField.STRING_FIELD.toString(), - randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + randomAsciiOfLengthBetween(1, 3)); } }, TERM { @Override QueryBuilder getQuery() { return QueryBuilders.termQuery(TestIndexField.STRING_FIELD.toString(), - randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + randomAsciiOfLengthBetween(1, 3)); } }, QUERY_STRING { @Override QueryBuilder getQuery() { return QueryBuilders.queryString( - randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + randomAsciiOfLengthBetween(1, 3)); } }, WILDCARD { @@ -113,7 +107,7 @@ public class BenchmarkTestUtil { public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes, Map competitionSettingsMap, - int lowRandomIntervalBound, int highRandomIntervalBound) { + int lowRandomIntervalBound, int highRandomIntervalBound, SearchRequest... requests) { final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices); final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound); @@ -128,7 +122,7 @@ public class BenchmarkTestUtil { final int numCompetitors = between(lowRandomIntervalBound, highRandomIntervalBound); for (int i = 0; i < numCompetitors; i++) { builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices, - competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound)); + competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound, requests)); } final BenchmarkRequest request = builder.request(); @@ -140,10 +134,10 @@ public class BenchmarkTestUtil { } public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes, - Map competitionSettingsMap) { + Map competitionSettingsMap, SearchRequest... requests) { return randomRequest(client, indices, numExecutorNodes, - competitionSettingsMap, MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + competitionSettingsMap, 1, 3, requests); } public static SearchRequest randomSearch(Client client, String[] indices) { @@ -156,8 +150,8 @@ public class BenchmarkTestUtil { } public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices, - Map competitionSettingsMap, - int lowRandomIntervalBound, int highRandomIntervalBound) { + Map competitionSettingsMap, + int lowRandomIntervalBound, int highRandomIntervalBound, SearchRequest... requests) { final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder(); final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound); @@ -169,12 +163,18 @@ public class BenchmarkTestUtil { builder.setSearchType(settings.searchType()); builder.setWarmup(settings.warmup()); builder.setName(name); - - final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound); - for (int i = 0; i < numSearches; i++) { - final SearchRequest searchRequest = randomSearch(client, indices); - builder.addSearchRequest(searchRequest); - settings.addSearchRequest(searchRequest); + if (requests != null && requests.length != 0) { + for (int i = 0; i < requests.length; i++) { + builder.addSearchRequest(requests[i]); + settings.addSearchRequest(requests[i]); + } + } else { + final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound); + for (int i = 0; i < numSearches; i++) { + final SearchRequest searchRequest = randomSearch(client, indices); + builder.addSearchRequest(searchRequest); + settings.addSearchRequest(searchRequest); + } } if (competitionSettingsMap != null) { @@ -211,7 +211,7 @@ public class BenchmarkTestUtil { settings.concurrency(between(lowRandomIntervalBound, highRandomIntervalBound), true); settings.iterations(between(lowRandomIntervalBound, highRandomIntervalBound), true); - settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true); + settings.multiplier(between(1, 50), true); settings.warmup(randomBoolean(), true); settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true);