[TEST] Ensure all benchmarks are aborted on failure and latches are counted down

This commit is contained in:
Simon Willnauer 2014-05-14 16:40:34 +02:00
parent 8f0991c14f
commit 2c1c5c163f
2 changed files with 62 additions and 49 deletions

View File

@ -28,12 +28,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder; import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.searchRequest; import static org.elasticsearch.client.Requests.searchRequest;
import static org.elasticsearch.index.query.QueryBuilders.functionScoreQuery; import static org.elasticsearch.index.query.QueryBuilders.functionScoreQuery;
@ -74,6 +74,12 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
} }
} }
@After
public void afterBenchmarkIntegrationTests() throws Exception {
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
assertThat("Some benchmarks are still running", statusResponse.benchmarkResponses(), is(empty()));
}
@Before @Before
public void beforeBenchmarkIntegrationTests() throws Exception { public void beforeBenchmarkIntegrationTests() throws Exception {
waitForTestLatch = null; waitForTestLatch = null;
@ -119,11 +125,10 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
request.settings().iterations()); request.settings().iterations());
final ActionFuture<BenchmarkResponse> future = client().bench(request); final ActionFuture<BenchmarkResponse> future = client().bench(request);
try {
assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS)); waitForQuery.await();
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
waitForTestLatch.countDown(); waitForTestLatch.countDown();
assertThat(statusResponse.benchmarkResponses().size(), greaterThan(0)); assertThat(statusResponse.benchmarkResponses().size(), greaterThan(0));
for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) { for (BenchmarkResponse benchmarkResponse : statusResponse.benchmarkResponses()) {
assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME)); assertThat(benchmarkResponse.benchmarkName(), equalTo(BENCHMARK_NAME));
@ -135,12 +140,19 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false); validateCompetitionResult(result, competitionSettingsMap.get(result.competitionName()), false);
} }
} }
} finally {
if (waitForTestLatch.getCount() == 1) {
waitForTestLatch.countDown();
}
client().prepareAbortBench(BENCHMARK_NAME).get(); client().prepareAbortBench(BENCHMARK_NAME).get();
// Confirm that there are no active benchmarks in the cluster // Confirm that there are no active benchmarks in the cluster
assertThat(client().prepareBenchStatus().execute().actionGet().totalActiveBenchmarks(), equalTo(0)); assertThat(client().prepareBenchStatus().execute().actionGet().totalActiveBenchmarks(), equalTo(0));
assertThat(waitForTestLatch.getCount(), is(0l));
}
// Confirm that benchmark was indeed aborted // Confirm that benchmark was indeed aborted
assertThat(future.get().state(), isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE)); assertThat(future.get().state(), isOneOf(BenchmarkResponse.State.ABORTED, BenchmarkResponse.State.COMPLETE));
} }
public static CountDownLatch waitForTestLatch; public static CountDownLatch waitForTestLatch;
@ -209,7 +221,6 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
} }
} }
assertThat(response.benchmarkName(), equalTo(BENCHMARK_NAME)); assertThat(response.benchmarkName(), equalTo(BENCHMARK_NAME));
} }
@Test @Test
@ -218,12 +229,15 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
final BenchmarkRequest request = final BenchmarkRequest request =
BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest); BenchmarkTestUtil.randomRequest(client(), indices, numExecutorNodes, competitionSettingsMap, searchRequest);
request.settings().iterations(Integer.MAX_VALUE, true); // massive amount of iterations request.settings().iterations(Integer.MAX_VALUE, true); // massive amount of iterations
logger.info("--> Submitting benchmark - competitors [{}] iterations [{}] [{}]", request.competitors().size(), logger.info("--> Submitting benchmark - competitors [{}] iterations [{}]", request.competitors().size(),
request.settings().iterations(), request); request.settings().iterations());
boolean aborted = false;
final ActionFuture<BenchmarkResponse> benchmarkResponse = client().bench(request); final ActionFuture<BenchmarkResponse> benchmarkResponse = client().bench(request);
assertTrue(waitForQuery.await(5l, TimeUnit.SECONDS)); try {
waitForQuery.await();
final AbortBenchmarkResponse abortResponse = final AbortBenchmarkResponse abortResponse =
client().prepareAbortBench(BENCHMARK_NAME).execute().actionGet(); client().prepareAbortBench(BENCHMARK_NAME).get();
aborted = true;
waitForTestLatch.countDown(); waitForTestLatch.countDown();
// Confirm that the benchmark was actually aborted and did not finish on its own // Confirm that the benchmark was actually aborted and did not finish on its own
assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes)); assertThat(abortResponse.getNodeResponses().size(), lessThanOrEqualTo(numExecutorNodes));
@ -234,13 +248,22 @@ public class BenchmarkIntegrationTest extends ElasticsearchIntegrationTest {
assertThat(nodeResponse.errorMessage(), nullValue()); assertThat(nodeResponse.errorMessage(), nullValue());
assertThat(nodeResponse.nodeName(), notNullValue()); assertThat(nodeResponse.nodeName(), notNullValue());
} }
// Confirm that there are no active benchmarks in the cluster // Confirm that there are no active benchmarks in the cluster
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet(); final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0)); assertThat(statusResponse.totalActiveBenchmarks(), equalTo(0));
// Confirm that benchmark was indeed aborted // Confirm that benchmark was indeed aborted
assertThat(benchmarkResponse.get().state(), is(BenchmarkResponse.State.ABORTED)); assertThat(benchmarkResponse.get().state(), is(BenchmarkResponse.State.ABORTED));
} finally {
if (waitForTestLatch.getCount() == 1) {
waitForTestLatch.countDown();
}
if (!aborted) {
client().prepareAbortBench(BENCHMARK_NAME).get();
}
assertThat(waitForTestLatch.getCount(), is(0l));
}
} }
@Test(expected = BenchmarkMissingException.class) @Test(expected = BenchmarkMissingException.class)

View File

@ -24,8 +24,6 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.bench.BenchmarkNodeMissingException;
import org.elasticsearch.action.bench.BenchmarkStatusResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -38,9 +36,7 @@ import java.net.InetSocketAddress;
import java.util.Random; import java.util.Random;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
/** /**
@ -81,12 +77,6 @@ public abstract class ImmutableTestCluster implements Iterable<Client> {
assertAllSearchersClosed(); assertAllSearchersClosed();
assertAllFilesClosed(); assertAllFilesClosed();
ensureEstimatedStats(); ensureEstimatedStats();
try {
final BenchmarkStatusResponse statusResponse = client().prepareBenchStatus().execute().actionGet();
assertThat(statusResponse.benchmarkResponses(), is(empty()));
} catch (BenchmarkNodeMissingException ex) {
// that's fine
}
} }
/** /**