From 9d979dfc015ee8de43a1080d817f48a1583b8ae9 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 10 Jan 2012 17:45:10 +0200 Subject: [PATCH] Add thread_pool to nodes info and nodes stats APIs, closes #1601. --- .../admin/cluster/node/info/NodeInfo.java | 26 +- .../cluster/node/info/NodesInfoRequest.java | 19 ++ .../cluster/node/info/NodesInfoResponse.java | 3 + .../node/info/TransportNodesInfoAction.java | 2 +- .../admin/cluster/node/stats/NodeStats.java | 32 ++- .../cluster/node/stats/NodesStatsRequest.java | 19 ++ .../node/stats/NodesStatsResponse.java | 3 + .../node/stats/TransportNodesStatsAction.java | 2 +- .../node/info/NodesInfoRequestBuilder.java | 8 + .../node/stats/NodesStatsRequestBuilder.java | 8 + .../node/service/NodeService.java | 43 +++- .../node/info/RestNodesInfoAction.java | 13 + .../node/stats/RestNodesStatsAction.java | 13 + .../elasticsearch/threadpool/ThreadPool.java | 233 ++++++++++++++++-- .../threadpool/ThreadPoolInfo.java | 86 +++++++ .../threadpool/ThreadPoolStats.java | 154 ++++++++++++ 16 files changed, 625 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java create mode 100644 src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index f282d2d47d1..b2e81f26950 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -32,6 +32,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.network.NetworkInfo; import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; +import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.transport.TransportInfo; import java.io.IOException; @@ -60,6 +61,9 @@ public class NodeInfo extends NodeOperationResponse { @Nullable private JvmInfo jvm; + @Nullable + private ThreadPoolInfo threadPool; + @Nullable private NetworkInfo network; @@ -73,7 +77,7 @@ public class NodeInfo extends NodeOperationResponse { } public NodeInfo(@Nullable String hostname, DiscoveryNode node, @Nullable ImmutableMap serviceAttributes, @Nullable Settings settings, - @Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable NetworkInfo network, + @Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool, @Nullable NetworkInfo network, @Nullable TransportInfo transport, @Nullable HttpInfo http) { super(node); this.hostname = hostname; @@ -82,6 +86,7 @@ public class NodeInfo extends NodeOperationResponse { this.os = os; this.process = process; this.jvm = jvm; + this.threadPool = threadPool; this.network = network; this.transport = transport; this.http = http; @@ -183,6 +188,16 @@ public class NodeInfo extends NodeOperationResponse { return jvm(); } + @Nullable + public ThreadPoolInfo threadPool() { + return this.threadPool; + } + + @Nullable + public ThreadPoolInfo getThreadPool() { + return threadPool(); + } + /** * Network level information. */ @@ -251,6 +266,9 @@ public class NodeInfo extends NodeOperationResponse { if (in.readBoolean()) { jvm = JvmInfo.readJvmInfo(in); } + if (in.readBoolean()) { + threadPool = ThreadPoolInfo.readThreadPoolInfo(in); + } if (in.readBoolean()) { network = NetworkInfo.readNetworkInfo(in); } @@ -305,6 +323,12 @@ public class NodeInfo extends NodeOperationResponse { out.writeBoolean(true); jvm.writeTo(out); } + if (threadPool == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + threadPool.writeTo(out); + } if (network == null) { out.writeBoolean(false); } else { diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java index fdd033ace03..5c882e8743e 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -34,6 +34,7 @@ public class NodesInfoRequest extends NodesOperationRequest { private boolean os = false; private boolean process = false; private boolean jvm = false; + private boolean threadPool = false; private boolean network = false; private boolean transport = false; private boolean http = false; @@ -57,6 +58,7 @@ public class NodesInfoRequest extends NodesOperationRequest { os = false; process = false; jvm = false; + threadPool = false; network = false; transport = false; http = false; @@ -123,6 +125,21 @@ public class NodesInfoRequest extends NodesOperationRequest { return this; } + /** + * Should the node Thread Pool info be returned. + */ + public boolean threadPool() { + return this.threadPool; + } + + /** + * Should the node Thread Pool info be returned. + */ + public NodesInfoRequest threadPool(boolean threadPool) { + this.threadPool = threadPool; + return this; + } + /** * Should the node Network be returned. */ @@ -175,6 +192,7 @@ public class NodesInfoRequest extends NodesOperationRequest { os = in.readBoolean(); process = in.readBoolean(); jvm = in.readBoolean(); + threadPool = in.readBoolean(); network = in.readBoolean(); transport = in.readBoolean(); http = in.readBoolean(); @@ -187,6 +205,7 @@ public class NodesInfoRequest extends NodesOperationRequest { out.writeBoolean(os); out.writeBoolean(process); out.writeBoolean(jvm); + out.writeBoolean(threadPool); out.writeBoolean(network); out.writeBoolean(transport); out.writeBoolean(http); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java index 8b6fc920eb0..86d3d42a5e5 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -116,6 +116,9 @@ public class NodesInfoResponse extends NodesOperationResponse implemen if (nodeInfo.jvm() != null) { nodeInfo.jvm().toXContent(builder, params); } + if (nodeInfo.threadPool() != null) { + nodeInfo.threadPool().toXContent(builder, params); + } if (nodeInfo.network() != null) { nodeInfo.network().toXContent(builder, params); } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index ddca44262d5..b5f6d8a1341 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -98,7 +98,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction implem if (nodeStats.jvm() != null) { nodeStats.jvm().toXContent(builder, params); } + if (nodeStats.threadPool() != null) { + nodeStats.threadPool().toXContent(builder, params); + } if (nodeStats.network() != null) { nodeStats.network().toXContent(builder, params); } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 3ccb8a5dc65..e93f595544f 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -98,7 +98,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction executors; + private final ImmutableMap executors; private final ScheduledThreadPoolExecutor scheduler; @@ -73,7 +81,7 @@ public class ThreadPool extends AbstractComponent { Map groupSettings = settings.getGroups("threadpool"); - Map executors = Maps.newHashMap(); + Map executors = Maps.newHashMap(); executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build())); executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS)); executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS)); @@ -81,7 +89,7 @@ public class ThreadPool extends AbstractComponent { executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "5m").put("size", 20).build())); executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "5m").put("size", 20).build())); executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS)); - executors.put(Names.SAME, MoreExecutors.sameThreadExecutor()); + executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same"))); this.executors = ImmutableMap.copyOf(executors); this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]")); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -96,12 +104,40 @@ public class ThreadPool extends AbstractComponent { return estimatedTimeThread.estimatedTimeInMillis(); } + public ThreadPoolInfo info() { + List infos = new ArrayList(); + for (ExecutorHolder holder : executors.values()) { + String name = holder.info.name(); + // no need to have info on "same" thread pool + if ("same".equals(name)) { + continue; + } + infos.add(holder.info); + } + return new ThreadPoolInfo(infos); + } + + public ThreadPoolStats stats() { + List stats = new ArrayList(); + for (ExecutorHolder holder : executors.values()) { + String name = holder.info.name(); + // no need to have info on "same" thread pool + if ("same".equals(name)) { + continue; + } + int threads = ((ThreadPoolExecutor) holder.executor).getPoolSize(); + int queue = ((ThreadPoolExecutor) holder.executor).getQueue().size(); + stats.add(new ThreadPoolStats.Stats(name, threads, queue)); + } + return new ThreadPoolStats(stats); + } + public Executor cached() { return executor(Names.CACHED); } public Executor executor(String name) { - Executor executor = executors.get(name); + Executor executor = executors.get(name).executor; if (executor == null) { throw new ElasticSearchIllegalArgumentException("No executor found for [" + name + "]"); } @@ -127,9 +163,9 @@ public class ThreadPool extends AbstractComponent { estimatedTimeThread.running = false; estimatedTimeThread.interrupt(); scheduler.shutdown(); - for (Executor executor : executors.values()) { - if (executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor).shutdown(); + for (ExecutorHolder executor : executors.values()) { + if (executor.executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor.executor).shutdown(); } } } @@ -138,24 +174,24 @@ public class ThreadPool extends AbstractComponent { estimatedTimeThread.running = false; estimatedTimeThread.interrupt(); scheduler.shutdownNow(); - for (Executor executor : executors.values()) { - if (executor instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor).shutdownNow(); + for (ExecutorHolder executor : executors.values()) { + if (executor.executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor) executor.executor).shutdownNow(); } } } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { boolean result = scheduler.awaitTermination(timeout, unit); - for (Executor executor : executors.values()) { - if (executor instanceof ThreadPoolExecutor) { - result &= ((ThreadPoolExecutor) executor).awaitTermination(timeout, unit); + for (ExecutorHolder executor : executors.values()) { + if (executor.executor instanceof ThreadPoolExecutor) { + result &= ((ThreadPoolExecutor) executor.executor).awaitTermination(timeout, unit); } } return result; } - private Executor build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { + private ExecutorHolder build(String name, String defaultType, @Nullable Settings settings, Settings defaultSettings) { if (settings == null) { settings = ImmutableSettings.Builder.EMPTY_SETTINGS; } @@ -163,17 +199,18 @@ public class ThreadPool extends AbstractComponent { ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[" + name + "]"); if ("same".equals(type)) { logger.debug("creating thread_pool [{}], type [{}]", name, type); - return MoreExecutors.sameThreadExecutor(); + return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type)); } else if ("cached".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + Executor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, keepAlive.millis(), TimeUnit.MILLISECONDS, new SynchronousQueue(), threadFactory); + return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int size = settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5)); - int queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", -1)); + SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", null))); RejectedExecutionHandler rejectedExecutionHandler; String rejectSetting = settings.get("reject_policy", defaultSettings.get("reject_policy", "abort")); if ("abort".equals(rejectSetting)) { @@ -183,25 +220,28 @@ public class ThreadPool extends AbstractComponent { } else { throw new ElasticSearchIllegalArgumentException("reject_policy [" + rejectSetting + "] not valid for [" + name + "] thread pool"); } - logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, queueSize, rejectSetting); - return new ThreadPoolExecutor(size, size, + logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}], reject_policy [{}]", name, type, size, capacity, rejectSetting); + Executor executor = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, - queueSize <= 0 ? new LinkedTransferQueue() : new ArrayBlockingQueue(queueSize), + capacity == null ? new LinkedTransferQueue() : new ArrayBlockingQueue((int) capacity.singles()), threadFactory, rejectedExecutionHandler); + return new ExecutorHolder(executor, new Info(name, type, size, size, null, capacity)); } else if ("scaling".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); - return EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newScalingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); } else if ("blocking".equals(type)) { TimeValue keepAlive = settings.getAsTime("keep_alive", defaultSettings.getAsTime("keep_alive", timeValueMinutes(5))); int min = settings.getAsInt("min", defaultSettings.getAsInt("min", 1)); int size = settings.getAsInt("max", settings.getAsInt("size", defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5))); - SizeValue capacity = settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000))); + SizeValue capacity = settings.getAsSize("capacity", settings.getAsSize("queue_size", defaultSettings.getAsSize("queue_size", new SizeValue(1000)))); TimeValue waitTime = settings.getAsTime("wait_time", defaultSettings.getAsTime("wait_time", timeValueSeconds(60))); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], queue_size [{}], keep_alive [{}], wait_time [{}]", name, type, min, size, capacity.singles(), keepAlive, waitTime); - return EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); + Executor executor = EsExecutors.newBlockingExecutorService(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, (int) capacity.singles(), waitTime.millis(), TimeUnit.MILLISECONDS); + return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, capacity)); } throw new ElasticSearchIllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } @@ -307,4 +347,151 @@ public class ThreadPool extends AbstractComponent { } } } + + static class ExecutorHolder { + public final Executor executor; + public final Info info; + + ExecutorHolder(Executor executor, Info info) { + this.executor = executor; + this.info = info; + } + } + + public static class Info implements Streamable, ToXContent { + + private String name; + private String type; + private int min; + private int max; + private TimeValue keepAlive; + private SizeValue capacity; + + Info() { + + } + + public Info(String name, String type) { + this(name, type, -1); + } + + public Info(String name, String type, int size) { + this(name, type, size, size, null, null); + } + + public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue capacity) { + this.name = name; + this.type = type; + this.min = min; + this.max = max; + this.keepAlive = keepAlive; + this.capacity = capacity; + } + + public String name() { + return this.name; + } + + public String getName() { + return this.name; + } + + public String type() { + return this.type; + } + + public String getType() { + return this.type; + } + + public int min() { + return this.min; + } + + public int getMin() { + return this.min; + } + + public int max() { + return this.max; + } + + public int getMax() { + return this.max; + } + + @Nullable + public TimeValue keepAlive() { + return this.keepAlive; + } + + @Nullable + public TimeValue getKeepAlive() { + return this.keepAlive; + } + + @Nullable + public SizeValue capacity() { + return this.capacity; + } + + @Nullable + public SizeValue getCapacity() { + return this.capacity; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + name = in.readUTF(); + type = in.readUTF(); + min = in.readInt(); + max = in.readInt(); + if (in.readBoolean()) { + keepAlive = TimeValue.readTimeValue(in); + } + if (in.readBoolean()) { + capacity = SizeValue.readSizeValue(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(name); + out.writeUTF(type); + out.writeInt(min); + out.writeInt(max); + if (keepAlive == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + keepAlive.writeTo(out); + } + if (capacity == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + capacity.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE); + builder.field("type", type); + if (min != -1) { + builder.field("min", min); + } + if (max != -1) { + builder.field("max", max); + } + if (keepAlive != null) { + builder.field("keep_alive", keepAlive.toString()); + } + if (capacity != null) { + builder.field("capacity", capacity.toString()); + } + builder.endObject(); + return builder; + } + } } diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java b/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java new file mode 100644 index 00000000000..dda8b25d4a0 --- /dev/null +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPoolInfo.java @@ -0,0 +1,86 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + */ +public class ThreadPoolInfo implements Streamable, Iterable, ToXContent { + + private List infos; + + ThreadPoolInfo() { + } + + + public ThreadPoolInfo(List infos) { + this.infos = infos; + } + + @Override + public Iterator iterator() { + return infos.iterator(); + } + + public static ThreadPoolInfo readThreadPoolInfo(StreamInput in) throws IOException { + ThreadPoolInfo info = new ThreadPoolInfo(); + info.readFrom(in); + return info; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + infos = new ArrayList(size); + for (int i = 0; i < size; i++) { + ThreadPool.Info info = new ThreadPool.Info(); + info.readFrom(in); + infos.add(info); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(infos.size()); + for (ThreadPool.Info info : infos) { + info.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("thread_pool"); + for (ThreadPool.Info info : infos) { + info.toXContent(builder, params); + } + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java b/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java new file mode 100644 index 00000000000..a9476cfd1cd --- /dev/null +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPoolStats.java @@ -0,0 +1,154 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + */ +public class ThreadPoolStats implements Streamable, ToXContent, Iterable { + + public static class Stats implements Streamable, ToXContent { + + private String name; + private int threads; + private int queue; + + Stats() { + + } + + public Stats(String name, int threads, int queue) { + this.name = name; + this.threads = threads; + this.queue = queue; + } + + public String name() { + return this.name; + } + + public String getName() { + return this.name; + } + + public int threads() { + return this.threads; + } + + public int getThreads() { + return this.threads; + } + + public int queue() { + return this.queue; + } + + public int getQueue() { + return this.queue; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + name = in.readUTF(); + threads = in.readInt(); + queue = in.readInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(name); + out.writeInt(threads); + out.writeInt(queue); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE); + if (threads != -1) { + builder.field("threads", threads); + } + if (queue != -1) { + builder.field("queue", queue); + } + builder.endObject(); + return builder; + } + } + + private List stats; + + ThreadPoolStats() { + + } + + public ThreadPoolStats(List stats) { + this.stats = stats; + } + + @Override + public Iterator iterator() { + return stats.iterator(); + } + + public static ThreadPoolStats readThreadPoolStats(StreamInput in) throws IOException { + ThreadPoolStats stats = new ThreadPoolStats(); + stats.readFrom(in); + return stats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + stats = new ArrayList(size); + for (int i = 0; i < size; i++) { + Stats stats1 = new Stats(); + stats1.readFrom(in); + stats.add(stats1); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(stats.size()); + for (Stats stat : stats) { + stat.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("thread_pool"); + for (Stats stat : stats) { + stat.toXContent(builder, params); + } + builder.endObject(); + return builder; + } +}