From f23274523a4d957156fa48923cbf651bdeb2813f Mon Sep 17 00:00:00 2001 From: Andrew Selden Date: Wed, 30 Apr 2014 21:52:32 -0700 Subject: [PATCH] Integration tests for benchmark API. - Randomized integration tests for the benchmark API. - Negative tests for cases where the cluster cannot run benchmarks. - Return 404 on missing benchmark name. - Allow to specify 'types' as an array in the JSON syntax when describing a benchmark competition. - Don't record slowest for single-request competitions. Closes #6003, #5906, #5903, #5904 --- docs/reference/search/benchmark.asciidoc | 41 +-- .../bench/AbortBenchmarkNodeResponse.java | 18 +- .../action/bench/AbortBenchmarkResponse.java | 12 +- .../bench/BenchmarkExecutionException.java | 52 +++ .../action/bench/BenchmarkExecutor.java | 154 +++++---- .../bench/BenchmarkMissingException.java | 38 +++ .../bench/BenchmarkNodeMissingException.java | 2 +- .../action/bench/BenchmarkRequest.java | 2 +- .../action/bench/BenchmarkResponse.java | 91 ++++- .../action/bench/BenchmarkService.java | 22 +- .../action/bench/BenchmarkSettings.java | 1 + .../action/bench/BenchmarkStatusResponse.java | 31 -- .../action/bench/CompetitionNodeResult.java | 18 +- .../action/bench/CompetitionResult.java | 50 +-- .../action/bench/CompetitionSummary.java | 42 +-- .../java/org/elasticsearch/client/Client.java | 5 + .../client/support/AbstractClient.java | 5 + .../rest/action/bench/RestBenchAction.java | 26 +- .../bench/BenchmarkIntegrationTest.java | 310 ++++++++++++++++++ .../action/bench/BenchmarkNegativeTest.java | 55 ++++ .../action/bench/BenchmarkTestUtil.java | 209 ++++++++++++ .../test/client/RandomizingClient.java | 5 + 22 files changed, 927 insertions(+), 262 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/bench/BenchmarkExecutionException.java create mode 100644 src/main/java/org/elasticsearch/action/bench/BenchmarkMissingException.java create mode 100644 src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java create mode 100644 src/test/java/org/elasticsearch/action/bench/BenchmarkNegativeTest.java create mode 100644 src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java diff --git a/docs/reference/search/benchmark.asciidoc b/docs/reference/search/benchmark.asciidoc index 165e2698a38..7b604118a83 100644 --- a/docs/reference/search/benchmark.asciidoc +++ b/docs/reference/search/benchmark.asciidoc @@ -87,32 +87,33 @@ competing alternatives side-by-side. There are several parameters which may be set at the competition level: [horizontal] -`name`:: Unique name for the competition -`iterations`:: Number of times to run the competitors -`concurrency`:: Within each iteration use this level of parallelism -`multiplier`:: Within each iteration run the query this many times -`warmup`:: Perform warmup of query -`num_slowest`:: Record N slowest queries -`search_type`:: Type of search, e.g. "query_then_fetch", "dfs_query_then_fetch", "count" -`requests`:: Query DSL describing search requests -`clear_caches`:: Whether caches should be cleared on each iteration, and if so, how -`indices`:: Array of indices (and optional types) to search, e.g. ["my_index_1/my_type_1", "my_index_2", "my_index_3/my_type_3"] +`name`:: Unique name for the competition. +`iterations`:: Number of times to run the competitors. Defaults to `5`. +`concurrency`:: Within each iteration use this level of parallelism. Defaults to `5`. +`multiplier`:: Within each iteration run the query this many times. Defaults to `1000`. +`warmup`:: Perform warmup of query. Defaults to `true`. +`num_slowest`:: Record N slowest queries. Defaults to `1`. +`search_type`:: Type of search, e.g. "query_then_fetch", "dfs_query_then_fetch", "count". Defaults to `query_then_fetch`. +`requests`:: Query DSL describing search requests. +`clear_caches`:: Whether caches should be cleared on each iteration, and if so, how. Caches are not cleared by default. +`indices`:: Array of indices to search, e.g. ["my_index_1", "my_index_2", "my_index_3"]. +`types`:: Array of index types to search, e.g. ["my_type_1", "my_type_2"]. Cache clearing parameters: [horizontal] -`clear_caches`:: Set to 'false' to disable cache clearing completely -`clear_caches.filter`:: Whether to clear the filter cache -`clear_caches.field_data`:: Whether to clear the field data cache -`clear_caches.id`:: Whether to clear the id cache -`clear_caches.recycler`:: Whether to clear the recycler cache -`clear_caches.fields`:: Array of fields to clear -`clear_caches.filter_keys`:: Array of filter keys to clear +`clear_caches`:: Set to 'false' to disable cache clearing completely. +`clear_caches.filter`:: Whether to clear the filter cache. +`clear_caches.field_data`:: Whether to clear the field data cache. +`clear_caches.id`:: Whether to clear the id cache. +`clear_caches.recycler`:: Whether to clear the recycler cache. +`clear_caches.fields`:: Array of fields to clear. +`clear_caches.filter_keys`:: Array of filter keys to clear. Global parameters: [horizontal] -`name`:: Unique name for the benchmark +`name`:: Unique name for the benchmark. `num_executor_nodes`:: Number of cluster nodes from which to submit and time benchmarks. Allows user to run a benchmark simultaneously on one or more nodes and compare timings. Note that this does not control how many nodes a search request will actually execute on. Defaults to: 1. -`percentiles`:: Array of percentile values to report. Defaults to: [10, 25, 50, 75, 90, 99] +`percentiles`:: Array of percentile values to report. Defaults to: [10, 25, 50, 75, 90, 99]. Additionally, the following competition-level parameters may be set globally: iteration, concurrency, multiplier, warmup, and clear_caches. @@ -149,6 +150,7 @@ $ curl -XPUT 'localhost:9200/_bench/?pretty=true' -d "name": "competitor_1", "search_type": "query_then_fetch", "indices": [ "my_index_1" ], + "types": [ "my_type_1" ], "clear_caches" : { "filter" : true, "field_data" : true, @@ -160,6 +162,7 @@ $ curl -XPUT 'localhost:9200/_bench/?pretty=true' -d "name": "competitor_2", "search_type": "dfs_query_then_fetch", "indices": [ "my_index_2" ], + "types": [ "my_type_2" ], "clear_caches" : { "filter" : true, "field_data" : true, diff --git a/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkNodeResponse.java b/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkNodeResponse.java index 72da7ce9a8d..26320159763 100644 --- a/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkNodeResponse.java +++ b/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkNodeResponse.java @@ -45,12 +45,6 @@ public class AbortBenchmarkNodeResponse extends ActionResponse implements Stream this.nodeName = nodeName; } - public AbortBenchmarkNodeResponse(String benchmarkName, String nodeName, boolean aborted, String errorMessage) { - this.benchmarkName = benchmarkName; - this.nodeName = nodeName; - this.errorMessage = errorMessage; - } - public void nodeName(String nodeName) { this.nodeName = nodeName; } @@ -59,6 +53,14 @@ public class AbortBenchmarkNodeResponse extends ActionResponse implements Stream return nodeName; } + public String benchmarkName() { + return benchmarkName; + } + + public String errorMessage() { + return errorMessage; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.NODE, nodeName); @@ -72,7 +74,7 @@ public class AbortBenchmarkNodeResponse extends ActionResponse implements Stream super.readFrom(in); benchmarkName = in.readString(); nodeName = in.readString(); - errorMessage = in.readString(); + errorMessage = in.readOptionalString(); } @Override @@ -80,7 +82,7 @@ public class AbortBenchmarkNodeResponse extends ActionResponse implements Stream super.writeTo(out); out.writeString(benchmarkName); out.writeString(nodeName); - out.writeString(errorMessage); + out.writeOptionalString(errorMessage); } static final class Fields { diff --git a/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkResponse.java b/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkResponse.java index b839c7fe34f..1e645ea5bd5 100644 --- a/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkResponse.java +++ b/src/main/java/org/elasticsearch/action/bench/AbortBenchmarkResponse.java @@ -56,11 +56,19 @@ public class AbortBenchmarkResponse extends ActionResponse implements Streamable nodeResponses.add(nodeResponse); } + public List getNodeResponses() { + return nodeResponses; + } + + public String getBenchmarkName() { + return benchmarkName; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); benchmarkName = in.readString(); - errorMessage = in.readString(); + errorMessage = in.readOptionalString(); int size = in.readVInt(); for (int i = 0; i < size; i++) { AbortBenchmarkNodeResponse nodeResponse = new AbortBenchmarkNodeResponse(); @@ -73,7 +81,7 @@ public class AbortBenchmarkResponse extends ActionResponse implements Streamable public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(benchmarkName); - out.writeString(errorMessage); + out.writeOptionalString(errorMessage); out.writeVInt(nodeResponses.size()); for (AbortBenchmarkNodeResponse nodeResponse : nodeResponses) { nodeResponse.writeTo(out); diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutionException.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutionException.java new file mode 100644 index 00000000000..05200360531 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutionException.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bench; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.rest.RestStatus; + +import java.util.ArrayList; +import java.util.List; + +/** + * Indicates a benchmark failure due to too many failures being encountered. + */ +public class BenchmarkExecutionException extends ElasticsearchException { + + private List errorMessages = new ArrayList<>(); + + public BenchmarkExecutionException(String msg, Throwable cause) { + super(msg, cause); + } + + public BenchmarkExecutionException(String msg, List errorMessages) { + super(msg); + this.errorMessages.addAll(errorMessages); + } + + public List errorMessages() { + return errorMessages; + } + + @Override + public RestStatus status() { + return RestStatus.INTERNAL_SERVER_ERROR; + } +} diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java index fcf404ad883..24ecbd1b3ec 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -70,19 +69,19 @@ public class BenchmarkExecutor { /** * Aborts a benchmark with the given id * - * @param benchmarkId The benchmark to abort + * @param benchmarkName The benchmark to abort * @return Abort response */ - public AbortBenchmarkNodeResponse abortBenchmark(String benchmarkId) { - if (!activeBenchmarks.containsKey(benchmarkId)) { - return new AbortBenchmarkNodeResponse(benchmarkId, nodeName, false, "Benchmark with id [" + benchmarkId + "] not found"); - } - BenchmarkState state = activeBenchmarks.get(benchmarkId); + public AbortBenchmarkNodeResponse abortBenchmark(String benchmarkName) { + + BenchmarkState state = activeBenchmarks.get(benchmarkName); if (state == null) { - throw new ElasticsearchException("Benchmark with id [" + benchmarkId + "] is missing"); + throw new ElasticsearchException("Benchmark [" + benchmarkName + "] not found"); } state.semaphore.stop(); - return new AbortBenchmarkNodeResponse(benchmarkId, nodeName); + activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fRemove(benchmarkName).build(); + logger.debug("Aborted benchmark [{}]", benchmarkName); + return new AbortBenchmarkNodeResponse(benchmarkName, nodeName); } /** @@ -130,86 +129,80 @@ public class BenchmarkExecutor { } try { - final List errorMessages = new ArrayList<>(); + for (BenchmarkCompetitor competitor : request.competitors()) { - try { - for (BenchmarkCompetitor competitor : request.competitors()) { - final BenchmarkSettings settings = competitor.settings(); - final int iterations = settings.iterations(); - logger.debug("Executing [{}] iterations for benchmark [{}][{}] ", iterations, request.benchmarkName(), competitor.name()); + final BenchmarkSettings settings = competitor.settings(); + final int iterations = settings.iterations(); + logger.debug("Executing [{}] iterations for benchmark [{}][{}] ", iterations, request.benchmarkName(), competitor.name()); - final List competitionIterations = new ArrayList<>(iterations); - final CompetitionResult competitionResult = - new CompetitionResult(competitor.name(), settings.concurrency(), settings.multiplier(), request.percentiles()); - final CompetitionNodeResult competitionNodeResult = - new CompetitionNodeResult(competitor.name(), nodeName, iterations, competitionIterations); + final List competitionIterations = new ArrayList<>(iterations); + final CompetitionResult competitionResult = + new CompetitionResult(competitor.name(), settings.concurrency(), settings.multiplier(), request.percentiles()); + final CompetitionNodeResult competitionNodeResult = + new CompetitionNodeResult(competitor.name(), nodeName, iterations, competitionIterations); - competitionResult.addCompetitionNodeResult(competitionNodeResult); - benchmarkResponse.competitionResults.put(competitor.name(), competitionResult); + competitionResult.addCompetitionNodeResult(competitionNodeResult); + benchmarkResponse.competitionResults.put(competitor.name(), competitionResult); - final List searchRequests = competitor.settings().searchRequests(); + final List searchRequests = competitor.settings().searchRequests(); - if (settings.warmup()) { - final long beforeWarmup = System.nanoTime(); - final List warmUpErrors = warmUp(competitor, searchRequests, semaphore); - final long afterWarmup = System.nanoTime(); - competitionNodeResult.warmUpTime(TimeUnit.MILLISECONDS.convert(afterWarmup - beforeWarmup, TimeUnit.NANOSECONDS)); - if (!warmUpErrors.isEmpty()) { - competitionNodeResult.failures(warmUpErrors.toArray(Strings.EMPTY_ARRAY)); - continue; - } + if (settings.warmup()) { + final long beforeWarmup = System.nanoTime(); + final List warmUpErrors = warmUp(competitor, searchRequests, semaphore); + final long afterWarmup = System.nanoTime(); + competitionNodeResult.warmUpTime(TimeUnit.MILLISECONDS.convert(afterWarmup - beforeWarmup, TimeUnit.NANOSECONDS)); + if (!warmUpErrors.isEmpty()) { + throw new BenchmarkExecutionException("Failed to execute warmup phase", warmUpErrors); } - - final int requestsPerRound = settings.multiplier() * searchRequests.size(); - final long[] timeBuckets = new long[requestsPerRound]; - final long[] docBuckets = new long[requestsPerRound]; - - for (int i = 0; i < iterations; i++) { - if (settings.allowCacheClearing() && settings.clearCaches() != null) { - try { - client.admin().indices().clearCache(settings.clearCaches()).get(); - } catch (ExecutionException e) { - throw new ElasticsearchException("Failed to clear caches before benchmark round", e); - } - } - - // Run the iteration - CompetitionIteration ci = runIteration(competitor, searchRequests, timeBuckets, docBuckets, errorMessages, semaphore); - ci.percentiles(request.percentiles()); - - if (errorMessages.isEmpty()) { - competitionIterations.add(ci); - competitionNodeResult.incrementCompletedIterations(); - } else { - competitionNodeResult.failures(errorMessages.toArray(Strings.EMPTY_ARRAY)); - return benchmarkResponse; - } - } - - competitionNodeResult.totalExecutedQueries(settings.multiplier() * searchRequests.size() * iterations); } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - benchmarkResponse.state(BenchmarkResponse.State.ABORTED); + + final int numMeasurements = settings.multiplier() * searchRequests.size(); + final long[] timeBuckets = new long[numMeasurements]; + final long[] docBuckets = new long[numMeasurements]; + + for (int i = 0; i < iterations; i++) { + if (settings.allowCacheClearing() && settings.clearCaches() != null) { + try { + client.admin().indices().clearCache(settings.clearCaches()).get(); + } catch (ExecutionException e) { + throw new BenchmarkExecutionException("Failed to clear caches", e); + } + } + + // Run the iteration + CompetitionIteration ci = + runIteration(competitor, searchRequests, timeBuckets, docBuckets, semaphore); + ci.percentiles(request.percentiles()); + competitionIterations.add(ci); + competitionNodeResult.incrementCompletedIterations(); + } + + competitionNodeResult.totalExecutedQueries(settings.multiplier() * searchRequests.size() * iterations); } + benchmarkResponse.state(BenchmarkResponse.State.COMPLETE); - return benchmarkResponse; + + } catch (BenchmarkExecutionException e) { + benchmarkResponse.state(BenchmarkResponse.State.FAILED); + benchmarkResponse.errors(e.errorMessages().toArray(new String[e.errorMessages().size()])); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + benchmarkResponse.state(BenchmarkResponse.State.ABORTED); } finally { synchronized (lock) { - if (!activeBenchmarks.containsKey(request.benchmarkName())) { - throw new ElasticsearchException("Benchmark with id [" + request.benchmarkName() + "] is missing"); - } semaphore.stop(); activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fRemove(request.benchmarkName()).build(); } } + + return benchmarkResponse; } private List warmUp(BenchmarkCompetitor competitor, List searchRequests, StoppableSemaphore stoppableSemaphore) throws InterruptedException { final StoppableSemaphore semaphore = stoppableSemaphore.reset(competitor.settings().concurrency()); final CountDownLatch totalCount = new CountDownLatch(searchRequests.size()); - final CopyOnWriteArrayList errorMessages = new CopyOnWriteArrayList(); + final CopyOnWriteArrayList errorMessages = new CopyOnWriteArrayList<>(); for (SearchRequest searchRequest : searchRequests) { semaphore.acquire(); @@ -220,7 +213,7 @@ public class BenchmarkExecutor { } private CompetitionIteration runIteration(BenchmarkCompetitor competitor, List searchRequests, - final long[] timeBuckets, final long[] docBuckets, List errors, + final long[] timeBuckets, final long[] docBuckets, StoppableSemaphore stoppableSemaphore) throws InterruptedException { assert timeBuckets.length == competitor.settings().multiplier() * searchRequests.size(); @@ -233,7 +226,7 @@ public class BenchmarkExecutor { int id = 0; final CountDownLatch totalCount = new CountDownLatch(timeBuckets.length); - final CopyOnWriteArrayList errorMessages = new CopyOnWriteArrayList(); + final CopyOnWriteArrayList errorMessages = new CopyOnWriteArrayList<>(); final long beforeRun = System.nanoTime(); for (int i = 0; i < competitor.settings().multiplier(); i++) { @@ -248,8 +241,7 @@ public class BenchmarkExecutor { assert id == timeBuckets.length; final long afterRun = System.nanoTime(); if (!errorMessages.isEmpty()) { - errors.addAll(errorMessages); - return null; + throw new BenchmarkExecutionException("Too many execution failures", errorMessages); } assert assertBuckets(timeBuckets); // make sure they are all set @@ -260,9 +252,12 @@ public class BenchmarkExecutor { CompetitionIterationData iterationData = new CompetitionIterationData(timeBuckets); long sumDocs = new CompetitionIterationData(docBuckets).sum(); - CompetitionIteration.SlowRequest[] topN = - competitor.settings().numSlowest() > 0 ? getTopN(timeBuckets, searchRequests, - competitor.settings().multiplier(), competitor.settings().numSlowest()) : null; + // Don't track slowest request if there is only one request as that is redundant + CompetitionIteration.SlowRequest[] topN = null; + if ((competitor.settings().numSlowest() > 0) && (searchRequests.size() > 1)) { + topN = getTopN(timeBuckets, searchRequests, competitor.settings().multiplier(), competitor.settings().numSlowest()); + } + CompetitionIteration round = new CompetitionIteration(topN, totalTime, timeBuckets.length, sumDocs, iterationData); @@ -369,8 +364,9 @@ public class BenchmarkExecutor { private final int bucketId; private final long[] docBuckets; - public StatisticCollectionActionListener(StoppableSemaphore semaphore, long[] timeBuckets, long[] docs, int bucketId, - CountDownLatch totalCount, CopyOnWriteArrayList errorMessages) { + public StatisticCollectionActionListener(StoppableSemaphore semaphore, long[] timeBuckets, long[] docs, + int bucketId, CountDownLatch totalCount, + CopyOnWriteArrayList errorMessages) { super(semaphore, totalCount, errorMessages); this.bucketId = bucketId; this.timeBuckets = timeBuckets; @@ -423,9 +419,9 @@ public class BenchmarkExecutor { } } - private final boolean assertBuckets(long[] buckets) { // assert only + private final boolean assertBuckets(long[] buckets) { for (int i = 0; i < buckets.length; i++) { - assert buckets[i] >= 0 : "Bucket value was negative: " + buckets[i]; + assert buckets[i] >= 0 : "Bucket value was negative: " + buckets[i] + " bucket id: " + i; } return true; } diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkMissingException.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkMissingException.java new file mode 100644 index 00000000000..dbd18722133 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkMissingException.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bench; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.rest.RestStatus; + +/** + * Thrown when a client tries to access a benchmark which does not exist + */ +public class BenchmarkMissingException extends ElasticsearchException { + + public BenchmarkMissingException(String msg) { + super(msg); + } + + @Override + public RestStatus status() { + return RestStatus.NOT_FOUND; + } +} diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java index a83659c7ddb..66f775ea221 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java @@ -23,7 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.rest.RestStatus; /** - * + * Indicates that a benchmark cannot be executed due to a lack of candidate nodes. */ public class BenchmarkNodeMissingException extends ElasticsearchException { diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkRequest.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkRequest.java index 77889967713..aebf0a30a45 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkRequest.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkRequest.java @@ -37,9 +37,9 @@ import java.util.List; public class BenchmarkRequest extends MasterNodeOperationRequest { private String benchmarkName; - private double[] percentiles; private boolean verbose; private int numExecutorNodes = 1; // How many nodes to run the benchmark on + private double[] percentiles = BenchmarkSettings.DEFAULT_PERCENTILES; // Global settings which can be overwritten at the competitor level private BenchmarkSettings settings = new BenchmarkSettings(); diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java index cbfc86742f3..b969fa013c2 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.bench; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -42,11 +43,12 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX private String benchmarkName; private State state = State.RUNNING; private boolean verbose; + private String[] errors = Strings.EMPTY_ARRAY; Map competitionResults; public BenchmarkResponse() { - competitionResults = new HashMap(); + competitionResults = new HashMap<>(); } public BenchmarkResponse(String benchmarkName, Map competitionResults) { @@ -59,11 +61,13 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX * RUNNING - executing normally * COMPLETE - completed normally * ABORTED - aborted + * FAILED - execution failed */ public static enum State { RUNNING((byte) 0), COMPLETE((byte) 1), - ABORTED((byte) 2); + ABORTED((byte) 2), + FAILED((byte) 3); private final byte id; private static final State[] STATES = new State[State.values().length]; @@ -91,50 +95,104 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX } } + /** + * Name of the benchmark + * @return Name of the benchmark + */ public String benchmarkName() { return benchmarkName; } + /** + * Sets the benchmark name + * @param benchmarkName Benchmark name + */ public void benchmarkName(String benchmarkName) { this.benchmarkName = benchmarkName; } + /** + * Benchmark state + * @return Benchmark state + */ public State state() { return state; } + /** + * Sets the state of the benchmark + * @param state State + */ public void state(State state) { this.state = state; } + /** + * Possibly replace the existing state with the new state depending on the severity + * of the new state. More severe states, such as FAILED, will over-write less severe + * ones, such as COMPLETED. + * @param newState New candidate state + * @return The merged state + */ + public State mergeState(State newState) { + if (state.compareTo(newState) < 0) { + state = newState; + } + return state; + } + + /** + * Map of competition names to competition results + * @return Map of competition names to competition results + */ public Map competitionResults() { return competitionResults; } - public boolean isAborted() { - return state == State.ABORTED; - } - - public boolean hasFailures() { - for (CompetitionResult result : competitionResults.values()) { - if (result.hasFailures()) { - return true; - } - } - return false; - } - + /** + * Whether to report verbose statistics + */ public boolean verbose() { return verbose; } + /** + * Sets whether to report verbose statistics + */ public void verbose(boolean verbose) { this.verbose = verbose; } + /** + * Whether the benchmark encountered error conditions + * @return Whether the benchmark encountered error conditions + */ + public boolean hasErrors() { + return (errors != null && errors.length > 0); + } + + /** + * Error messages + * @return Error messages + */ + public String[] errors() { + return this.errors; + } + + /** + * Sets error messages + * @param errors Error messages + */ + public void errors(String[] errors) { + this.errors = (errors == null) ? Strings.EMPTY_ARRAY : errors; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.STATUS, state.toString()); + if (errors != null) { + builder.array(Fields.ERRORS, errors); + } builder.startObject(Fields.COMPETITORS); if (competitionResults != null) { for (Map.Entry entry : competitionResults.entrySet()) { @@ -151,6 +209,7 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX super.readFrom(in); benchmarkName = in.readString(); state = State.fromId(in.readByte()); + errors = in.readStringArray(); int size = in.readVInt(); for (int i = 0; i < size; i++) { String s = in.readString(); @@ -165,6 +224,7 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX super.writeTo(out); out.writeString(benchmarkName); out.writeByte(state.id()); + out.writeStringArray(errors); out.write(competitionResults.size()); for (Map.Entry entry : competitionResults.entrySet()) { out.writeString(entry.getKey()); @@ -187,6 +247,7 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX static final class Fields { static final XContentBuilderString STATUS = new XContentBuilderString("status"); + static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); static final XContentBuilderString COMPETITORS = new XContentBuilderString("competitors"); } } diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java index 36af97831df..1decf54898c 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java @@ -142,7 +142,7 @@ public class BenchmarkService extends AbstractLifecycleComponent benchmarkResponses = new ArrayList<>(); - private final List errorMessages = new ArrayList<>(); public BenchmarkStatusResponse() { } - public BenchmarkStatusResponse(String nodeName) { - this.nodeName = nodeName; - } - public void addBenchResponse(BenchmarkResponse response) { benchmarkResponses.add(response); } @@ -53,30 +47,9 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl return benchmarkResponses; } - public String nodeName(String nodeName) { - this.nodeName = nodeName; - return nodeName; - } - - public String nodeName() { - return nodeName; - } - - public void addErrors(List errorMessages) { - this.errorMessages.addAll(errorMessages); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - if (errorMessages.size() > 0) { - builder.startArray("errors"); - for (String error : errorMessages) { - builder.field(error); - } - builder.endArray(); - } - if (benchmarkResponses.size() > 0) { builder.startObject("active_benchmarks"); for (BenchmarkResponse benchmarkResponse : benchmarkResponses) { @@ -93,24 +66,20 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - nodeName = in.readString(); int size = in.readVInt(); for (int i = 0; i < size; i++) { BenchmarkResponse br = new BenchmarkResponse(); br.readFrom(in); benchmarkResponses.add(br); } - errorMessages.addAll(Arrays.asList(in.readStringArray())); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(nodeName); out.writeVInt(benchmarkResponses.size()); for (BenchmarkResponse br : benchmarkResponses) { br.writeTo(out); } - out.writeStringArray(errorMessages.toArray(new String[errorMessages.size()])); } } diff --git a/src/main/java/org/elasticsearch/action/bench/CompetitionNodeResult.java b/src/main/java/org/elasticsearch/action/bench/CompetitionNodeResult.java index 47ecc44b914..36c01f58aac 100644 --- a/src/main/java/org/elasticsearch/action/bench/CompetitionNodeResult.java +++ b/src/main/java/org/elasticsearch/action/bench/CompetitionNodeResult.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.bench; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -40,11 +39,10 @@ public class CompetitionNodeResult extends ActionResponse implements Streamable private int completedIterations = 0; private int totalExecutedQueries = 0; private long warmUpTime = 0; - private String[] failures = Strings.EMPTY_ARRAY; private List iterations; public CompetitionNodeResult() { - iterations = new ArrayList(); + iterations = new ArrayList<>(); } public CompetitionNodeResult(String competitionName, String nodeName, int totalIterations, List iterations) { @@ -82,18 +80,6 @@ public class CompetitionNodeResult extends ActionResponse implements Streamable this.warmUpTime = warmUpTime; } - public String[] failures() { - return this.failures; - } - - public void failures(String[] failures) { - if (failures == null) { - this.failures = Strings.EMPTY_ARRAY; - } else { - this.failures = failures; - } - } - public int totalExecutedQueries() { return totalExecutedQueries; } @@ -115,7 +101,6 @@ public class CompetitionNodeResult extends ActionResponse implements Streamable completedIterations = in.readVInt(); totalExecutedQueries = in.readVInt(); warmUpTime = in.readVLong(); - failures = in.readStringArray(); int size = in.readVInt(); for (int i = 0; i < size; i++) { CompetitionIteration iteration = new CompetitionIteration(); @@ -133,7 +118,6 @@ public class CompetitionNodeResult extends ActionResponse implements Streamable out.writeVInt(completedIterations); out.writeVInt(totalExecutedQueries); out.writeVLong(warmUpTime); - out.writeStringArray(failures); out.writeVInt(iterations.size()); for (CompetitionIteration iteration : iterations) { iteration.writeTo(out); diff --git a/src/main/java/org/elasticsearch/action/bench/CompetitionResult.java b/src/main/java/org/elasticsearch/action/bench/CompetitionResult.java index b89c8e4e51f..bb0c1c70a98 100644 --- a/src/main/java/org/elasticsearch/action/bench/CompetitionResult.java +++ b/src/main/java/org/elasticsearch/action/bench/CompetitionResult.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; /** @@ -39,15 +38,13 @@ public class CompetitionResult implements Streamable, ToXContent { private int concurrency; private int multiplier; private boolean verbose; - private double[] percentiles = DEFAULT_PERCENTILES; + private double[] percentiles = BenchmarkSettings.DEFAULT_PERCENTILES; - private List nodeResults = new ArrayList(); + private List nodeResults = new ArrayList<>(); private CompetitionSummary competitionSummary; private CompetitionDetails competitionDetails; - private static final double[] DEFAULT_PERCENTILES = { 10, 25, 50, 75, 90, 99 }; - public CompetitionResult() { } /** @@ -74,7 +71,7 @@ public class CompetitionResult implements Streamable, ToXContent { this.concurrency = concurrency; this.multiplier = multiplier; this.verbose = verbose; - this.percentiles = (percentiles != null && percentiles.length > 0) ? percentiles : DEFAULT_PERCENTILES; + this.percentiles = (percentiles != null && percentiles.length > 0) ? percentiles : BenchmarkSettings.DEFAULT_PERCENTILES; this.competitionDetails = new CompetitionDetails(nodeResults); this.competitionSummary = new CompetitionSummary(nodeResults, concurrency, multiplier, percentiles); } @@ -121,7 +118,7 @@ public class CompetitionResult implements Streamable, ToXContent { /** * Gets the multiplier. The multiplier determines how many times each iteration will be run. - * @return Multipiler + * @return Multiplier */ public int multiplier() { return multiplier; @@ -159,36 +156,6 @@ public class CompetitionResult implements Streamable, ToXContent { this.percentiles = percentiles; } - /** - * Whether any failures were encountered - * @return True/false - */ - public boolean hasFailures() { - for (CompetitionNodeResult nodeResult : nodeResults) { - if (nodeResult.failures() != null && nodeResult.failures().length > 0) { - return true; - } - } - return false; - } - - /** - * Gets failure messages from node executions - * @return Node failure messages - */ - public List nodeFailures() { - List failures = null; - for (CompetitionNodeResult nodeResult : nodeResults) { - if (nodeResult.failures() != null && nodeResult.failures().length > 0) { - if (failures == null) { - failures = new ArrayList<>(); - } - failures.addAll(Arrays.asList(nodeResult.failures())); - } - } - return failures; - } - /** * Gets the results for each cluster node that the competition executed on. * @return Node-level results @@ -200,14 +167,6 @@ public class CompetitionResult implements Streamable, ToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(competitionName); - List nodeFailures = nodeFailures(); - if (nodeFailures != null) { - builder.startArray("failures"); - for (String failure : nodeFailures) { - builder.field(failure); - } - builder.endArray(); - } competitionSummary.toXContent(builder, params); if (verbose) { competitionDetails.toXContent(builder, params); @@ -229,6 +188,7 @@ public class CompetitionResult implements Streamable, ToXContent { nodeResults.add(result); } percentiles = in.readDoubleArray(); + competitionSummary = new CompetitionSummary(nodeResults, concurrency, multiplier, percentiles); } @Override diff --git a/src/main/java/org/elasticsearch/action/bench/CompetitionSummary.java b/src/main/java/org/elasticsearch/action/bench/CompetitionSummary.java index 2a959a16ae9..546d896fc7a 100644 --- a/src/main/java/org/elasticsearch/action/bench/CompetitionSummary.java +++ b/src/main/java/org/elasticsearch/action/bench/CompetitionSummary.java @@ -39,22 +39,22 @@ public class CompetitionSummary implements ToXContent { private List nodeResults; - private long min = 0; - private long max = 0; - private long totalTime = 0; - private long sumTotalHits = 0; - private long totalIterations = 0; - private long completedIterations = 0; - private long totalQueries = 0; - private double avgWarmupTime = 0; - private int concurrency = 0; - private int multiplier = 0; - private double mean = 0; - private double millisPerHit = 0.0; - private double stdDeviation = 0.0; - private double queriesPerSecond = 0.0; - private double[] percentiles; - private Map percentileValues = new TreeMap<>(); + long min = 0; + long max = 0; + long totalTime = 0; + long sumTotalHits = 0; + long totalIterations = 0; + long completedIterations = 0; + long totalQueries = 0; + double avgWarmupTime = 0; + int concurrency = 0; + int multiplier = 0; + double mean = 0; + double millisPerHit = 0.0; + double stdDeviation = 0.0; + double queriesPerSecond = 0.0; + double[] percentiles; + Map percentileValues = new TreeMap<>(); List> slowest = new ArrayList<>(); @@ -67,7 +67,11 @@ public class CompetitionSummary implements ToXContent { this.percentiles = percentiles; } - private void computeSummaryStatistics() { + public List nodeResults() { + return nodeResults; + } + + public void computeSummaryStatistics() { long totalWarmupTime = 0; SinglePassStatistics single = new SinglePassStatistics(); @@ -159,8 +163,8 @@ public class CompetitionSummary implements ToXContent { builder.endObject(); - builder.startArray(Fields.SLOWEST); if (totalIterations > 0 && slowest.size() > 0) { + builder.startArray(Fields.SLOWEST); int n = (int) (slowest.size() / totalIterations); for (int i = 0; i < n; i++) { builder.startObject(); @@ -168,8 +172,8 @@ public class CompetitionSummary implements ToXContent { slowest.get(i).v2().toXContent(builder, params); builder.endObject(); } + builder.endArray(); } - builder.endArray(); builder.endObject(); return builder; diff --git a/src/main/java/org/elasticsearch/client/Client.java b/src/main/java/org/elasticsearch/client/Client.java index 97faf1a5db5..3f4aa4c85ee 100644 --- a/src/main/java/org/elasticsearch/client/Client.java +++ b/src/main/java/org/elasticsearch/client/Client.java @@ -561,6 +561,11 @@ public interface Client { */ void bench(BenchmarkRequest request, ActionListener listener); + /** + * Runs a benchmark on the server + */ + ActionFuture bench(BenchmarkRequest request); + /** * Runs a benchmark on the server */ diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index b100c2d78a4..ecd7c3c8699 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -388,6 +388,11 @@ public abstract class AbstractClient implements InternalClient { execute(BenchmarkAction.INSTANCE, request, listener); } + @Override + public ActionFuture bench(BenchmarkRequest request) { + return execute(BenchmarkAction.INSTANCE, request); + } + @Override public BenchmarkRequestBuilder prepareBench(String... indices) { return new BenchmarkRequestBuilder(this, indices); diff --git a/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java b/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java index 1738d7cd545..b7b14b03c6c 100644 --- a/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java +++ b/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java @@ -48,7 +48,6 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestStatus.OK; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.METHOD_NOT_ALLOWED; -import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE; import static org.elasticsearch.common.xcontent.json.JsonXContent.contentBuilder; /** @@ -276,19 +275,23 @@ public class RestBenchAction extends BaseRestHandler { } } else if ("indices".equals(fieldName)) { List perCompetitorIndices = new ArrayList<>(); - List perCompetitorTypes = new ArrayList<>(); while ((token = p.nextToken()) != XContentParser.Token.END_ARRAY) { if (token == XContentParser.Token.VALUE_STRING) { - String[] parts = parseIndexNameAndType(p.text()); - perCompetitorIndices.add(parts[0]); - if (parts.length == 2) { - perCompetitorTypes.add(parts[1]); - } + perCompetitorIndices.add(p.text()); } else { - throw new ElasticsearchParseException("Failed parsing array field [" + fieldName + "] expected string values but got: " + token); + throw new ElasticsearchParseException("Failed parsing array field [" + fieldName + "] expected string values but got: " + token); } } builder.setIndices(perCompetitorIndices.toArray(new String[perCompetitorIndices.size()])); + } else if ("types".equals(fieldName)) { + List perCompetitorTypes = new ArrayList<>(); + while ((token = p.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_STRING) { + perCompetitorTypes.add(p.text()); + } else { + throw new ElasticsearchParseException("Failed parsing array field [" + fieldName + "] expected string values but got: " + token); + } + } builder.setTypes(perCompetitorTypes.toArray(new String[perCompetitorTypes.size()])); } else { throw new ElasticsearchParseException("Failed parsing array field [" + fieldName + "] field is not recognized"); @@ -389,11 +392,4 @@ public class RestBenchAction extends BaseRestHandler { } } } - - // We allow users to specify competitor index lists as "index_name/index_type" - private static String[] parseIndexNameAndType(String indexName) { - String[] parts = indexName.split("/", 2); - assert parts.length >= 1; - return parts; - } } diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java new file mode 100644 index 00000000000..74345412508 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.bench; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + +import org.apache.lucene.util.English; + +import org.junit.Before; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.*; + +/** + * Integration tests for benchmark API + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE) +public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { + + private static final String BENCHMARK_NAME = "test_benchmark"; + private static final String COMPETITOR_PREFIX = "competitor_"; + private static final String INDEX_PREFIX = "test_index_"; + private static final String INDEX_TYPE = "test_type"; + + private static final int MIN_DOC_COUNT = 1; + private static final int MAX_DOC_COUNT = 1000; + + private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; + private static final long TIMEOUT = 10; + + private int numExecutorNodes = 0; + private Map competitionSettingsMap; + private String[] indices = Strings.EMPTY_ARRAY; + + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder().put("node.bench", true).build(); + } + + @Before + public void beforeBenchmarkIntegrationTests() throws Exception { + numExecutorNodes = cluster().size(); + competitionSettingsMap = new HashMap<>(); + logger.info("--> indexing random data"); + indices = randomData(); + } + + @Test + public void testSubmitBenchmark() throws Exception { + + final BenchmarkRequest request = + BenchmarkTestUtil.randomRequest(client(),indices, numExecutorNodes, competitionSettingsMap); + logger.info("--> submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), + request.settings().iterations()); + final BenchmarkResponse response = client().bench(request).actionGet(); + + assertThat(response, notNullValue()); + assertThat(response.state(), equalTo(BenchmarkResponse.State.COMPLETE)); + assertFalse(response.hasErrors()); + assertThat(response.benchmarkName(), equalTo(BENCHMARK_NAME)); + assertThat(response.competitionResults().size(), equalTo(request.competitors().size())); + + for (CompetitionResult result : response.competitionResults().values()) { + assertThat(result.nodeResults().size(), equalTo(numExecutorNodes)); + validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), true); + } + } + + @Test + public void testListBenchmarks() throws Exception { + + final BenchmarkRequest request = + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap); + logger.info("--> submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), + request.settings().iterations()); + + final CountDownLatch countdown = new CountDownLatch(1); + final List throwables = new ArrayList<>(); + + client().bench(request, new ActionListener() { + @Override + public void onResponse(BenchmarkResponse benchmarkResponse) { + countdown.countDown(); + } + @Override + public void onFailure(Throwable e) { + throwables.add(e); + countdown.countDown(); + } + }); + + // Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not + // to finish so that we can 'capture' an in-progress state + Thread.sleep(1000); + + final BenchmarkStatusResponse response = client().prepareBenchStatus().execute().actionGet(); + assertThat(response.benchmarkResponses().size(), greaterThanOrEqualTo(0)); + + for (BenchmarkResponse benchmarkResponse : response.benchmarkResponses()) { + + assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); + assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING)); + assertFalse(benchmarkResponse.hasErrors()); + + for (CompetitionResult result : benchmarkResponse.competitionResults().values()) { + assertThat(result.nodeResults().size(), equalTo(numExecutorNodes)); + validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false); + } + } + + // Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster + if (!countdown.await(TIMEOUT, TIME_UNIT)) { + logger.warn("Timeout waiting to for benchmark to complete"); + } + + if (throwables.size() > 0) { + for (Throwable t : throwables) { + logger.error(t.getMessage(), t); + } + fail("Failed to execute benchmark"); + } + } + + @Test + public void testAbortBenchmark() throws Exception { + + final BenchmarkRequest request = + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap); + logger.info("--> submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), + request.settings().iterations()); + + final CountDownLatch countdown = new CountDownLatch(1); + final List throwables = new ArrayList<>(); + + client().bench(request, new ActionListener() { + @Override + public void onResponse(BenchmarkResponse benchmarkResponse) { + countdown.countDown(); + assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.ABORTED)); + } + @Override + public void onFailure(Throwable e) { + throwables.add(e); + countdown.countDown(); + } + }); + + // Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not + // to finish so that we can successfully execute an abort operation on it. + Thread.sleep(1000); + + final AbortBenchmarkResponse response = client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); + assertThat(response.getNodeResponses().size(), equalTo(numExecutorNodes)); + assertThat(response.getBenchmarkName(), equalTo(BENCHMARK_NAME)); + + for (AbortBenchmarkNodeResponse nodeResponse : response.getNodeResponses()) { + assertThat(nodeResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); + assertThat(nodeResponse.errorMessage(), nullValue()); + assertThat(nodeResponse.nodeName(), notNullValue()); + } + + // Send a list request to make sure there are no active benchmarks + final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); + assertThat(statusResponse.benchmarkResponses().size(), equalTo(0)); + + // Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster + if (!countdown.await(TIMEOUT, TIME_UNIT)) { + logger.warn("Timeout waiting to for benchmark to complete"); + } + + if (throwables.size() > 0) { + for (Throwable t : throwables) { + logger.error(t.getMessage(), t); + } + fail("Failed to execute benchmark"); + } + } + + @Test(expected = BenchmarkMissingException.class) + public void testAbortNoSuchBenchmark() throws Exception { + client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); + } + + private void validateCompetitionResult(CompetitionResult result, BenchmarkSettings requestedSettings, boolean strict) { + + // Validate settings + assertTrue(result.competitionName().startsWith(COMPETITOR_PREFIX)); + assertThat(result.concurrency(), equalTo(requestedSettings.concurrency())); + assertThat(result.multiplier(), equalTo(requestedSettings.multiplier())); + + // Validate node-level responses + for (CompetitionNodeResult nodeResult : result.nodeResults()) { + + assertThat(nodeResult.nodeName(), notNullValue()); + + assertThat(nodeResult.totalIterations(), equalTo(requestedSettings.iterations())); + if (strict) { + assertThat(nodeResult.completedIterations(), equalTo(requestedSettings.iterations())); + final int expectedQueryCount = requestedSettings.multiplier() * + nodeResult.totalIterations() * requestedSettings.searchRequests().size(); + assertThat(nodeResult.totalExecutedQueries(), equalTo(expectedQueryCount)); + assertThat(nodeResult.iterations().size(), equalTo(requestedSettings.iterations())); + } + + assertThat(nodeResult.warmUpTime(), greaterThanOrEqualTo(0L)); + + for (CompetitionIteration iteration : nodeResult.iterations()) { + // Basic sanity checks + iteration.computeStatistics(); + assertThat(iteration.totalTime(), greaterThanOrEqualTo(0L)); + assertThat(iteration.min(), greaterThanOrEqualTo(0L)); + assertThat(iteration.max(), greaterThanOrEqualTo(iteration.min())); + assertThat(iteration.mean(), greaterThanOrEqualTo((double) iteration.min())); + assertThat(iteration.mean(), lessThanOrEqualTo((double) iteration.max())); + assertThat(iteration.queriesPerSecond(), greaterThanOrEqualTo(0.0)); + assertThat(iteration.millisPerHit(), greaterThanOrEqualTo(0.0)); + validatePercentiles(iteration.percentileValues()); + } + } + + // Validate summary statistics + final CompetitionSummary summary = result.competitionSummary(); + summary.computeSummaryStatistics(); + assertThat(summary, notNullValue()); + assertThat(summary.min, greaterThanOrEqualTo(0L)); + assertThat(summary.max, greaterThanOrEqualTo(summary.min)); + assertThat(summary.mean, greaterThanOrEqualTo((double) summary.min)); + assertThat(summary.mean, lessThanOrEqualTo((double) summary.max)); + assertThat(summary.totalTime, greaterThanOrEqualTo(0L)); + assertThat(summary.queriesPerSecond, greaterThanOrEqualTo(0.0)); + assertThat(summary.millisPerHit, greaterThanOrEqualTo(0.0)); + assertThat(summary.avgWarmupTime, greaterThanOrEqualTo(0.0)); + if (strict) { + assertThat((int) summary.totalIterations, equalTo(requestedSettings.iterations() * summary.nodeResults().size())); + assertThat((int) summary.completedIterations, equalTo(requestedSettings.iterations() * summary.nodeResults().size())); + assertThat((int) summary.totalQueries, equalTo(requestedSettings.iterations() * requestedSettings.multiplier() * + requestedSettings.searchRequests().size() * summary.nodeResults().size())); + validatePercentiles(summary.percentileValues); + } + } + + private void validatePercentiles(Map percentiles) { + int i = 0; + Double last = null; + for (Map.Entry entry : percentiles.entrySet()) { + assertThat(entry.getKey(), equalTo(BenchmarkSettings.DEFAULT_PERCENTILES[i++])); + if (last != null) { + assertThat(entry.getValue(), greaterThanOrEqualTo(last)); + } + // This is a hedge against rounding errors. Sometimes two adjacent percentile values will + // be nearly equivalent except for some insignificant decimal places. In such cases we + // want the two values to compare as equal. + final BigDecimal bd = new BigDecimal(entry.getValue()).setScale(2, RoundingMode.HALF_DOWN); + last = bd.doubleValue(); + } + } + + private String[] randomData() throws Exception { + + final int numIndices = between(BenchmarkTestUtil.MIN_SMALL_INTERVAL, BenchmarkTestUtil.MAX_SMALL_INTERVAL); + final String[] indices = new String[numIndices]; + + for (int i = 0; i < numIndices; i++) { + indices[i] = INDEX_PREFIX + i; + final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT); + final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + + for (int j = 0; j < numDocs; j++) { + docs[j] = client().prepareIndex(indices[i], INDEX_TYPE). + setSource(BenchmarkTestUtil.TestIndexField.INT_FIELD.toString(), randomInt(), + BenchmarkTestUtil.TestIndexField.FLOAT_FIELD.toString(), randomFloat(), + BenchmarkTestUtil.TestIndexField.BOOLEAN_FIELD.toString(), randomBoolean(), + BenchmarkTestUtil.TestIndexField.STRING_FIELD.toString(), English.intToEnglish(j)); + } + + indexRandom(true, docs); + } + + return indices; + } +} diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkNegativeTest.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkNegativeTest.java new file mode 100644 index 00000000000..3a571938816 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkNegativeTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bench; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; + +import org.junit.Test; + +/** + * Tests for negative situations where we cannot run benchmarks + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE) +public class BenchmarkNegativeTest extends ElasticsearchIntegrationTest { + + private static final String INDEX_NAME = "test_index"; + + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder().put("node.bench", false).build(); + } + + @Test(expected = BenchmarkNodeMissingException.class) + public void testSubmitBenchmarkNegative() { + client().bench(BenchmarkTestUtil.randomRequest( + client(), new String[] {INDEX_NAME}, cluster().size(), null)).actionGet(); + } + + @Test(expected = BenchmarkNodeMissingException.class) + public void testListBenchmarkNegative() { + client().prepareBenchStatus().execute().actionGet(); + } + + @Test(expected = BenchmarkNodeMissingException.class) + public void testAbortBenchmarkNegative() throws Exception { + client().prepareAbortBench(BenchmarkTestUtil.BENCHMARK_NAME).execute().actionGet(); + } +} diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java new file mode 100644 index 00000000000..a4b0f0c9194 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bench; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.util.Map; + +import static org.junit.Assert.fail; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.between; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomFrom; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomBoolean; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomAsciiOfLengthBetween; + +/** + * Utilities for building randomized benchmark tests. + */ +public class BenchmarkTestUtil { + + public static final int MIN_MULTIPLIER = 10; + public static final int MAX_MULTIPLIER = 500; + public static final int MIN_SMALL_INTERVAL = 1; + public static final int MAX_SMALL_INTERVAL = 3; + + public static final String BENCHMARK_NAME = "test_benchmark"; + public static final String COMPETITOR_PREFIX = "competitor_"; + public static final String INDEX_TYPE = "test_type"; + + public static final SearchType[] searchTypes = { SearchType.DFS_QUERY_THEN_FETCH, + SearchType.QUERY_THEN_FETCH, + SearchType.QUERY_AND_FETCH, + SearchType.DFS_QUERY_AND_FETCH, + SearchType.COUNT }; + + public static enum TestIndexField { + INT_FIELD("int_field"), + FLOAT_FIELD("float_field"), + BOOLEAN_FIELD("boolean_field"), + STRING_FIELD("string_field"); + + final String name; + + TestIndexField(String name) { + this.name = name; + } + + public String toString() { + return name; + } + } + + public static enum TestQueryType { + MATCH_ALL { + @Override + QueryBuilder getQuery() { + return QueryBuilders.matchAllQuery(); + } + }, + MATCH { + @Override + QueryBuilder getQuery() { + return QueryBuilders.matchQuery(TestIndexField.STRING_FIELD.toString(), + randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + } + }, + TERM { + @Override + QueryBuilder getQuery() { + return QueryBuilders.termQuery(TestIndexField.STRING_FIELD.toString(), + randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + } + }, + QUERY_STRING { + @Override + QueryBuilder getQuery() { + return QueryBuilders.queryString( + randomAsciiOfLengthBetween(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL)); + } + }, + WILDCARD { + @Override + QueryBuilder getQuery() { + return QueryBuilders.wildcardQuery( + TestIndexField.STRING_FIELD.toString(), randomBoolean() ? "*" : "?"); + } + }; + + abstract QueryBuilder getQuery(); + } + + public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes, + Map competitionSettingsMap) { + + final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices); + final BenchmarkSettings settings = randomSettings(); + + builder.setIterations(settings.iterations()); + builder.setConcurrency(settings.concurrency()); + builder.setMultiplier(settings.multiplier()); + builder.setSearchType(settings.searchType()); + builder.setWarmup(settings.warmup()); + builder.setNumExecutorNodes(numExecutorNodes); + + final int numCompetitors = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + for (int i = 0; i < numCompetitors; i++) { + builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices, competitionSettingsMap)); + } + + final BenchmarkRequest request = builder.request(); + request.benchmarkName(BENCHMARK_NAME); + request.cascadeGlobalSettings(); + request.applyLateBoundSettings(indices, new String[] { INDEX_TYPE }); + + return request; + } + + public static SearchRequest randomSearch(Client client, String[] indices) { + + final SearchRequestBuilder builder = new SearchRequestBuilder(client); + builder.setIndices(indices); + builder.setTypes(INDEX_TYPE); + builder.setQuery(randomFrom(TestQueryType.values()).getQuery()); + return builder.request(); + } + + public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices, + Map competitionSettingsMap) { + + final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder(); + final BenchmarkSettings settings = randomSettings(); + + builder.setClearCachesSettings(randomCacheSettings()); + builder.setIterations(settings.iterations()); + builder.setConcurrency(settings.concurrency()); + builder.setMultiplier(settings.multiplier()); + builder.setSearchType(settings.searchType()); + builder.setWarmup(settings.warmup()); + builder.setName(name); + + final int numSearches = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + for (int i = 0; i < numSearches; i++) { + final SearchRequest searchRequest = randomSearch(client, indices); + builder.addSearchRequest(searchRequest); + settings.addSearchRequest(searchRequest); + } + + if (competitionSettingsMap != null) { + competitionSettingsMap.put(name, settings); + } + + return builder.build(); + } + + public static BenchmarkSettings.ClearCachesSettings randomCacheSettings() { + + final BenchmarkSettings.ClearCachesSettings settings = new BenchmarkSettings.ClearCachesSettings(); + + settings.filterCache(randomBoolean()); + settings.fieldDataCache(randomBoolean()); + settings.idCache(randomBoolean()); + settings.recycler(randomBoolean()); + + if (randomBoolean()) { + final int numFieldsToClear = between(1, TestIndexField.values().length); + final String[] fields = new String[numFieldsToClear]; + for (int i = 0; i < numFieldsToClear; i++) { + fields[i] = TestIndexField.values()[i].toString(); + } + settings.fields(fields); + } + + return settings; + } + + public static BenchmarkSettings randomSettings() { + + final BenchmarkSettings settings = new BenchmarkSettings(); + + settings.concurrency(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true); + settings.iterations(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true); + settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true); + settings.warmup(randomBoolean(), true); + settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true); + + return settings; + } +} diff --git a/src/test/java/org/elasticsearch/test/client/RandomizingClient.java b/src/test/java/org/elasticsearch/test/client/RandomizingClient.java index 62bddfcfae6..8898f3b6609 100644 --- a/src/test/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/src/test/java/org/elasticsearch/test/client/RandomizingClient.java @@ -419,6 +419,11 @@ public class RandomizingClient implements InternalClient { delegate.bench(request, listener); } + @Override + public ActionFuture bench(BenchmarkRequest request) { + return delegate.bench(request); + } + @Override public BenchmarkRequestBuilder prepareBench(String... indices) { return delegate.prepareBench(indices);