From c82f27577b5ad0e0dce21c6e22a5134e319c2c8b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sat, 25 Jan 2014 18:20:46 +0100 Subject: [PATCH] Added dedicated thread pool cat api, that can show all thread pool related statistic (size, rejected, queue etc.) for all thread pools (get, search, index etc.) By default active, rejected and queue thread statistics are included for the index, bulk and search thread pool. Other thread statistics of other thread pools can be included via the `h` query string parameter. Closes #4907 --- docs/reference/cat.asciidoc | 2 + docs/reference/cat/thread_pool.asciidoc | 44 +++ .../rest/action/RestActionModule.java | 1 + .../rest/action/cat/RestThreadPoolAction.java | 302 ++++++++++++++++++ 4 files changed, 349 insertions(+) create mode 100644 docs/reference/cat/thread_pool.asciidoc create mode 100644 src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java diff --git a/docs/reference/cat.asciidoc b/docs/reference/cat.asciidoc index d53b7236231..6202c650bf0 100644 --- a/docs/reference/cat.asciidoc +++ b/docs/reference/cat.asciidoc @@ -112,4 +112,6 @@ include::cat/pending_tasks.asciidoc[] include::cat/recovery.asciidoc[] +include::cat/thread_pool.asciidoc[] + include::cat/shards.asciidoc[] diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc new file mode 100644 index 00000000000..7dfc5d492ec --- /dev/null +++ b/docs/reference/cat/thread_pool.asciidoc @@ -0,0 +1,44 @@ +[[cat-thread-pool]] +== Thread pool + +The `thread_pool` command shows cluster wide thread pool statistics per node. By default the active, queue and rejected +statistics are returned for the bulk, index and search thread pools. + +[source,shell] +-------------------------------------------------- +% curl 192.168.56.10:9200/_cat/thread_pool +host1 192.168.1.35 0 0 0 0 0 0 0 0 0 +host2 192.168.1.36 0 0 0 0 0 0 0 0 0 +-------------------------------------------------- + +The first two columns tell your the host and ip of a node. + +[source,shell] +-------------------------------------------------- +host ip +host1 192.168.1.35 +host2 192.168.1.36 +-------------------------------------------------- + +The next three columns show the active queue and rejected statistics for the bulk thread pool. + +[source,shell] +-------------------------------------------------- +bulk.active bulk.queue bulk.rejected + 0 0 0 +-------------------------------------------------- + +The remaining columns show the active queue and rejected statistics of the index and search thread pool respectively. + +Also other statistics of different thread pools can be retrieved by using the `h` (header) parameter. + +[source,shell] +-------------------------------------------------- +% curl 'localhost:9200/_cat/thread_pool?v&h=id,host,suggest.active,suggest.rejected,suggest.completed' +host suggest.active suggest.rejected suggest.completed +host1 0 0 0 +host2 0 0 0 +-------------------------------------------------- + +Here the host columns and the active, rejected and completed suggest thread pool statistic are displayed. The suggest +thread pool won't be displayed by default, so you always need be specific about what statistic you want to display. \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 91a5f25984a..fcb19619527 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -223,6 +223,7 @@ public class RestActionModule extends AbstractModule { catActionMultibinder.addBinding().to(RestHealthAction.class).asEagerSingleton(); catActionMultibinder.addBinding().to(org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction.class).asEagerSingleton(); catActionMultibinder.addBinding().to(RestAliasAction.class).asEagerSingleton(); + catActionMultibinder.addBinding().to(RestThreadPoolAction.class).asEagerSingleton(); // no abstract cat action bind(RestCatAction.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java new file mode 100644 index 00000000000..d33ab34b487 --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -0,0 +1,302 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.cat; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.Table; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.XContentThrowableRestResponse; +import org.elasticsearch.rest.action.support.RestTable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestThreadPoolAction extends AbstractCatAction { + + private final static String[] SUPPORTED_NAMES = new String[] { + ThreadPool.Names.BULK, + ThreadPool.Names.FLUSH, + ThreadPool.Names.GENERIC, + ThreadPool.Names.GET, + ThreadPool.Names.INDEX, + ThreadPool.Names.MANAGEMENT, + ThreadPool.Names.MERGE, + ThreadPool.Names.OPTIMIZE, + ThreadPool.Names.PERCOLATE, + ThreadPool.Names.REFRESH, + ThreadPool.Names.SEARCH, + ThreadPool.Names.SNAPSHOT, + ThreadPool.Names.SUGGEST, + ThreadPool.Names.WARMER + }; + + private final static String[] SUPPORTED_ALIASES = new String[] { + "b", + "f", + "ge", + "g", + "i", + "ma", + "m", + "o", + "p", + "r", + "s", + "sn", + "su", + "w" + }; + + private final static String[] DEFAULT_THREAD_POOLS = new String[] { + ThreadPool.Names.BULK, + ThreadPool.Names.INDEX, + ThreadPool.Names.SEARCH, + }; + + private final static Map ALIAS_TO_THREAD_POOL; + private final static Map THREAD_POOL_TO_ALIAS; + + static { + ALIAS_TO_THREAD_POOL = Maps.newHashMapWithExpectedSize(SUPPORTED_NAMES.length); + for (String supportedThreadPool : SUPPORTED_NAMES) { + ALIAS_TO_THREAD_POOL.put(supportedThreadPool.substring(0, 3), supportedThreadPool); + } + THREAD_POOL_TO_ALIAS = Maps.newHashMapWithExpectedSize(SUPPORTED_NAMES.length); + for (int i = 0; i < SUPPORTED_NAMES.length; i++) { + THREAD_POOL_TO_ALIAS.put(SUPPORTED_NAMES[i], SUPPORTED_ALIASES[i]); + } + } + + @Inject + public RestThreadPoolAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(GET, "/_cat/thread_pool", this); + } + + @Override + void documentation(StringBuilder sb) { + sb.append("/_cat/thread_pool\n"); + } + + @Override + public void doRequest(final RestRequest request, final RestChannel channel) { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear().nodes(true); + clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); + clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + final String[] pools = fetchSortedPools(request, DEFAULT_THREAD_POOLS); + + client.admin().cluster().state(clusterStateRequest, new ActionListener() { + @Override + public void onResponse(final ClusterStateResponse clusterStateResponse) { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.clear().process(true); + client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener() { + @Override + public void onResponse(final NodesInfoResponse nodesInfoResponse) { + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.clear().threadPool(true); + client.admin().cluster().nodesStats(nodesStatsRequest, new ActionListener() { + @Override + public void onResponse(NodesStatsResponse nodesStatsResponse) { + try { + channel.sendResponse(RestTable.buildResponse(buildTable(request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse, pools), request, channel)); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + @Override + Table getTableWithHeader(final RestRequest request) { + Table table = new Table(); + table.startHeaders(); + table.addCell("id", "default:false;alias:id,nodeId;desc:unique node id"); + table.addCell("pid", "default:false;alias:p;desc:process id"); + table.addCell("host", "alias:h;desc:host name"); + table.addCell("ip", "alias:i;desc:ip address"); + table.addCell("port", "default:false;alias:po;desc:bound transport port"); + + final String[] requestedPools = fetchSortedPools(request, DEFAULT_THREAD_POOLS); + for (String pool : SUPPORTED_NAMES) { + String poolAlias = THREAD_POOL_TO_ALIAS.get(pool); + boolean display = false; + for (String requestedPool : requestedPools) { + if (pool.equals(requestedPool)) { + display = true; + break; + } + } + + String defaultDisplayVal = Boolean.toString(display); + table.addCell( + pool + ".active", + "alias:" + poolAlias + "a;default:" + defaultDisplayVal + ";text-align:right;desc:number of active " + pool + " threads" + ); + table.addCell( + pool + ".size", + "alias:" + poolAlias + "s;default:false;text-align:right;desc:number of active " + pool + " threads" + ); + table.addCell( + pool + ".queue", + "alias:" + poolAlias + "q;default:" + defaultDisplayVal + ";text-align:right;desc:number of " + pool + " threads in queue" + ); + table.addCell( + pool + ".rejected", + "alias:" + poolAlias + "r;default:" + defaultDisplayVal + ";text-align:right;desc:number of rejected " + pool + " threads" + ); + table.addCell( + pool + ".largest", + "alias:" + poolAlias + "l;default:false;text-align:right;desc:highest number of seen active " + pool + " threads" + ); + table.addCell( + pool + ".completed", + "alias:" + poolAlias + "c;default:false;text-align:right;desc:number of completed " + pool + " threads" + ); + } + + table.endHeaders(); + return table; + } + + + private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoResponse nodesInfo, NodesStatsResponse nodesStats, String[] pools) { + boolean fullId = req.paramAsBoolean("full_id", false); + DiscoveryNodes nodes = state.getState().nodes(); + Table table = getTableWithHeader(req); + + for (DiscoveryNode node : nodes) { + NodeInfo info = nodesInfo.getNodesMap().get(node.id()); + NodeStats stats = nodesStats.getNodesMap().get(node.id()); + table.startRow(); + + table.addCell(fullId ? node.id() : Strings.substring(node.getId(), 0, 4)); + table.addCell(info == null ? null : info.getProcess().id()); + table.addCell(node.getHostName()); + table.addCell(node.getHostAddress()); + if (node.address() instanceof InetSocketTransportAddress) { + table.addCell(((InetSocketTransportAddress) node.address()).address().getPort()); + } else { + table.addCell("-"); + } + + final Map poolThreadStats; + if (stats == null) { + poolThreadStats = Collections.emptyMap(); + } else { + poolThreadStats = new HashMap(14); + ThreadPoolStats threadPoolStats = stats.getThreadPool(); + for (ThreadPoolStats.Stats threadPoolStat : threadPoolStats) { + poolThreadStats.put(threadPoolStat.getName(), threadPoolStat); + } + } + for (String pool : SUPPORTED_NAMES) { + ThreadPoolStats.Stats poolStats = poolThreadStats.get(pool); + table.addCell(poolStats == null ? null : poolStats.getActive()); + table.addCell(poolStats == null ? null : poolStats.getThreads()); + table.addCell(poolStats == null ? null : poolStats.getQueue()); + table.addCell(poolStats == null ? null : poolStats.getRejected()); + table.addCell(poolStats == null ? null : poolStats.getLargest()); + table.addCell(poolStats == null ? null : poolStats.getCompleted()); + } + + table.endRow(); + } + + return table; + } + + // The thread pool columns should always be in the same order. + private String[] fetchSortedPools(RestRequest request, String[] defaults) { + String[] headers = request.paramAsStringArray("h", null); + if (headers == null) { + return defaults; + } else { + Set requestedPools = new LinkedHashSet(headers.length); + for (String header : headers) { + int dotIndex = header.indexOf('.'); + if (dotIndex != -1) { + String headerPrefix = header.substring(0, dotIndex); + if (THREAD_POOL_TO_ALIAS.containsKey(headerPrefix)) { + requestedPools.add(headerPrefix); + } + } else if (ALIAS_TO_THREAD_POOL.containsKey(header)) { + requestedPools.add(ALIAS_TO_THREAD_POOL.get(header)); + } + + } + return requestedPools.toArray(new String[0]); + } + } +}