Add thread_pool to nodes info and nodes stats APIs, closes #1601.

This commit is contained in:
Shay Banon 2012-01-10 17:45:10 +02:00
parent 3b92962ddd
commit 9d979dfc01
16 changed files with 625 additions and 39 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.network.NetworkInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
import java.io.IOException;
@ -60,6 +61,9 @@ public class NodeInfo extends NodeOperationResponse {
@Nullable
private JvmInfo jvm;
@Nullable
private ThreadPoolInfo threadPool;
@Nullable
private NetworkInfo network;
@ -73,7 +77,7 @@ public class NodeInfo extends NodeOperationResponse {
}
public NodeInfo(@Nullable String hostname, DiscoveryNode node, @Nullable ImmutableMap<String, String> serviceAttributes, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable NetworkInfo network,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool, @Nullable NetworkInfo network,
@Nullable TransportInfo transport, @Nullable HttpInfo http) {
super(node);
this.hostname = hostname;
@ -82,6 +86,7 @@ public class NodeInfo extends NodeOperationResponse {
this.os = os;
this.process = process;
this.jvm = jvm;
this.threadPool = threadPool;
this.network = network;
this.transport = transport;
this.http = http;
@ -183,6 +188,16 @@ public class NodeInfo extends NodeOperationResponse {
return jvm();
}
@Nullable
public ThreadPoolInfo threadPool() {
return this.threadPool;
}
@Nullable
public ThreadPoolInfo getThreadPool() {
return threadPool();
}
/**
* Network level information.
*/
@ -251,6 +266,9 @@ public class NodeInfo extends NodeOperationResponse {
if (in.readBoolean()) {
jvm = JvmInfo.readJvmInfo(in);
}
if (in.readBoolean()) {
threadPool = ThreadPoolInfo.readThreadPoolInfo(in);
}
if (in.readBoolean()) {
network = NetworkInfo.readNetworkInfo(in);
}
@ -305,6 +323,12 @@ public class NodeInfo extends NodeOperationResponse {
out.writeBoolean(true);
jvm.writeTo(out);
}
if (threadPool == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
threadPool.writeTo(out);
}
if (network == null) {
out.writeBoolean(false);
} else {

View File

@ -34,6 +34,7 @@ public class NodesInfoRequest extends NodesOperationRequest {
private boolean os = false;
private boolean process = false;
private boolean jvm = false;
private boolean threadPool = false;
private boolean network = false;
private boolean transport = false;
private boolean http = false;
@ -57,6 +58,7 @@ public class NodesInfoRequest extends NodesOperationRequest {
os = false;
process = false;
jvm = false;
threadPool = false;
network = false;
transport = false;
http = false;
@ -123,6 +125,21 @@ public class NodesInfoRequest extends NodesOperationRequest {
return this;
}
/**
* Should the node Thread Pool info be returned.
*/
public boolean threadPool() {
return this.threadPool;
}
/**
* Should the node Thread Pool info be returned.
*/
public NodesInfoRequest threadPool(boolean threadPool) {
this.threadPool = threadPool;
return this;
}
/**
* Should the node Network be returned.
*/
@ -175,6 +192,7 @@ public class NodesInfoRequest extends NodesOperationRequest {
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
network = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
@ -187,6 +205,7 @@ public class NodesInfoRequest extends NodesOperationRequest {
out.writeBoolean(os);
out.writeBoolean(process);
out.writeBoolean(jvm);
out.writeBoolean(threadPool);
out.writeBoolean(network);
out.writeBoolean(transport);
out.writeBoolean(http);

View File

@ -116,6 +116,9 @@ public class NodesInfoResponse extends NodesOperationResponse<NodeInfo> implemen
if (nodeInfo.jvm() != null) {
nodeInfo.jvm().toXContent(builder, params);
}
if (nodeInfo.threadPool() != null) {
nodeInfo.threadPool().toXContent(builder, params);
}
if (nodeInfo.network() != null) {
nodeInfo.network().toXContent(builder, params);
}

View File

@ -98,7 +98,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
@Override
protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) throws ElasticSearchException {
NodesInfoRequest request = nodeRequest.request;
return nodeService.info(request.settings(), request.os(), request.process(), request.jvm(), request.network(), request.transport(), request.http());
return nodeService.info(request.settings(), request.os(), request.process(), request.jvm(), request.threadPool(), request.network(), request.transport(), request.http());
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats;
import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.monitor.process.ProcessStats;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
@ -54,6 +55,9 @@ public class NodeStats extends NodeOperationResponse {
@Nullable
private JvmStats jvm;
@Nullable
private ThreadPoolStats threadPool;
@Nullable
private NetworkStats network;
@ -67,7 +71,7 @@ public class NodeStats extends NodeOperationResponse {
}
public NodeStats(DiscoveryNode node, @Nullable String hostname, @Nullable NodeIndicesStats indices,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable NetworkStats network,
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool, @Nullable NetworkStats network,
@Nullable TransportStats transport, @Nullable HttpStats http) {
super(node);
this.hostname = hostname;
@ -75,6 +79,7 @@ public class NodeStats extends NodeOperationResponse {
this.os = os;
this.process = process;
this.jvm = jvm;
this.threadPool = threadPool;
this.network = network;
this.transport = transport;
this.http = http;
@ -154,6 +159,22 @@ public class NodeStats extends NodeOperationResponse {
return jvm();
}
/**
* Thread Pool level statistics.
*/
@Nullable
public ThreadPoolStats threadPool() {
return this.threadPool;
}
/**
* Thread Pool level statistics.
*/
@Nullable
public ThreadPoolStats getThreadPool() {
return threadPool();
}
/**
* Network level statistics.
*/
@ -214,6 +235,9 @@ public class NodeStats extends NodeOperationResponse {
if (in.readBoolean()) {
jvm = JvmStats.readJvmStats(in);
}
if (in.readBoolean()) {
threadPool = ThreadPoolStats.readThreadPoolStats(in);
}
if (in.readBoolean()) {
network = NetworkStats.readNetworkStats(in);
}
@ -258,6 +282,12 @@ public class NodeStats extends NodeOperationResponse {
out.writeBoolean(true);
jvm.writeTo(out);
}
if (threadPool == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
threadPool.writeTo(out);
}
if (network == null) {
out.writeBoolean(false);
} else {

View File

@ -34,6 +34,7 @@ public class NodesStatsRequest extends NodesOperationRequest {
private boolean os;
private boolean process;
private boolean jvm;
private boolean threadPool;
private boolean network;
private boolean transport;
private boolean http;
@ -57,6 +58,7 @@ public class NodesStatsRequest extends NodesOperationRequest {
this.os = false;
this.process = false;
this.jvm = false;
this.threadPool = false;
this.network = false;
this.transport = false;
this.http = false;
@ -123,6 +125,21 @@ public class NodesStatsRequest extends NodesOperationRequest {
return this;
}
/**
* Should the node Thread Pool be returned.
*/
public boolean threadPool() {
return this.threadPool;
}
/**
* Should the node Thread Pool be returned.
*/
public NodesStatsRequest threadPool(boolean threadPool) {
this.threadPool = threadPool;
return this;
}
/**
* Should the node Network be returned.
*/
@ -175,6 +192,7 @@ public class NodesStatsRequest extends NodesOperationRequest {
os = in.readBoolean();
process = in.readBoolean();
jvm = in.readBoolean();
threadPool = in.readBoolean();
network = in.readBoolean();
transport = in.readBoolean();
http = in.readBoolean();
@ -187,6 +205,7 @@ public class NodesStatsRequest extends NodesOperationRequest {
out.writeBoolean(os);
out.writeBoolean(process);
out.writeBoolean(jvm);
out.writeBoolean(threadPool);
out.writeBoolean(network);
out.writeBoolean(transport);
out.writeBoolean(http);

View File

@ -95,6 +95,9 @@ public class NodesStatsResponse extends NodesOperationResponse<NodeStats> implem
if (nodeStats.jvm() != null) {
nodeStats.jvm().toXContent(builder, params);
}
if (nodeStats.threadPool() != null) {
nodeStats.threadPool().toXContent(builder, params);
}
if (nodeStats.network() != null) {
nodeStats.network().toXContent(builder, params);
}

View File

@ -98,7 +98,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
@Override
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) throws ElasticSearchException {
NodesStatsRequest request = nodeStatsRequest.request;
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.network(), request.transport(), request.http());
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(), request.network(), request.transport(), request.http());
}
@Override

View File

@ -79,6 +79,14 @@ public class NodesInfoRequestBuilder extends BaseClusterRequestBuilder<NodesInfo
return this;
}
/**
* Should the node thread pool info be returned.
*/
public NodesInfoRequestBuilder setThreadPool(boolean threadPool) {
request.threadPool(threadPool);
return this;
}
/**
* Should the node Network info be returned.
*/

View File

@ -79,6 +79,14 @@ public class NodesStatsRequestBuilder extends BaseClusterRequestBuilder<NodesSta
return this;
}
/**
* Should the node thread pool stats be returned.
*/
public NodesStatsRequestBuilder setThreadPool(boolean threadPool) {
request.threadPool(threadPool);
return this;
}
/**
* Should the node Network stats be returned.
*/

View File

@ -33,6 +33,7 @@ import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.net.InetAddress;
@ -41,6 +42,8 @@ import java.net.InetAddress;
*/
public class NodeService extends AbstractComponent {
private final ThreadPool threadPool;
private final MonitorService monitorService;
private final ClusterService clusterService;
@ -58,8 +61,9 @@ public class NodeService extends AbstractComponent {
private String hostname;
@Inject
public NodeService(Settings settings, MonitorService monitorService, Discovery discovery, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
this.clusterService = clusterService;
this.transportService = transportService;
@ -101,18 +105,25 @@ public class NodeService extends AbstractComponent {
}
public NodeInfo info() {
return new NodeInfo(hostname, clusterService.state().nodes().localNode(), serviceAttributes, settings,
monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(),
transportService.info(), httpServer == null ? null : httpServer.info());
return new NodeInfo(hostname, clusterService.state().nodes().localNode(), serviceAttributes,
settings,
monitorService.osService().info(),
monitorService.processService().info(),
monitorService.jvmService().info(),
threadPool.info(),
monitorService.networkService().info(),
transportService.info(),
httpServer == null ? null : httpServer.info()
);
}
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean network, boolean transport, boolean http) {
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean transport, boolean http) {
return new NodeInfo(hostname, clusterService.state().nodes().localNode(), serviceAttributes,
settings ? this.settings : null,
os ? monitorService.osService().info() : null,
process ? monitorService.processService().info() : null,
jvm ? monitorService.jvmService().info() : null,
threadPool ? this.threadPool.info() : null,
network ? monitorService.networkService().info() : null,
transport ? transportService.info() : null,
http ? (httpServer == null ? null : httpServer.info()) : null
@ -122,13 +133,19 @@ public class NodeService extends AbstractComponent {
public NodeStats stats() {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), hostname, indicesService.stats(true),
monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(),
transportService.stats(), httpServer == null ? null : httpServer.stats());
return new NodeStats(clusterService.state().nodes().localNode(), hostname,
indicesService.stats(true),
monitorService.osService().stats(),
monitorService.processService().stats(),
monitorService.jvmService().stats(),
threadPool.stats(),
monitorService.networkService().stats(),
transportService.stats(),
httpServer == null ? null : httpServer.stats()
);
}
public NodeStats stats(boolean indices, boolean os, boolean process, boolean jvm, boolean network, boolean transport, boolean http) {
public NodeStats stats(boolean indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean transport, boolean http) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), hostname,
@ -136,8 +153,10 @@ public class NodeService extends AbstractComponent {
os ? monitorService.osService().stats() : null,
process ? monitorService.processService().stats() : null,
jvm ? monitorService.jvmService().stats() : null,
threadPool ? this.threadPool.stats() : null,
network ? monitorService.networkService().stats() : null,
transport ? transportService.stats() : null,
http ? (httpServer == null ? null : httpServer.stats()) : null);
http ? (httpServer == null ? null : httpServer.stats()) : null
);
}
}

View File

@ -61,6 +61,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
controller.registerHandler(RestRequest.Method.GET, "/_nodes/jvm", new RestJvmHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/jvm", new RestJvmHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/thread_pool", new RestThreadPoolHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/thread_pool", new RestThreadPoolHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/network", new RestNetworkHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/network", new RestNetworkHandler());
@ -86,6 +89,7 @@ public class RestNodesInfoAction extends BaseRestHandler {
nodesInfoRequest.os(request.paramAsBoolean("os", nodesInfoRequest.os()));
nodesInfoRequest.process(request.paramAsBoolean("process", nodesInfoRequest.process()));
nodesInfoRequest.jvm(request.paramAsBoolean("jvm", nodesInfoRequest.jvm()));
nodesInfoRequest.threadPool(request.paramAsBoolean("thread_pool", nodesInfoRequest.threadPool()));
nodesInfoRequest.network(request.paramAsBoolean("network", nodesInfoRequest.network()));
nodesInfoRequest.transport(request.paramAsBoolean("transport", nodesInfoRequest.transport()));
nodesInfoRequest.http(request.paramAsBoolean("http", nodesInfoRequest.http()));
@ -158,6 +162,15 @@ public class RestNodesInfoAction extends BaseRestHandler {
}
}
class RestThreadPoolHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(RestActions.splitNodes(request.param("nodeId")));
nodesInfoRequest.clear().threadPool(true);
executeNodeRequest(request, channel, nodesInfoRequest);
}
}
class RestNetworkHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@ -58,6 +58,9 @@ public class RestNodesStatsAction extends BaseRestHandler {
controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/jvm", new RestJvmHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/jvm", new RestJvmHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/thread_pool", new RestThreadPoolHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/thread_pool", new RestThreadPoolHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/stats/network", new RestNetworkHandler());
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/stats/network", new RestNetworkHandler());
@ -80,6 +83,7 @@ public class RestNodesStatsAction extends BaseRestHandler {
nodesStatsRequest.os(request.paramAsBoolean("os", nodesStatsRequest.os()));
nodesStatsRequest.process(request.paramAsBoolean("process", nodesStatsRequest.process()));
nodesStatsRequest.jvm(request.paramAsBoolean("jvm", nodesStatsRequest.jvm()));
nodesStatsRequest.threadPool(request.paramAsBoolean("thread_pool", nodesStatsRequest.threadPool()));
nodesStatsRequest.network(request.paramAsBoolean("network", nodesStatsRequest.network()));
nodesStatsRequest.transport(request.paramAsBoolean("transport", nodesStatsRequest.transport()));
nodesStatsRequest.http(request.paramAsBoolean("http", nodesStatsRequest.http()));
@ -149,6 +153,15 @@ public class RestNodesStatsAction extends BaseRestHandler {
}
}
class RestThreadPoolHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(RestActions.splitNodes(request.param("nodeId")));
nodesStatsRequest.clear().threadPool(true);
executeNodeStats(request, channel, nodesStatsRequest);
}
}
class RestNetworkHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {

View File

@ -28,12 +28,20 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ -57,7 +65,7 @@ public class ThreadPool extends AbstractComponent {
public static final String SNAPSHOT = "snapshot";
}
private final ImmutableMap<String, Executor> executors;
private final ImmutableMap<String, ExecutorHolder> executors;
private final ScheduledThreadPoolExecutor scheduler;
@ -73,7 +81,7 @@ public class ThreadPool extends AbstractComponent {
Map<String, Settings> groupSettings = settings.getGroups("threadpool");
Map<String, Executor> executors = Maps.newHashMap();
Map<String, ExecutorHolder> executors = Maps.newHashMap();
executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build()));
executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
@ -81,7 +89,7 @@ public class ThreadPool extends AbstractComponent {
executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "5m").put("size", 20).build()));
executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "5m").put("size", 20).build()));
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS));
executors.put(Names.SAME, MoreExecutors.sameThreadExecutor());
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same")));
this.executors = ImmutableMap.copyOf(executors);
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@ -96,12 +104,40 @@ public class ThreadPool extends AbstractComponent {
return estimatedTimeThread.estimatedTimeInMillis();
}
public ThreadPoolInfo info() {
List<Info> infos = new ArrayList<Info>();
for (ExecutorHolder holder : executors.values()) {
String name = holder.info.name();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
}
infos.add(holder.info);
}
return new ThreadPoolInfo(infos);
}
public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<ThreadPoolStats.Stats>();
for (ExecutorHolder holder : executors.values()) {
String name = holder.info.name();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
}
int threads = ((ThreadPoolExecutor) holder.executor).getPoolSize();
int queue = ((ThreadPoolExecutor) holder.executor).getQueue().size();
stats.add(new ThreadPoolStats.Stats(name, threads, queue));
}
return new ThreadPoolStats(stats);
}
public Executor cached() {
return executor(Names.CACHED);
}
public Executor executor(String name) {
Executor executor = executors.get(name);
Executor executor = executors.get(name).executor;
if (executor == null) {
throw new ElasticSearchIllegalArgumentException("No executor found for [" + name + "]");
}
@ -127,9 +163,9 @@ public class ThreadPool extends AbstractComponent {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
scheduler.shutdown();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdown();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor.executor).shutdown();
}
}
}
@ -138,24 +174,24 @@ public class ThreadPool extends AbstractComponent {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
scheduler.shutdownNow();
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor).shutdownNow();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor.executor).shutdownNow();
}
}
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
boolean result = scheduler.awaitTermination(timeout, unit);
for (Executor executor : executors.values()) {
if (executor instanceof ThreadPoolExecutor) {
result &= ((ThreadPoolExecutor) executor).awaitTermination(timeout, unit);
for (ExecutorHolder executor : executors.values()) {
if (executor.executor instanceof ThreadPoolExecutor) {
result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit);
}
}
return result;
}
private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) {
private ExecutorHolder build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) {
if (settings == null) {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
}
@ -163,17 +199,18 @@ public class ThreadPool extends AbstractComponent {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[" + name + "]");
if ("same".equals(type)) {
logger.debug("creating thread_pool [{}], type [{}]", name, type);
return MoreExecutors.sameThreadExecutor();
return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type));
} else if ("cached".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
keepAlive.millis(), TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5));
int queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", -1));
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", null)));
RejectedExecutionHandler rejectedExecutionHandler;
String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort"));
if ("abort".equals(rejectSetting)) {
@ -183,25 +220,28 @@ public class ThreadPool extends AbstractComponent {
} else {
throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool");
}
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, queueSize, rejectSetting);
return new ThreadPoolExecutor(size, size,
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, capacity, rejectSetting);
Executor executor = new ThreadPoolExecutor(size, size,
0L, TimeUnit.MILLISECONDS,
queueSize <= 0 ? new LinkedTransferQueue<Runnable>() : new ArrayBlockingQueue<Runnable>(queueSize),
capacity == null ? new LinkedTransferQueue<Runnable>() : new ArrayBlockingQueue<Runnable>((int) capacity.singles()),
threadFactory, rejectedExecutionHandler);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity));
} else if ("scaling".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
return EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null));
} else if ("blocking".equals(type)) {
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)));
int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1));
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)));
SizeValue capacity = settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000)));
SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000))));
TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60)));
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime);
return EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity));
}
throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
@ -307,4 +347,151 @@ public class ThreadPool extends AbstractComponent {
}
}
}
static class ExecutorHolder {
public final Executor executor;
public final Info info;
ExecutorHolder(Executor executor, Info info) {
this.executor = executor;
this.info = info;
}
}
public static class Info implements Streamable, ToXContent {
private String name;
private String type;
private int min;
private int max;
private TimeValue keepAlive;
private SizeValue capacity;
Info() {
}
public Info(String name, String type) {
this(name, type, -1);
}
public Info(String name, String type, int size) {
this(name, type, size, size, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) {
this.name = name;
this.type = type;
this.min = min;
this.max = max;
this.keepAlive = keepAlive;
this.capacity = capacity;
}
public String name() {
return this.name;
}
public String getName() {
return this.name;
}
public String type() {
return this.type;
}
public String getType() {
return this.type;
}
public int min() {
return this.min;
}
public int getMin() {
return this.min;
}
public int max() {
return this.max;
}
public int getMax() {
return this.max;
}
@Nullable
public TimeValue keepAlive() {
return this.keepAlive;
}
@Nullable
public TimeValue getKeepAlive() {
return this.keepAlive;
}
@Nullable
public SizeValue capacity() {
return this.capacity;
}
@Nullable
public SizeValue getCapacity() {
return this.capacity;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
type = in.readUTF();
min = in.readInt();
max = in.readInt();
if (in.readBoolean()) {
keepAlive = TimeValue.readTimeValue(in);
}
if (in.readBoolean()) {
capacity = SizeValue.readSizeValue(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
out.writeUTF(type);
out.writeInt(min);
out.writeInt(max);
if (keepAlive == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
keepAlive.writeTo(out);
}
if (capacity == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
capacity.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE);
builder.field("type", type);
if (min != -1) {
builder.field("min", min);
}
if (max != -1) {
builder.field("max", max);
}
if (keepAlive != null) {
builder.field("keep_alive", keepAlive.toString());
}
if (capacity != null) {
builder.field("capacity", capacity.toString());
}
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
*/
public class ThreadPoolInfo implements Streamable, Iterable<ThreadPool.Info>, ToXContent {
private List<ThreadPool.Info> infos;
ThreadPoolInfo() {
}
public ThreadPoolInfo(List<ThreadPool.Info> infos) {
this.infos = infos;
}
@Override
public Iterator<ThreadPool.Info> iterator() {
return infos.iterator();
}
public static ThreadPoolInfo readThreadPoolInfo(StreamInput in) throws IOException {
ThreadPoolInfo info = new ThreadPoolInfo();
info.readFrom(in);
return info;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
infos = new ArrayList<ThreadPool.Info>(size);
for (int i = 0; i < size; i++) {
ThreadPool.Info info = new ThreadPool.Info();
info.readFrom(in);
infos.add(info);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(infos.size());
for (ThreadPool.Info info : infos) {
info.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("thread_pool");
for (ThreadPool.Info info : infos) {
info.toXContent(builder, params);
}
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
*/
public class ThreadPoolStats implements Streamable, ToXContent, Iterable<ThreadPoolStats.Stats> {
public static class Stats implements Streamable, ToXContent {
private String name;
private int threads;
private int queue;
Stats() {
}
public Stats(String name, int threads, int queue) {
this.name = name;
this.threads = threads;
this.queue = queue;
}
public String name() {
return this.name;
}
public String getName() {
return this.name;
}
public int threads() {
return this.threads;
}
public int getThreads() {
return this.threads;
}
public int queue() {
return this.queue;
}
public int getQueue() {
return this.queue;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
threads = in.readInt();
queue = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(threads);
out.writeInt(queue);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE);
if (threads != -1) {
builder.field("threads", threads);
}
if (queue != -1) {
builder.field("queue", queue);
}
builder.endObject();
return builder;
}
}
private List<Stats> stats;
ThreadPoolStats() {
}
public ThreadPoolStats(List<Stats> stats) {
this.stats = stats;
}
@Override
public Iterator<Stats> iterator() {
return stats.iterator();
}
public static ThreadPoolStats readThreadPoolStats(StreamInput in) throws IOException {
ThreadPoolStats stats = new ThreadPoolStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
stats = new ArrayList<Stats>(size);
for (int i = 0; i < size; i++) {
Stats stats1 = new Stats();
stats1.readFrom(in);
stats.add(stats1);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(stats.size());
for (Stats stat : stats) {
stat.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("thread_pool");
for (Stats stat : stats) {
stat.toXContent(builder, params);
}
builder.endObject();
return builder;
}
}