diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java index 24ecbd1b3ec..fe56650d814 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkExecutor.java @@ -76,12 +76,12 @@ public class BenchmarkExecutor { BenchmarkState state = activeBenchmarks.get(benchmarkName); if (state == null) { - throw new ElasticsearchException("Benchmark [" + benchmarkName + "] not found"); + throw new BenchmarkMissingException("Benchmark [" + benchmarkName + "] not found on [" + nodeName() + "]"); } state.semaphore.stop(); activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fRemove(benchmarkName).build(); - logger.debug("Aborted benchmark [{}]", benchmarkName); - return new AbortBenchmarkNodeResponse(benchmarkName, nodeName); + logger.debug("Aborted benchmark [{}] on [{}]", benchmarkName, nodeName()); + return new AbortBenchmarkNodeResponse(benchmarkName, nodeName()); } /** @@ -99,6 +99,8 @@ public class BenchmarkExecutor { BenchmarkState state = activeBenchmarks.get(id); response.addBenchResponse(state.response); } + + logger.debug("Reporting [{}] active benchmarks on [{}]", response.activeBenchmarks(), nodeName()); return response; } @@ -115,13 +117,9 @@ public class BenchmarkExecutor { final Map competitionResults = new HashMap(); final BenchmarkResponse benchmarkResponse = new BenchmarkResponse(request.benchmarkName(), competitionResults); - if (this.nodeName == null) { - this.nodeName = clusterService.localNode().name(); - } - synchronized (lock) { if (activeBenchmarks.containsKey(request.benchmarkName())) { - throw new ElasticsearchException("Benchmark with id [" + request.benchmarkName() + "] is already running"); + throw new ElasticsearchException("Benchmark [" + request.benchmarkName() + "] is already running on [" + nodeName() + "]"); } activeBenchmarks = ImmutableOpenMap.builder(activeBenchmarks).fPut( @@ -133,13 +131,14 @@ public class BenchmarkExecutor { final BenchmarkSettings settings = competitor.settings(); final int iterations = settings.iterations(); - logger.debug("Executing [{}] iterations for benchmark [{}][{}] ", iterations, request.benchmarkName(), competitor.name()); + logger.debug("Executing [iterations: {}] [multiplier: {}] for [{}] on [{}]", + iterations, settings.multiplier(), request.benchmarkName(), nodeName()); final List competitionIterations = new ArrayList<>(iterations); final CompetitionResult competitionResult = new CompetitionResult(competitor.name(), settings.concurrency(), settings.multiplier(), request.percentiles()); final CompetitionNodeResult competitionNodeResult = - new CompetitionNodeResult(competitor.name(), nodeName, iterations, competitionIterations); + new CompetitionNodeResult(competitor.name(), nodeName(), iterations, competitionIterations); competitionResult.addCompetitionNodeResult(competitionNodeResult); benchmarkResponse.competitionResults.put(competitor.name(), competitionResult); @@ -244,9 +243,6 @@ public class BenchmarkExecutor { throw new BenchmarkExecutionException("Too many execution failures", errorMessages); } - assert assertBuckets(timeBuckets); // make sure they are all set - assert assertBuckets(docBuckets); // make sure they are all set - final long totalTime = TimeUnit.MILLISECONDS.convert(afterRun - beforeRun, TimeUnit.NANOSECONDS); CompetitionIterationData iterationData = new CompetitionIterationData(timeBuckets); @@ -260,7 +256,6 @@ public class BenchmarkExecutor { CompetitionIteration round = new CompetitionIteration(topN, totalTime, timeBuckets.length, sumDocs, iterationData); - return round; } @@ -419,6 +414,13 @@ public class BenchmarkExecutor { } } + private String nodeName() { + if (nodeName == null) { + nodeName = clusterService.localNode().name(); + } + return nodeName; + } + private final boolean assertBuckets(long[] buckets) { for (int i = 0; i < buckets.length; i++) { assert buckets[i] >= 0 : "Bucket value was negative: " + buckets[i] + " bucket id: " + i; diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkModule.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkModule.java index f48783a3400..667c8e2116c 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkModule.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkModule.java @@ -19,14 +19,30 @@ package org.elasticsearch.action.bench; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.settings.Settings; /** * Benchmark module */ public class BenchmarkModule extends AbstractModule { + private final Settings settings; + + public static final String BENCHMARK_SERVICE_KEY = "benchmark.service.impl"; + + public BenchmarkModule(Settings settings) { + this.settings = settings; + } + @Override protected void configure() { - bind(BenchmarkService.class).asEagerSingleton(); + + final Class service = settings.getAsClass(BENCHMARK_SERVICE_KEY, BenchmarkService.class); + + if (!BenchmarkService.class.equals(service)) { + bind(BenchmarkService.class).to(service).asEagerSingleton(); + } else { + bind(BenchmarkService.class).asEagerSingleton(); + } } } diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java index 1decf54898c..0ce7de26554 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java @@ -56,7 +56,7 @@ public class BenchmarkService extends AbstractLifecycleComponent> nameNodeResponseMap = new HashMap<>(); @@ -476,6 +477,7 @@ public class BenchmarkService extends AbstractLifecycleComponent> entry : nameNodeResponseMap.entrySet()) { @@ -483,6 +485,7 @@ public class BenchmarkService extends AbstractLifecycleComponent benchmarkResponses; public BenchmarkStatusNodeResponse() { - benchmarkResponses = new ArrayList(); + benchmarkResponses = new ArrayList<>(); } public void nodeName(String nodeName) { @@ -59,6 +59,10 @@ public class BenchmarkStatusNodeResponse extends ActionResponse implements Strea return benchmarkResponses; } + public int activeBenchmarks() { + return benchResponses().size(); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("node", nodeName); diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkStatusResponse.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkStatusResponse.java index bfb60134a1d..eeb19c0d79c 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkStatusResponse.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkStatusResponse.java @@ -35,6 +35,7 @@ import java.util.ArrayList; */ public class BenchmarkStatusResponse extends ActionResponse implements Streamable, ToXContent { + private int totalActiveBenchmarks = 0; private final List benchmarkResponses = new ArrayList<>(); public BenchmarkStatusResponse() { } @@ -47,6 +48,14 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl return benchmarkResponses; } + public void totalActiveBenchmarks(int totalActiveBenchmarks) { + this.totalActiveBenchmarks = totalActiveBenchmarks; + } + + public int totalActiveBenchmarks() { + return totalActiveBenchmarks; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -66,6 +75,7 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + totalActiveBenchmarks = in.readVInt(); int size = in.readVInt(); for (int i = 0; i < size; i++) { BenchmarkResponse br = new BenchmarkResponse(); @@ -77,6 +87,7 @@ public class BenchmarkStatusResponse extends ActionResponse implements Streamabl @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeVInt(totalActiveBenchmarks); out.writeVInt(benchmarkResponses.size()); for (BenchmarkResponse br : benchmarkResponses) { br.writeTo(out); diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 5e501407ed2..1d5a6c1b988 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -184,7 +184,7 @@ public final class InternalNode implements Node { modules.add(new ResourceWatcherModule()); modules.add(new RepositoriesModule()); modules.add(new TribeModule()); - modules.add(new BenchmarkModule()); + modules.add(new BenchmarkModule(settings)); injector = modules.createInjector(); diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java index 6e46af71a3b..94003da1e9c 100644 --- a/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkIntegrationTest.java @@ -18,8 +18,8 @@ */ package org.elasticsearch.action.bench; -import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.action.ActionListener; +import com.google.common.base.Predicate; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.Strings; import org.elasticsearch.test.ElasticsearchIntegrationTest; @@ -33,11 +33,8 @@ import org.junit.Test; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.*; @@ -57,7 +54,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { private static final int MAX_DOC_COUNT = 1000; private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - private static final long TIMEOUT = 10; + private static final long TIMEOUT = 20; private int numExecutorNodes = 0; private Map competitionSettingsMap; @@ -67,6 +64,15 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { return ImmutableSettings.builder().put("node.bench", true).build(); } + private final Predicate statusPredicate = new Predicate() { + @Override + public boolean apply(Object input) { + final BenchmarkStatusResponse status = client().prepareBenchStatus().execute().actionGet(); + // We expect to have one active benchmark on each node + return (status.totalActiveBenchmarks() == numExecutorNodes); + } + }; + @Before public void beforeBenchmarkIntegrationTests() throws Exception { numExecutorNodes = cluster().size(); @@ -76,7 +82,6 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { } @Test - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/6094") public void testSubmitBenchmark() throws Exception { final BenchmarkRequest request = @@ -101,33 +106,20 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { public void testListBenchmarks() throws Exception { final BenchmarkRequest request = - BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap); + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, + BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL); logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), request.settings().iterations()); - final CountDownLatch countdown = new CountDownLatch(1); - final List throwables = new ArrayList<>(); + client().bench(request); - client().bench(request, new ActionListener() { - @Override - public void onResponse(BenchmarkResponse benchmarkResponse) { - countdown.countDown(); - } - @Override - public void onFailure(Throwable e) { - throwables.add(e); - countdown.countDown(); - } - }); + final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT); + assertTrue(ret); - // Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not - // to finish so that we can 'capture' an in-progress state - Thread.sleep(1000); + final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); + assertThat(statusResponse.benchmarkResponses().size(), greaterThanOrEqualTo(0)); - final BenchmarkStatusResponse response = client().prepareBenchStatus().execute().actionGet(); - assertThat(response.benchmarkResponses().size(), greaterThanOrEqualTo(0)); - - for (BenchmarkResponse benchmarkResponse : response.benchmarkResponses()) { + for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) { assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.RUNNING)); @@ -138,74 +130,41 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false); } } - - // Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster - if (!countdown.await(TIMEOUT, TIME_UNIT)) { - logger.warn("Timeout waiting to for benchmark to complete"); - } - - if (throwables.size() > 0) { - for (Throwable t : throwables) { - logger.error(t.getMessage(), t); - } - fail("Failed to execute benchmark"); - } } @Test - @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/6094") public void testAbortBenchmark() throws Exception { final BenchmarkRequest request = - BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap); + BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, + BenchmarkTestUtil.MIN_LARGE_INTERVAL, BenchmarkTestUtil.MAX_LARGE_INTERVAL); logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(), request.settings().iterations()); - final CountDownLatch countdown = new CountDownLatch(1); - final List throwables = new ArrayList<>(); + final ActionFuture benchmarkResponse = client().bench(request); - client().bench(request, new ActionListener() { - @Override - public void onResponse(BenchmarkResponse benchmarkResponse) { - countdown.countDown(); - assertThat(benchmarkResponse.state(), equalTo(BenchmarkResponse.State.ABORTED)); - } - @Override - public void onFailure(Throwable e) { - throwables.add(e); - countdown.countDown(); - } - }); + final boolean ret = awaitBusy(statusPredicate, TIMEOUT, TIME_UNIT); + assertTrue(ret); - // Attempt to 'yield' to the thread executing the benchmark; we want it to start executing, but not - // to finish so that we can successfully execute an abort operation on it. - Thread.sleep(1000); + final AbortBenchmarkResponse abortResponse = + client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); - final AbortBenchmarkResponse response = client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); - assertThat(response.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes)); - assertThat(response.getBenchmarkName(), equalTo(BENCHMARK_NAME)); + // Confirm that the benchmark was actually aborted and did not finish on its own + assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes)); + assertThat(abortResponse.getBenchmarkName(), equalTo(BENCHMARK_NAME)); - for (AbortBenchmarkNodeResponse nodeResponse : response.getNodeResponses()) { + for (AbortBenchmarkNodeResponse nodeResponse : abortResponse.getNodeResponses()) { assertThat(nodeResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); assertThat(nodeResponse.errorMessage(), nullValue()); assertThat(nodeResponse.nodeName(), notNullValue()); } - // Send a list request to make sure there are no active benchmarks + // Confirm that there are no active benchmarks in the cluster final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); - assertThat(statusResponse.benchmarkResponses().size(), equalTo(0)); + assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0)); - // Wait for active benchmark to finish; not fatal if we timeout as the framework will tear down the cluster - if (!countdown.await(TIMEOUT, TIME_UNIT)) { - logger.warn("Timeout waiting to for benchmark to complete"); - } - - if (throwables.size() > 0) { - for (Throwable t : throwables) { - logger.error(t.getMessage(), t); - } - fail("Failed to execute benchmark"); - } + // Confirm that benchmark was indeed aborted + assertThat(benchmarkResponse.get().state(), equalTo(BenchmarkResponse.State.ABORTED)); } @Test(expected = BenchmarkMissingException.class) @@ -308,6 +267,7 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest { indexRandom(true, docs); } + flushAndRefresh(); return indices; } } diff --git a/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java index a4b0f0c9194..ed77d6fbf76 100644 --- a/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java +++ b/src/test/java/org/elasticsearch/action/bench/BenchmarkTestUtil.java @@ -28,7 +28,6 @@ import org.elasticsearch.index.query.QueryBuilders; import java.util.Map; -import static org.junit.Assert.fail; import static org.elasticsearch.test.ElasticsearchIntegrationTest.between; import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomFrom; import static org.elasticsearch.test.ElasticsearchIntegrationTest.randomBoolean; @@ -43,6 +42,8 @@ public class BenchmarkTestUtil { public static final int MAX_MULTIPLIER = 500; public static final int MIN_SMALL_INTERVAL = 1; public static final int MAX_SMALL_INTERVAL = 3; + public static final int MIN_LARGE_INTERVAL = 11; + public static final int MAX_LARGE_INTERVAL = 19; public static final String BENCHMARK_NAME = "test_benchmark"; public static final String COMPETITOR_PREFIX = "competitor_"; @@ -111,10 +112,11 @@ public class BenchmarkTestUtil { } public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes, - Map competitionSettingsMap) { + Map competitionSettingsMap, + int lowRandomIntervalBound, int highRandomIntervalBound) { final BenchmarkRequestBuilder builder = new BenchmarkRequestBuilder(client, indices); - final BenchmarkSettings settings = randomSettings(); + final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound); builder.setIterations(settings.iterations()); builder.setConcurrency(settings.concurrency()); @@ -123,9 +125,10 @@ public class BenchmarkTestUtil { builder.setWarmup(settings.warmup()); builder.setNumExecutorNodes(numExecutorNodes); - final int numCompetitors = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + final int numCompetitors = between(lowRandomIntervalBound, highRandomIntervalBound); for (int i = 0; i < numCompetitors; i++) { - builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices, competitionSettingsMap)); + builder.addCompetitor(randomCompetitor(client, COMPETITOR_PREFIX + i, indices, + competitionSettingsMap, lowRandomIntervalBound, highRandomIntervalBound)); } final BenchmarkRequest request = builder.request(); @@ -136,6 +139,13 @@ public class BenchmarkTestUtil { return request; } + public static BenchmarkRequest randomRequest(Client client, String[] indices, int numExecutorNodes, + Map competitionSettingsMap) { + + return randomRequest(client, indices, numExecutorNodes, + competitionSettingsMap, MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + } + public static SearchRequest randomSearch(Client client, String[] indices) { final SearchRequestBuilder builder = new SearchRequestBuilder(client); @@ -146,10 +156,11 @@ public class BenchmarkTestUtil { } public static BenchmarkCompetitor randomCompetitor(Client client, String name, String[] indices, - Map competitionSettingsMap) { + Map competitionSettingsMap, + int lowRandomIntervalBound, int highRandomIntervalBound) { final BenchmarkCompetitorBuilder builder = new BenchmarkCompetitorBuilder(); - final BenchmarkSettings settings = randomSettings(); + final BenchmarkSettings settings = randomSettings(lowRandomIntervalBound, highRandomIntervalBound); builder.setClearCachesSettings(randomCacheSettings()); builder.setIterations(settings.iterations()); @@ -159,7 +170,7 @@ public class BenchmarkTestUtil { builder.setWarmup(settings.warmup()); builder.setName(name); - final int numSearches = between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL); + final int numSearches = between(lowRandomIntervalBound, highRandomIntervalBound); for (int i = 0; i < numSearches; i++) { final SearchRequest searchRequest = randomSearch(client, indices); builder.addSearchRequest(searchRequest); @@ -194,12 +205,12 @@ public class BenchmarkTestUtil { return settings; } - public static BenchmarkSettings randomSettings() { + public static BenchmarkSettings randomSettings(int lowRandomIntervalBound, int highRandomIntervalBound) { final BenchmarkSettings settings = new BenchmarkSettings(); - settings.concurrency(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true); - settings.iterations(between(MIN_SMALL_INTERVAL, MAX_SMALL_INTERVAL), true); + settings.concurrency(between(lowRandomIntervalBound, highRandomIntervalBound), true); + settings.iterations(between(lowRandomIntervalBound, highRandomIntervalBound), true); settings.multiplier(between(MIN_MULTIPLIER, MAX_MULTIPLIER), true); settings.warmup(randomBoolean(), true); settings.searchType(searchTypes[between(0, searchTypes.length - 1)], true);