[TEST] Fix for benchmark tests
- Fix bug where repeatedly calling computeSummaryStatistics() could accumulate some values incorrectly - Fix check for number of responsive nodes on list is <= number of candidate benchmark nodes - Add public getters for summary statistics - Add javadoc for new getters - Add javadoc comments about API use - Improve abort and status tests by calling awaitBusy() to wait for jobs to be completely submitted before testing them
This commit is contained in:
parent
5e40a4b95a
commit
48879752a2
|
@ -76,12 +76,12 @@ public class BenchmarkExecutor {
|
||||||
|
|
||||||
BenchmarkState state = activeBenchmarks.get(benchmarkName);
|
BenchmarkState state = activeBenchmarks.get(benchmarkName);
|
||||||
if (state == null) {
|
if (state == null) {
|
||||||
throw new ElasticsearchException("Benchmark [" + benchmarkName + "] not found");
|
throw new BenchmarkMissingException("Benchmark [" + benchmarkName + "] not found on [" + nodeName() + "]");
|
||||||
}
|
}
|
||||||
state.semaphore.stop();
|
state.semaphore.stop();
|
||||||
activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fRemove(benchmarkName).build();
|
activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fRemove(benchmarkName).build();
|
||||||
logger.debug("Aborted benchmark [{}]", benchmarkName);
|
logger.debug("Aborted benchmark [{}] on [{}]", benchmarkName, nodeName());
|
||||||
return new AbortBenchmarkNodeResponse(benchmarkName, nodeName);
|
return new AbortBenchmarkNodeResponse(benchmarkName, nodeName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -99,6 +99,8 @@ public class BenchmarkExecutor {
|
||||||
BenchmarkState state = activeBenchmarks.get(id);
|
BenchmarkState state = activeBenchmarks.get(id);
|
||||||
response.addBenchResponse(state.response);
|
response.addBenchResponse(state.response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.debug("Reporting [{}] active benchmarks on [{}]", response.activeBenchmarks(), nodeName());
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,13 +117,9 @@ public class BenchmarkExecutor {
|
||||||
final Map<String, CompetitionResult> competitionResults = new HashMap<String, CompetitionResult>();
|
final Map<String, CompetitionResult> competitionResults = new HashMap<String, CompetitionResult>();
|
||||||
final BenchmarkResponse benchmarkResponse = new BenchmarkResponse(request.benchmarkName(), competitionResults);
|
final BenchmarkResponse benchmarkResponse = new BenchmarkResponse(request.benchmarkName(), competitionResults);
|
||||||
|
|
||||||
if (this.nodeName == null) {
|
|
||||||
this.nodeName = clusterService.localNode().name();
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (activeBenchmarks.containsKey(request.benchmarkName())) {
|
if (activeBenchmarks.containsKey(request.benchmarkName())) {
|
||||||
throw new ElasticsearchException("Benchmark with id [" + request.benchmarkName() + "] is already running");
|
throw new ElasticsearchException("Benchmark [" + request.benchmarkName() + "] is already running on [" + nodeName() + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fPut(
|
activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fPut(
|
||||||
|
@ -133,13 +131,14 @@ public class BenchmarkExecutor {
|
||||||
|
|
||||||
final BenchmarkSettings settings = competitor.settings();
|
final BenchmarkSettings settings = competitor.settings();
|
||||||
final int iterations = settings.iterations();
|
final int iterations = settings.iterations();
|
||||||
logger.debug("Executing [{}] iterations for benchmark [{}][{}] ", iterations, request.benchmarkName(), competitor.name());
|
logger.debug("Executing [iterations: {}] [multiplier: {}] for [{}] on [{}]",
|
||||||
|
iterations, settings.multiplier(), request.benchmarkName(), nodeName());
|
||||||
|
|
||||||
final List<CompetitionIteration> competitionIterations = new ArrayList<>(iterations);
|
final List<CompetitionIteration> competitionIterations = new ArrayList<>(iterations);
|
||||||
final CompetitionResult competitionResult =
|
final CompetitionResult competitionResult =
|
||||||
new CompetitionResult(competitor.name(), settings.concurrency(), settings.multiplier(), request.percentiles());
|
new CompetitionResult(competitor.name(), settings.concurrency(), settings.multiplier(), request.percentiles());
|
||||||
final CompetitionNodeResult competitionNodeResult =
|
final CompetitionNodeResult competitionNodeResult =
|
||||||
new CompetitionNodeResult(competitor.name(), nodeName, iterations, competitionIterations);
|
new CompetitionNodeResult(competitor.name(), nodeName(), iterations, competitionIterations);
|
||||||
|
|
||||||
competitionResult.addCompetitionNodeResult(competitionNodeResult);
|
competitionResult.addCompetitionNodeResult(competitionNodeResult);
|
||||||
benchmarkResponse.competitionResults.put(competitor.name(), competitionResult);
|
benchmarkResponse.competitionResults.put(competitor.name(), competitionResult);
|
||||||
|
@ -244,9 +243,6 @@ public class BenchmarkExecutor {
|
||||||
throw new BenchmarkExecutionException("Too many execution failures", errorMessages);
|
throw new BenchmarkExecutionException("Too many execution failures", errorMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert assertBuckets(timeBuckets); // make sure they are all set
|
|
||||||
assert assertBuckets(docBuckets); // make sure they are all set
|
|
||||||
|
|
||||||
final long totalTime = TimeUnit.MILLISECONDS.convert(afterRun - beforeRun, TimeUnit.NANOSECONDS);
|
final long totalTime = TimeUnit.MILLISECONDS.convert(afterRun - beforeRun, TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
CompetitionIterationData iterationData = new CompetitionIterationData(timeBuckets);
|
CompetitionIterationData iterationData = new CompetitionIterationData(timeBuckets);
|
||||||
|
@ -260,7 +256,6 @@ public class BenchmarkExecutor {
|
||||||
|
|
||||||
CompetitionIteration round =
|
CompetitionIteration round =
|
||||||
new CompetitionIteration(topN, totalTime, timeBuckets.length, sumDocs, iterationData);
|
new CompetitionIteration(topN, totalTime, timeBuckets.length, sumDocs, iterationData);
|
||||||
|
|
||||||
return round;
|
return round;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,6 +414,13 @@ public class BenchmarkExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String nodeName() {
|
||||||
|
if (nodeName == null) {
|
||||||
|
nodeName = clusterService.localNode().name();
|
||||||
|
}
|
||||||
|
return nodeName;
|
||||||
|
}
|
||||||
|
|
||||||
private final boolean assertBuckets(long[] buckets) {
|
private final boolean assertBuckets(long[] buckets) {
|
||||||
for (int i = 0; i < buckets.length; i++) {
|
for (int i = 0; i < buckets.length; i++) {
|
||||||
assert buckets[i] >= 0 : "Bucket value was negative: " + buckets[i] + " bucket id: " + i;
|
assert buckets[i] >= 0 : "Bucket value was negative: " + buckets[i] + " bucket id: " + i;
|
||||||
|
|
|
@ -19,14 +19,30 @@
|
||||||
package org.elasticsearch.action.bench;
|
package org.elasticsearch.action.bench;
|
||||||
|
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Benchmark module
|
* Benchmark module
|
||||||
*/
|
*/
|
||||||
public class BenchmarkModule extends AbstractModule {
|
public class BenchmarkModule extends AbstractModule {
|
||||||
|
|
||||||
|
private final Settings settings;
|
||||||
|
|
||||||
|
public static final String BENCHMARK_SERVICE_KEY = "benchmark.service.impl";
|
||||||
|
|
||||||
|
public BenchmarkModule(Settings settings) {
|
||||||
|
this.settings = settings;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
|
|
||||||
|
final Class<? extends BenchmarkService> service = settings.getAsClass(BENCHMARK_SERVICE_KEY, BenchmarkService.class);
|
||||||
|
|
||||||
|
if (!BenchmarkService.class.equals(service)) {
|
||||||
|
bind(BenchmarkService.class).to(service).asEagerSingleton();
|
||||||
|
} else {
|
||||||
bind(BenchmarkService.class).asEagerSingleton();
|
bind(BenchmarkService.class).asEagerSingleton();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final TransportService transportService;
|
private final TransportService transportService;
|
||||||
private final BenchmarkExecutor executor;
|
protected final BenchmarkExecutor executor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a service component for running benchmarks
|
* Constructs a service component for running benchmarks
|
||||||
|
@ -463,6 +463,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void sendResponse() {
|
protected void sendResponse() {
|
||||||
|
int activeBenchmarks = 0;
|
||||||
BenchmarkStatusResponse consolidatedResponse = new BenchmarkStatusResponse();
|
BenchmarkStatusResponse consolidatedResponse = new BenchmarkStatusResponse();
|
||||||
Map<String, List<BenchmarkResponse>> nameNodeResponseMap = new HashMap<>();
|
Map<String, List<BenchmarkResponse>> nameNodeResponseMap = new HashMap<>();
|
||||||
|
|
||||||
|
@ -476,6 +477,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
}
|
}
|
||||||
benchmarkResponses.add(benchmarkResponse);
|
benchmarkResponses.add(benchmarkResponse);
|
||||||
}
|
}
|
||||||
|
activeBenchmarks += nodeResponse.activeBenchmarks();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Map.Entry<String, List<BenchmarkResponse>> entry : nameNodeResponseMap.entrySet()) {
|
for (Map.Entry<String, List<BenchmarkResponse>> entry : nameNodeResponseMap.entrySet()) {
|
||||||
|
@ -483,6 +485,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
consolidatedResponse.addBenchResponse(consolidated);
|
consolidatedResponse.addBenchResponse(consolidated);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consolidatedResponse.totalActiveBenchmarks(activeBenchmarks);
|
||||||
listener.onResponse(consolidatedResponse);
|
listener.onResponse(consolidatedResponse);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ public class BenchmarkStatusNodeResponse extends ActionResponse implements Strea
|
||||||
private List<BenchmarkResponse> benchmarkResponses;
|
private List<BenchmarkResponse> benchmarkResponses;
|
||||||
|
|
||||||
public BenchmarkStatusNodeResponse() {
|
public BenchmarkStatusNodeResponse() {
|
||||||
benchmarkResponses = new ArrayList<BenchmarkResponse>();
|
benchmarkResponses = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void nodeName(String nodeName) {
|
public void nodeName(String nodeName) {
|
||||||
|
@ -59,6 +59,10 @@ public class BenchmarkStatusNodeResponse extends ActionResponse implements Strea
|
||||||
return benchmarkResponses;
|
return benchmarkResponses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int activeBenchmarks() {
|
||||||
|
return benchResponses().size();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.field("node", nodeName);
|
builder.field("node", nodeName);
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.ArrayList;
|
||||||
*/
|
*/
|
||||||
public class BenchmarkStatusResponse extends ActionResponse implements Streamable, ToXContent {
|
public class BenchmarkStatusResponse extends ActionResponse implements Streamable, ToXContent {
|
||||||
|
|
||||||
|
private int totalActiveBenchmarks = 0;
|
||||||
private final List<BenchmarkResponse> benchmarkResponses = new ArrayList<>();
|
private final List<BenchmarkResponse> benchmarkResponses = new ArrayList<>();
|
||||||
|
|
||||||
public BenchmarkStatusResponse() { }
|
public BenchmarkStatusResponse() { }
|
||||||
|
@ -47,6 +48,14 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl
|
||||||
return benchmarkResponses;
|
return benchmarkResponses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void totalActiveBenchmarks(int totalActiveBenchmarks) {
|
||||||
|
this.totalActiveBenchmarks = totalActiveBenchmarks;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int totalActiveBenchmarks() {
|
||||||
|
return totalActiveBenchmarks;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
|
||||||
|
@ -66,6 +75,7 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
|
totalActiveBenchmarks = in.readVInt();
|
||||||
int size = in.readVInt();
|
int size = in.readVInt();
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
BenchmarkResponse br = new BenchmarkResponse();
|
BenchmarkResponse br = new BenchmarkResponse();
|
||||||
|
@ -77,6 +87,7 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
|
out.writeVInt(totalActiveBenchmarks);
|
||||||
out.writeVInt(benchmarkResponses.size());
|
out.writeVInt(benchmarkResponses.size());
|
||||||
for (BenchmarkResponse br : benchmarkResponses) {
|
for (BenchmarkResponse br : benchmarkResponses) {
|
||||||
br.writeTo(out);
|
br.writeTo(out);
|
||||||
|
|
|
@ -184,7 +184,7 @@ public final class InternalNode implements Node {
|
||||||
modules.add(new ResourceWatcherModule());
|
modules.add(new ResourceWatcherModule());
|
||||||
modules.add(new RepositoriesModule());
|
modules.add(new RepositoriesModule());
|
||||||
modules.add(new TribeModule());
|
modules.add(new TribeModule());
|
||||||
modules.add(new BenchmarkModule());
|
modules.add(new BenchmarkModule(settings));
|
||||||
|
|
||||||
injector = modules.createInjector();
|
injector = modules.createInjector();
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.action.bench;
|
package org.elasticsearch.action.bench;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import com.google.common.base.Predicate;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionFuture;
|
||||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
|
@ -33,11 +33,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
@ -57,7 +54,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
private static final int MAX_DOC_COUNT = 1000;
|
private static final int MAX_DOC_COUNT = 1000;
|
||||||
|
|
||||||
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
|
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
|
||||||
private static final long TIMEOUT = 10;
|
private static final long TIMEOUT = 20;
|
||||||
|
|
||||||
private int numExecutorNodes = 0;
|
private int numExecutorNodes = 0;
|
||||||
private Map<String, BenchmarkSettings> competitionSettingsMap;
|
private Map<String, BenchmarkSettings> competitionSettingsMap;
|
||||||
|
@ -67,6 +64,15 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
return ImmutableSettings.builder().put("node.bench", true).build();
|
return ImmutableSettings.builder().put("node.bench", true).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final Predicate<Object> statusPredicate = new Predicate<Object>() {
|
||||||
|
@Override
|
||||||
|
public boolean apply(Object input) {
|
||||||
|
final BenchmarkStatusResponse status = client().prepareBenchStatus().execute().actionGet();
|
||||||
|
// We expect to have one active benchmark on each node
|
||||||
|
return (status.totalActiveBenchmarks() == numExecutorNodes);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeBenchmarkIntegrationTests() throws Exception {
|
public void beforeBenchmarkIntegrationTests() throws Exception {
|
||||||
numExecutorNodes = cluster().size();
|
numExecutorNodes = cluster().size();
|
||||||
|
@ -76,7 +82,6 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/6094")
|
|
||||||
public void testSubmitBenchmark() throws Exception {
|
public void testSubmitBenchmark() throws Exception {
|
||||||
|
|
||||||
final BenchmarkRequest request =
|
final BenchmarkRequest request =
|
||||||
|
@ -101,33 +106,20 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
public void testListBenchmarks() throws Exception {
|
public void testListBenchmarks() throws Exception {
|
||||||
|
|
||||||
final BenchmarkRequest request =
|
final BenchmarkRequest request =
|
||||||
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap);
|
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap,
|
||||||
|
BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL);
|
||||||
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
|
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
|
||||||
request.settings().iterations());
|
request.settings().iterations());
|
||||||
|
|
||||||
final CountDownLatch countdown = new CountDownLatch(1);
|
client().bench(request);
|
||||||
final List<Throwable> throwables = new ArrayList<>();
|
|
||||||
|
|
||||||
client().bench(request, new ActionListener<BenchmarkResponse>() {
|
final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT);
|
||||||
@Override
|
assertTrue(ret);
|
||||||
public void onResponse(BenchmarkResponse benchmarkResponse) {
|
|
||||||
countdown.countDown();
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable e) {
|
|
||||||
throwables.add(e);
|
|
||||||
countdown.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not
|
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
|
||||||
// to finish so that we can 'capture' an in-progress state
|
assertThat(statusResponse.benchmarkResponses().size(), greaterThanOrEqualTo(0));
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
final BenchmarkStatusResponse response = client().prepareBenchStatus().execute().actionGet();
|
for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) {
|
||||||
assertThat(response.benchmarkResponses().size(), greaterThanOrEqualTo(0));
|
|
||||||
|
|
||||||
for (BenchmarkResponse benchmarkResponse : response.benchmarkResponses()) {
|
|
||||||
|
|
||||||
assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
|
assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
|
||||||
assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING));
|
assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING));
|
||||||
|
@ -138,74 +130,41 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false);
|
validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster
|
|
||||||
if (!countdown.await(TIMEOUT, TIME_UNIT)) {
|
|
||||||
logger.warn("Timeout waiting to for benchmark to complete");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (throwables.size() > 0) {
|
|
||||||
for (Throwable t : throwables) {
|
|
||||||
logger.error(t.getMessage(), t);
|
|
||||||
}
|
|
||||||
fail("Failed to execute benchmark");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/6094")
|
|
||||||
public void testAbortBenchmark() throws Exception {
|
public void testAbortBenchmark() throws Exception {
|
||||||
|
|
||||||
final BenchmarkRequest request =
|
final BenchmarkRequest request =
|
||||||
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap);
|
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap,
|
||||||
|
BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL);
|
||||||
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
|
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
|
||||||
request.settings().iterations());
|
request.settings().iterations());
|
||||||
|
|
||||||
final CountDownLatch countdown = new CountDownLatch(1);
|
final ActionFuture<BenchmarkResponse> benchmarkResponse = client().bench(request);
|
||||||
final List<Throwable> throwables = new ArrayList<>();
|
|
||||||
|
|
||||||
client().bench(request, new ActionListener<BenchmarkResponse>() {
|
final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT);
|
||||||
@Override
|
assertTrue(ret);
|
||||||
public void onResponse(BenchmarkResponse benchmarkResponse) {
|
|
||||||
countdown.countDown();
|
|
||||||
assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.ABORTED));
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable e) {
|
|
||||||
throwables.add(e);
|
|
||||||
countdown.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not
|
final AbortBenchmarkResponse abortResponse =
|
||||||
// to finish so that we can successfully execute an abort operation on it.
|
client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet();
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
final AbortBenchmarkResponse response = client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet();
|
// Confirm that the benchmark was actually aborted and did not finish on its own
|
||||||
assertThat(response.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes));
|
assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes));
|
||||||
assertThat(response.getBenchmarkName(), equalTo(BENCHMARK_NAME));
|
assertThat(abortResponse.getBenchmarkName(), equalTo(BENCHMARK_NAME));
|
||||||
|
|
||||||
for (AbortBenchmarkNodeResponse nodeResponse : response.getNodeResponses()) {
|
for (AbortBenchmarkNodeResponse nodeResponse : abortResponse.getNodeResponses()) {
|
||||||
assertThat(nodeResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
|
assertThat(nodeResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
|
||||||
assertThat(nodeResponse.errorMessage(), nullValue());
|
assertThat(nodeResponse.errorMessage(), nullValue());
|
||||||
assertThat(nodeResponse.nodeName(), notNullValue());
|
assertThat(nodeResponse.nodeName(), notNullValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a list request to make sure there are no active benchmarks
|
// Confirm that there are no active benchmarks in the cluster
|
||||||
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
|
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
|
||||||
assertThat(statusResponse.benchmarkResponses().size(), equalTo(0));
|
assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0));
|
||||||
|
|
||||||
// Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster
|
// Confirm that benchmark was indeed aborted
|
||||||
if (!countdown.await(TIMEOUT, TIME_UNIT)) {
|
assertThat(benchmarkResponse.get().state(), equalTo(BenchmarkResponse.State.ABORTED));
|
||||||
logger.warn("Timeout waiting to for benchmark to complete");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (throwables.size() > 0) {
|
|
||||||
for (Throwable t : throwables) {
|
|
||||||
logger.error(t.getMessage(), t);
|
|
||||||
}
|
|
||||||
fail("Failed to execute benchmark");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = BenchmarkMissingException.class)
|
@Test(expected = BenchmarkMissingException.class)
|
||||||
|
@ -308,6 +267,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
|
||||||
indexRandom(true, docs);
|
indexRandom(true, docs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flushAndRefresh();
|
||||||
return indices;
|
return indices;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.between;
|
import static org.elasticsearch.test.ElasticsearchIntegrationTest.between;
|
||||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomFrom;
|
import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomFrom;
|
||||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomBoolean;
|
import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomBoolean;
|
||||||
|
@ -43,6 +42,8 @@ public class BenchmarkTestUtil {
|
||||||
public static final int MAX_MULTIPLIER = 500;
|
public static final int MAX_MULTIPLIER = 500;
|
||||||
public static final int MIN_SMALL_INTERVAL = 1;
|
public static final int MIN_SMALL_INTERVAL = 1;
|
||||||
public static final int MAX_SMALL_INTERVAL = 3;
|
public static final int MAX_SMALL_INTERVAL = 3;
|
||||||
|
public static final int MIN_LARGE_INTERVAL = 11;
|
||||||
|
public static final int MAX_LARGE_INTERVAL = 19;
|
||||||
|
|
||||||
public static final String BENCHMARK_NAME = "test_benchmark";
|
public static final String BENCHMARK_NAME = "test_benchmark";
|
||||||
public static final String COMPETITOR_PREFIX = "competitor_";
|
public static final String COMPETITOR_PREFIX = "competitor_";
|
||||||
|
@ -111,10 +112,11 @@ public class BenchmarkTestUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes,
|
public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes,
|
||||||
Map<String, BenchmarkSettings> competitionSettingsMap) {
|
Map<String, BenchmarkSettings> competitionSettingsMap,
|
||||||
|
int lowRandomIntervalBound, int highRandomIntervalBound) {
|
||||||
|
|
||||||
final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices);
|
final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices);
|
||||||
final BenchmarkSettings settings = randomSettings();
|
final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound);
|
||||||
|
|
||||||
builder.setIterations(settings.iterations());
|
builder.setIterations(settings.iterations());
|
||||||
builder.setConcurrency(settings.concurrency());
|
builder.setConcurrency(settings.concurrency());
|
||||||
|
@ -123,9 +125,10 @@ public class BenchmarkTestUtil {
|
||||||
builder.setWarmup(settings.warmup());
|
builder.setWarmup(settings.warmup());
|
||||||
builder.setNumExecutorNodes(numExecutorNodes);
|
builder.setNumExecutorNodes(numExecutorNodes);
|
||||||
|
|
||||||
final int numCompetitors = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL);
|
final int numCompetitors = between(lowRandomIntervalBound, highRandomIntervalBound);
|
||||||
for (int i = 0; i < numCompetitors; i++) {
|
for (int i = 0; i < numCompetitors; i++) {
|
||||||
builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices, competitionSettingsMap));
|
builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices,
|
||||||
|
competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound));
|
||||||
}
|
}
|
||||||
|
|
||||||
final BenchmarkRequest request = builder.request();
|
final BenchmarkRequest request = builder.request();
|
||||||
|
@ -136,6 +139,13 @@ public class BenchmarkTestUtil {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes,
|
||||||
|
Map<String, BenchmarkSettings> competitionSettingsMap) {
|
||||||
|
|
||||||
|
return randomRequest(client, indices, numExecutorNodes,
|
||||||
|
competitionSettingsMap, MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL);
|
||||||
|
}
|
||||||
|
|
||||||
public static SearchRequest randomSearch(Client client, String[] indices) {
|
public static SearchRequest randomSearch(Client client, String[] indices) {
|
||||||
|
|
||||||
final SearchRequestBuilder builder = new SearchRequestBuilder(client);
|
final SearchRequestBuilder builder = new SearchRequestBuilder(client);
|
||||||
|
@ -146,10 +156,11 @@ public class BenchmarkTestUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices,
|
public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices,
|
||||||
Map<String, BenchmarkSettings> competitionSettingsMap) {
|
Map<String, BenchmarkSettings> competitionSettingsMap,
|
||||||
|
int lowRandomIntervalBound, int highRandomIntervalBound) {
|
||||||
|
|
||||||
final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder();
|
final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder();
|
||||||
final BenchmarkSettings settings = randomSettings();
|
final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound);
|
||||||
|
|
||||||
builder.setClearCachesSettings(randomCacheSettings());
|
builder.setClearCachesSettings(randomCacheSettings());
|
||||||
builder.setIterations(settings.iterations());
|
builder.setIterations(settings.iterations());
|
||||||
|
@ -159,7 +170,7 @@ public class BenchmarkTestUtil {
|
||||||
builder.setWarmup(settings.warmup());
|
builder.setWarmup(settings.warmup());
|
||||||
builder.setName(name);
|
builder.setName(name);
|
||||||
|
|
||||||
final int numSearches = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL);
|
final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound);
|
||||||
for (int i = 0; i < numSearches; i++) {
|
for (int i = 0; i < numSearches; i++) {
|
||||||
final SearchRequest searchRequest = randomSearch(client, indices);
|
final SearchRequest searchRequest = randomSearch(client, indices);
|
||||||
builder.addSearchRequest(searchRequest);
|
builder.addSearchRequest(searchRequest);
|
||||||
|
@ -194,12 +205,12 @@ public class BenchmarkTestUtil {
|
||||||
return settings;
|
return settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BenchmarkSettings randomSettings() {
|
public static BenchmarkSettings randomSettings(int lowRandomIntervalBound, int highRandomIntervalBound) {
|
||||||
|
|
||||||
final BenchmarkSettings settings = new BenchmarkSettings();
|
final BenchmarkSettings settings = new BenchmarkSettings();
|
||||||
|
|
||||||
settings.concurrency(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true);
|
settings.concurrency(between(lowRandomIntervalBound, highRandomIntervalBound), true);
|
||||||
settings.iterations(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true);
|
settings.iterations(between(lowRandomIntervalBound, highRandomIntervalBound), true);
|
||||||
settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true);
|
settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true);
|
||||||
settings.warmup(randomBoolean(), true);
|
settings.warmup(randomBoolean(), true);
|
||||||
settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true);
|
settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true);
|
||||||
|
|
Loading…
Reference in New Issue