From 1f7f72135a5659d2a8f72d7b88ab47e15dfae20c Mon Sep 17 00:00:00 2001 From: Andrew Selden Date: Wed, 9 Apr 2014 17:19:52 -0700 Subject: [PATCH] Bug fix for hung clients on cluster without benchmark nodes This is a fix for a bug whereby a cluster that has no nodes started with -Des.node.bench=true will cause clients to hang if they attempt to submit a benchmark. Also adds REST tests to validate fix Closes #5754 --- .../test/benchmark.abort/10_basic.yaml | 8 + .../test/benchmark.list/10_basic.yaml | 7 + .../test/benchmark.submit/10_basic.yaml | 32 ++++ .../bench/BenchmarkNodeMissingException.java | 38 +++++ .../action/bench/BenchmarkResponse.java | 8 +- .../action/bench/BenchmarkService.java | 150 ++++++++++-------- .../rest/action/bench/RestBenchAction.java | 8 +- 7 files changed, 180 insertions(+), 71 deletions(-) create mode 100644 rest-api-spec/test/benchmark.abort/10_basic.yaml create mode 100644 rest-api-spec/test/benchmark.list/10_basic.yaml create mode 100644 rest-api-spec/test/benchmark.submit/10_basic.yaml create mode 100644 src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java diff --git a/rest-api-spec/test/benchmark.abort/10_basic.yaml b/rest-api-spec/test/benchmark.abort/10_basic.yaml new file mode 100644 index 00000000000..50089a0afed --- /dev/null +++ b/rest-api-spec/test/benchmark.abort/10_basic.yaml @@ -0,0 +1,8 @@ +--- +"Test benchmark abort": + + - do: + benchmark.abort: + name: my_benchmark + catch: request + diff --git a/rest-api-spec/test/benchmark.list/10_basic.yaml b/rest-api-spec/test/benchmark.list/10_basic.yaml new file mode 100644 index 00000000000..4ae1ae0f3bf --- /dev/null +++ b/rest-api-spec/test/benchmark.list/10_basic.yaml @@ -0,0 +1,7 @@ +--- +"Test benchmark list": + + - do: + benchmark.list: {} + catch: request + diff --git a/rest-api-spec/test/benchmark.submit/10_basic.yaml b/rest-api-spec/test/benchmark.submit/10_basic.yaml new file mode 100644 index 00000000000..293acbdd2f6 --- /dev/null +++ b/rest-api-spec/test/benchmark.submit/10_basic.yaml @@ -0,0 +1,32 @@ +--- +"Test benchmark submit": + + - do: + indices.create: + index: test_1 + body: + settings: + index: + number_of_replicas: 0 + + - do: + cluster.health: + wait_for_status: yellow + + - do: + benchmark.submit: + index: test_1 + body: { + "name": "my_benchmark", + "competitors": [ { + "name": "my_competitor", + "requests": [ { + "query": { + "match": { "_all": "*" } + } + } ] + } ] + } + catch: request + + diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java new file mode 100644 index 00000000000..a83659c7ddb --- /dev/null +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkNodeMissingException.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bench; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.rest.RestStatus; + +/** + * + */ +public class BenchmarkNodeMissingException extends ElasticsearchException { + + public BenchmarkNodeMissingException(String msg) { + super(msg); + } + + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } +} diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java index 4d407b59fdf..cbfc86742f3 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkResponse.java @@ -55,7 +55,10 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX } /** - * Benchmarks can be in one of: RUNNING, COMPLETE, or ABORTED. + * Benchmarks can be in one of: + * RUNNING - executing normally + * COMPLETE - completed normally + * ABORTED - aborted */ public static enum State { RUNNING((byte) 0), @@ -131,7 +134,7 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(Fields.STATUS, state.toString().toLowerCase(Locale.ROOT)); + builder.field(Fields.STATUS, state.toString()); builder.startObject(Fields.COMPETITORS); if (competitionResults != null) { for (Map.Entry entry : competitionResults.entrySet()) { @@ -184,7 +187,6 @@ public class BenchmarkResponse extends ActionResponse implements Streamable, ToX static final class Fields { static final XContentBuilderString STATUS = new XContentBuilderString("status"); - static final XContentBuilderString INDEX = new XContentBuilderString("index"); static final XContentBuilderString COMPETITORS = new XContentBuilderString("competitors"); } } diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java index 6e3c4fdb90f..36af97831df 100644 --- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java +++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java @@ -97,18 +97,15 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) { - DiscoveryNodes nodes = clusterService.state().nodes(); - int nodeCount = 0; - for (DiscoveryNode node : nodes) { - if (isBenchmarkNode(node)) { - nodeCount++; - } - } - - BenchmarkStatusAsyncHandler async = new BenchmarkStatusAsyncHandler(nodeCount, request, listener); - for (DiscoveryNode node : nodes) { - if (isBenchmarkNode(node)) { - transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async); + final List nodes = availableBenchmarkNodes(); + if (nodes.size() == 0) { + listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmarks")); + } else { + BenchmarkStatusAsyncHandler async = new BenchmarkStatusAsyncHandler(nodes.size(), request, listener); + for (DiscoveryNode node : nodes) { + if (isBenchmarkNode(node)) { + transportService.sendRequest(node, StatusExecutionHandler.ACTION, new NodeStatusRequest(request), async); + } } } } @@ -116,42 +113,47 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) { + public void abortBenchmark(final String benchmarkName, final ActionListener listener) { - BenchmarkStateListener benchmarkStateListener = new BenchmarkStateListener() { - @Override - public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) { - if (entry != null) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - final ImmutableOpenMap nodes = newState.nodes().nodes(); - BenchmarkAbortAsyncHandler async = new BenchmarkAbortAsyncHandler(entry.nodes().length, id, listener); - for (String nodeId : entry.nodes()) { - final DiscoveryNode node = nodes.get(nodeId); - if (node != null) { - transportService.sendRequest(node, AbortExecutionHandler.ACTION, new NodeAbortRequest(id), async); - } else { - logger.debug("Node for ID [" + nodeId + "] not found in cluster state - skipping"); + final List nodes = availableBenchmarkNodes(); + if (nodes.size() == 0) { + listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmarks")); + } else { + BenchmarkStateListener benchmarkStateListener = new BenchmarkStateListener() { + @Override + public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) { + if (entry != null) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + final ImmutableOpenMap nodes = newState.nodes().nodes(); + BenchmarkAbortAsyncHandler async = new BenchmarkAbortAsyncHandler(entry.nodes().length, benchmarkName, listener); + for (String nodeId : entry.nodes()) { + final DiscoveryNode node = nodes.get(nodeId); + if (node != null) { + transportService.sendRequest(node, AbortExecutionHandler.ACTION, new NodeAbortRequest(benchmarkName), async); + } else { + logger.debug("Node for ID [" + nodeId + "] not found in cluster state - skipping"); + } } } - } - }); - } else { - listener.onResponse(new AbortBenchmarkResponse(id, "Benchmark with id [" + id + "] not found")); + }); + } else { + listener.onResponse(new AbortBenchmarkResponse(benchmarkName, "Benchmark with name [" + benchmarkName + "] not found")); + } } - } - @Override - public void onFailure(Throwable t) { - listener.onFailure(t); - } - }; + @Override + public void onFailure(Throwable t) { + listener.onFailure(t); + } + }; - clusterService.submitStateUpdateTask("abort_benchmark", new AbortBenchmarkTask(id, benchmarkStateListener)); + clusterService.submitStateUpdateTask("abort_benchmark", new AbortBenchmarkTask(benchmarkName, benchmarkStateListener)); + } } /** @@ -162,32 +164,41 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) { - final BenchmarkStateListener benchListener = new BenchmarkStateListener() { - @Override - public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - final ImmutableOpenMap nodes = newState.nodes().nodes(); - final BenchmarkSearchAsyncHandler async = new BenchmarkSearchAsyncHandler(entry.nodes().length, request, listener); - for (String nodeId : entry.nodes()) { - final DiscoveryNode node = nodes.get(nodeId); - if (node == null) { - async.handleExceptionInternal( - new ElasticsearchIllegalStateException("Node for ID [" + nodeId + "] not found in cluster state - skipping")); + final List nodes = availableBenchmarkNodes(); + if (nodes.size() == 0) { + listener.onFailure(new BenchmarkNodeMissingException("No available nodes for executing benchmark [" + + request.benchmarkName() + "]")); + } else { + final BenchmarkStateListener benchListener = new BenchmarkStateListener() { + @Override + public void onResponse(final ClusterState newState, final BenchmarkMetaData.Entry entry) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + final ImmutableOpenMap nodes = newState.nodes().nodes(); + final BenchmarkSearchAsyncHandler async = new BenchmarkSearchAsyncHandler(entry.nodes().length, request, listener); + for (String nodeId : entry.nodes()) { + final DiscoveryNode node = nodes.get(nodeId); + if (node == null) { + async.handleExceptionInternal( + new ElasticsearchIllegalStateException("Node for ID [" + nodeId + "] not found in cluster state - skipping")); + } else { + logger.debug("Starting benchmark [{}] node [{}]", request.benchmarkName(), node.name()); + transportService.sendRequest(node, BenchExecutionHandler.ACTION, new NodeBenchRequest(request), async); + } } - transportService.sendRequest(node, BenchExecutionHandler.ACTION, new NodeBenchRequest(request), async); } - } - }); - } - @Override - public void onFailure(Throwable t) { - listener.onFailure(t); - } - }; + }); + } - clusterService.submitStateUpdateTask("start_benchmark", new StartBenchmarkTask(request, benchListener)); + @Override + public void onFailure(Throwable t) { + listener.onFailure(t); + } + }; + + clusterService.submitStateUpdateTask("start_benchmark", new StartBenchmarkTask(request, benchListener)); + } } private void finishBenchmark(final BenchmarkResponse benchmarkResponse, final String benchmarkId, final ActionListener listener) { @@ -568,7 +579,6 @@ public class BenchmarkService extends AbstractLifecycleComponent nodes = findNodes(request); String[] nodeIds = new String[nodes.size()]; int i = 0; @@ -717,4 +727,14 @@ public class BenchmarkService extends AbstractLifecycleComponent availableBenchmarkNodes() { + DiscoveryNodes nodes = clusterService.state().nodes(); + List benchmarkNodes = new ArrayList<>(nodes.size()); + for (DiscoveryNode node : nodes) { + if (isBenchmarkNode(node)) { + benchmarkNodes.add(node); + } + } + return benchmarkNodes; + } } diff --git a/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java b/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java index 03fe65eb0f1..1738d7cd545 100644 --- a/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java +++ b/src/main/java/org/elasticsearch/rest/action/bench/RestBenchAction.java @@ -45,8 +45,10 @@ import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.PUT; import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.OK; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.METHOD_NOT_ALLOWED; +import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE; import static org.elasticsearch.common.xcontent.json.JsonXContent.contentBuilder; /** @@ -104,7 +106,7 @@ public class RestBenchAction extends BaseRestHandler { builder.startObject(); response.toXContent(builder, request); builder.endObject(); - return new BytesRestResponse(RestStatus.OK, builder); + return new BytesRestResponse(OK, builder); } }); } @@ -124,7 +126,7 @@ public class RestBenchAction extends BaseRestHandler { builder.startObject(); response.toXContent(builder, request); builder.endObject(); - return new BytesRestResponse(RestStatus.OK, builder); + return new BytesRestResponse(OK, builder); } }); } @@ -165,7 +167,7 @@ public class RestBenchAction extends BaseRestHandler { builder.startObject(); response.toXContent(builder, request); builder.endObject(); - return new BytesRestResponse(RestStatus.OK, builder); + return new BytesRestResponse(OK, builder); } }); }