Bug fix for hung clients on cluster without benchmark nodes

This is a fix for a bug whereby a cluster that has no nodes started with
-Des.node.bench=true will cause clients to hang if they attempt to
submit a benchmark.

Also adds REST tests to validate fix

Closes #5754
This commit is contained in:
Andrew Selden 2014-04-09 17:19:52 -07:00
parent 2f8fc98012
commit 1f7f72135a
7 changed files with 180 additions and 71 deletions

View File

@ -0,0 +1,8 @@
---
"Test benchmark abort":
- do:
benchmark.abort:
name: my_benchmark
catch: request

View File

@ -0,0 +1,7 @@
---
"Test benchmark list":
- do:
benchmark.list: {}
catch: request

View File

@ -0,0 +1,32 @@
---
"Test benchmark submit":
- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_replicas: 0
- do:
cluster.health:
wait_for_status: yellow
- do:
benchmark.submit:
index: test_1
body: {
"name": "my_benchmark",
"competitors": [ {
"name": "my_competitor",
"requests": [ {
"query": {
"match": { "_all": "*" }
}
} ]
} ]
}
catch: request

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bench;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
public class BenchmarkNodeMissingException extends ElasticsearchException {
public BenchmarkNodeMissingException(String msg) {
super(msg);
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -55,7 +55,10 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX
}
/**
* Benchmarks can be in one of: RUNNING, COMPLETE, or ABORTED.
* Benchmarks can be in one of:
* RUNNING - executing normally
* COMPLETE - completed normally
* ABORTED - aborted
*/
public static enum State {
RUNNING((byte) 0),
@ -131,7 +134,7 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.STATUS, state.toString().toLowerCase(Locale.ROOT));
builder.field(Fields.STATUS, state.toString());
builder.startObject(Fields.COMPETITORS);
if (competitionResults != null) {
for (Map.Entry<String, CompetitionResult> entry : competitionResults.entrySet()) {
@ -184,7 +187,6 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX
static final class Fields {
static final XContentBuilderString STATUS = new XContentBuilderString("status");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString COMPETITORS = new XContentBuilderString("competitors");
}
}

View File

@ -97,18 +97,15 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
*/
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {
DiscoveryNodes nodes = clusterService.state().nodes();
int nodeCount = 0;
for (DiscoveryNode node : nodes) {
if (isBenchmarkNode(node)) {
nodeCount++;
}
}
BenchmarkStatusAsyncHandler async = new BenchmarkStatusAsyncHandler(nodeCount, request, listener);
for (DiscoveryNode node : nodes) {
if (isBenchmarkNode(node)) {
transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async);
final List<DiscoveryNode> nodes = availableBenchmarkNodes();
if (nodes.size() == 0) {
listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmarks"));
} else {
BenchmarkStatusAsyncHandler async = new BenchmarkStatusAsyncHandler(nodes.size(), request, listener);
for (DiscoveryNode node : nodes) {
if (isBenchmarkNode(node)) {
transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async);
}
}
}
}
@ -116,42 +113,47 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
/**
* Aborts actively running benchmarks on the cluster
*
* @param id Benchmark id to abort
* @param listener Response listener
* @param benchmarkName Benchmark name to abort
* @param listener Response listener
*/
public void abortBenchmark(final String id, final ActionListener<AbortBenchmarkResponse> listener) {
public void abortBenchmark(final String benchmarkName, final ActionListener<AbortBenchmarkResponse> listener) {
BenchmarkStateListener benchmarkStateListener = new BenchmarkStateListener() {
@Override
public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) {
if (entry != null) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
final ImmutableOpenMap<String, DiscoveryNode> nodes = newState.nodes().nodes();
BenchmarkAbortAsyncHandler async = new BenchmarkAbortAsyncHandler(entry.nodes().length, id, listener);
for (String nodeId : entry.nodes()) {
final DiscoveryNode node = nodes.get(nodeId);
if (node != null) {
transportService.sendRequest(node, AbortExecutionHandler.ACTION, new NodeAbortRequest(id), async);
} else {
logger.debug("Node for ID [" + nodeId + "] not found in cluster state - skipping");
final List<DiscoveryNode> nodes = availableBenchmarkNodes();
if (nodes.size() == 0) {
listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmarks"));
} else {
BenchmarkStateListener benchmarkStateListener = new BenchmarkStateListener() {
@Override
public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) {
if (entry != null) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
final ImmutableOpenMap<String, DiscoveryNode> nodes = newState.nodes().nodes();
BenchmarkAbortAsyncHandler async = new BenchmarkAbortAsyncHandler(entry.nodes().length, benchmarkName, listener);
for (String nodeId : entry.nodes()) {
final DiscoveryNode node = nodes.get(nodeId);
if (node != null) {
transportService.sendRequest(node, AbortExecutionHandler.ACTION, new NodeAbortRequest(benchmarkName), async);
} else {
logger.debug("Node for ID [" + nodeId + "] not found in cluster state - skipping");
}
}
}
}
});
} else {
listener.onResponse(new AbortBenchmarkResponse(id, "Benchmark with id [" + id + "] not found"));
});
} else {
listener.onResponse(new AbortBenchmarkResponse(benchmarkName, "Benchmark with name [" + benchmarkName + "] not found"));
}
}
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
};
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
};
clusterService.submitStateUpdateTask("abort_benchmark", new AbortBenchmarkTask(id, benchmarkStateListener));
clusterService.submitStateUpdateTask("abort_benchmark", new AbortBenchmarkTask(benchmarkName, benchmarkStateListener));
}
}
/**
@ -162,32 +164,41 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
*/
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {
final BenchmarkStateListener benchListener = new BenchmarkStateListener() {
@Override
public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
final ImmutableOpenMap<String, DiscoveryNode> nodes = newState.nodes().nodes();
final BenchmarkSearchAsyncHandler async = new BenchmarkSearchAsyncHandler(entry.nodes().length, request, listener);
for (String nodeId : entry.nodes()) {
final DiscoveryNode node = nodes.get(nodeId);
if (node == null) {
async.handleExceptionInternal(
new ElasticsearchIllegalStateException("Node for ID [" + nodeId + "] not found in cluster state - skipping"));
final List<DiscoveryNode> nodes = availableBenchmarkNodes();
if (nodes.size() == 0) {
listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmark [" +
request.benchmarkName() + "]"));
} else {
final BenchmarkStateListener benchListener = new BenchmarkStateListener() {
@Override
public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
final ImmutableOpenMap<String, DiscoveryNode> nodes = newState.nodes().nodes();
final BenchmarkSearchAsyncHandler async = new BenchmarkSearchAsyncHandler(entry.nodes().length, request, listener);
for (String nodeId : entry.nodes()) {
final DiscoveryNode node = nodes.get(nodeId);
if (node == null) {
async.handleExceptionInternal(
new ElasticsearchIllegalStateException("Node for ID [" + nodeId + "] not found in cluster state - skipping"));
} else {
logger.debug("Starting benchmark [{}] node [{}]", request.benchmarkName(), node.name());
transportService.sendRequest(node, BenchExecutionHandler.ACTION, new NodeBenchRequest(request), async);
}
}
transportService.sendRequest(node, BenchExecutionHandler.ACTION, new NodeBenchRequest(request), async);
}
}
});
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
};
});
}
clusterService.submitStateUpdateTask("start_benchmark", new StartBenchmarkTask(request, benchListener));
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
};
clusterService.submitStateUpdateTask("start_benchmark", new StartBenchmarkTask(request, benchListener));
}
}
private void finishBenchmark(final BenchmarkResponse benchmarkResponse, final String benchmarkId, final ActionListener<BenchmarkResponse> listener) {
@ -568,7 +579,6 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
}
}
}
logger.debug("Starting benchmark for ID [{}]", request.benchmarkName());
List<DiscoveryNode> nodes = findNodes(request);
String[] nodeIds = new String[nodes.size()];
int i = 0;
@ -717,4 +727,14 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
}
}
private List<DiscoveryNode> availableBenchmarkNodes() {
DiscoveryNodes nodes = clusterService.state().nodes();
List<DiscoveryNode> benchmarkNodes = new ArrayList<>(nodes.size());
for (DiscoveryNode node : nodes) {
if (isBenchmarkNode(node)) {
benchmarkNodes.add(node);
}
}
return benchmarkNodes;
}
}

View File

@ -45,8 +45,10 @@ import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.METHOD_NOT_ALLOWED;
import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.elasticsearch.common.xcontent.json.JsonXContent.contentBuilder;
/**
@ -104,7 +106,7 @@ public class RestBenchAction extends BaseRestHandler {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
return new BytesRestResponse(OK, builder);
}
});
}
@ -124,7 +126,7 @@ public class RestBenchAction extends BaseRestHandler {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
return new BytesRestResponse(OK, builder);
}
});
}
@ -165,7 +167,7 @@ public class RestBenchAction extends BaseRestHandler {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
return new BytesRestResponse(OK, builder);
}
});
}