Cluster Nodes hot_threads API, closes #2134.

This commit is contained in:
Shay Banon 2012-08-02 14:52:31 +03:00
parent e26a56e025
commit 8be5c72200
13 changed files with 826 additions and 0 deletions

View File

@ -22,6 +22,8 @@ package org.elasticsearch.action;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartAction; import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartAction;
@ -159,6 +161,7 @@ public class ActionModule extends AbstractModule {
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesShutdownAction.INSTANCE, TransportNodesShutdownAction.class); registerAction(NodesShutdownAction.INSTANCE, TransportNodesShutdownAction.class);
registerAction(NodesRestartAction.INSTANCE, TransportNodesRestartAction.class); registerAction(NodesRestartAction.INSTANCE, TransportNodesRestartAction.class);
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);

View File

@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class NodeHotThreads extends NodeOperationResponse {
private String hotThreads;
NodeHotThreads() {
}
public NodeHotThreads(DiscoveryNode node, String hotThreads) {
super(node);
this.hotThreads = hotThreads;
}
public String hotThreads() {
return this.hotThreads;
}
public static NodeHotThreads readNodeHotThreads(StreamInput in) throws IOException {
NodeHotThreads node = new NodeHotThreads();
node.readFrom(in);
return node;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
hotThreads = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(hotThreads);
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.client.ClusterAdminClient;
/**
*/
public class NodesHotThreadsAction extends ClusterAction<NodesHotThreadsRequest, NodesHotThreadsResponse, NodesHotThreadsRequestBuilder> {
public static final NodesHotThreadsAction INSTANCE = new NodesHotThreadsAction();
public static final String NAME = "cluster/nodes/hot_threads";
private NodesHotThreadsAction() {
super(NAME);
}
@Override
public NodesHotThreadsResponse newResponse() {
return new NodesHotThreadsResponse();
}
@Override
public NodesHotThreadsRequestBuilder newRequestBuilder(ClusterAdminClient client) {
return new NodesHotThreadsRequestBuilder(client);
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
*/
public class NodesHotThreadsRequest extends NodesOperationRequest {
int threads = 3;
String type = "cpu";
TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
/**
* Get hot threads from nodes based on the nodes ids specified. If none are passed, hot
* threads for all nodes is used.
*/
public NodesHotThreadsRequest(String... nodesIds) {
super(nodesIds);
}
public int threads() {
return this.threads;
}
public NodesHotThreadsRequest threads(int threads) {
this.threads = threads;
return this;
}
public NodesHotThreadsRequest type(String type) {
this.type = type;
return this;
}
public String type() {
return this.type;
}
public NodesHotThreadsRequest interval(TimeValue interval) {
this.interval = interval;
return this;
}
public TimeValue interval() {
return this.interval;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
threads = in.readInt();
type = in.readString();
interval = TimeValue.readTimeValue(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(threads);
out.writeString(type);
interval.writeTo(out);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.support.BaseClusterRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.common.unit.TimeValue;
/**
*/
public class NodesHotThreadsRequestBuilder extends BaseClusterRequestBuilder<NodesHotThreadsRequest, NodesHotThreadsResponse> {
public NodesHotThreadsRequestBuilder(ClusterAdminClient clusterClient) {
super(clusterClient, new NodesHotThreadsRequest());
}
public NodesHotThreadsRequestBuilder setNodesIds(String... nodesIds) {
request.nodesIds(nodesIds);
return this;
}
public NodesHotThreadsRequestBuilder setThreads(int threads) {
request.threads(threads);
return this;
}
public NodesHotThreadsRequestBuilder setType(String type) {
request.type(type);
return this;
}
public NodesHotThreadsRequestBuilder setInterval(TimeValue interval) {
request.interval(interval);
return this;
}
@Override
protected void doExecute(ActionListener<NodesHotThreadsResponse> listener) {
client.nodesHotThreads(request, listener);
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.support.nodes.NodesOperationResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class NodesHotThreadsResponse extends NodesOperationResponse<NodeHotThreads> {
NodesHotThreadsResponse() {
}
public NodesHotThreadsResponse(ClusterName clusterName, NodeHotThreads[] nodes) {
super(clusterName, nodes);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeHotThreads[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeHotThreads.readNodeHotThreads(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeHotThreads node : nodes) {
node.writeTo(out);
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.hotthreads;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.support.nodes.NodeOperationRequest;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.jvm.HotThreads;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
*/
public class TransportNodesHotThreadsAction extends TransportNodesOperationAction<NodesHotThreadsRequest, NodesHotThreadsResponse, TransportNodesHotThreadsAction.NodeRequest, NodeHotThreads> {
@Inject
public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings, clusterName, threadPool, clusterService, transportService);
}
@Override
protected String executor() {
return ThreadPool.Names.CACHE;
}
@Override
protected String transportAction() {
return NodesHotThreadsAction.NAME;
}
@Override
protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request, AtomicReferenceArray responses) {
final List<NodeHotThreads> nodes = Lists.newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeHotThreads) {
nodes.add((NodeHotThreads) resp);
}
}
return new NodesHotThreadsResponse(clusterName, nodes.toArray(new NodeHotThreads[nodes.size()]));
}
@Override
protected NodesHotThreadsRequest newRequest() {
return new NodesHotThreadsRequest();
}
@Override
protected NodeRequest newNodeRequest() {
return new NodeRequest();
}
@Override
protected NodeRequest newNodeRequest(String nodeId, NodesHotThreadsRequest request) {
return new NodeRequest(nodeId, request);
}
@Override
protected NodeHotThreads newNodeResponse() {
return new NodeHotThreads();
}
@Override
protected NodeHotThreads nodeOperation(NodeRequest request) throws ElasticSearchException {
HotThreads hotThreads = new HotThreads()
.busiestThreads(request.request.threads)
.type(request.request.type)
.interval(request.request.interval);
try {
return new NodeHotThreads(clusterService.state().nodes().localNode(), hotThreads.detect());
} catch (Exception e) {
throw new ElasticSearchException("failed to detect hot threads", e);
}
}
@Override
protected boolean accumulateExceptions() {
return false;
}
static class NodeRequest extends NodeOperationRequest {
NodesHotThreadsRequest request;
NodeRequest() {
}
NodeRequest(String nodeId, NodesHotThreadsRequest request) {
super(nodeId);
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new NodesHotThreadsRequest();
request.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
}

View File

@ -24,6 +24,9 @@ import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -181,6 +184,12 @@ public interface ClusterAdminClient {
*/ */
NodesStatsRequestBuilder prepareNodesStats(String... nodesIds); NodesStatsRequestBuilder prepareNodesStats(String... nodesIds);
ActionFuture<NodesHotThreadsResponse> nodesHotThreads(NodesHotThreadsRequest request);
void nodesHotThreads(NodesHotThreadsRequest request, ActionListener<NodesHotThreadsResponse> listener);
NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds);
/** /**
* Shutdown nodes in the cluster. * Shutdown nodes in the cluster.
* *

View File

@ -25,6 +25,10 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
@ -155,6 +159,21 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin
return new NodesStatsRequestBuilder(this).setNodesIds(nodesIds); return new NodesStatsRequestBuilder(this).setNodesIds(nodesIds);
} }
@Override
public ActionFuture<NodesHotThreadsResponse> nodesHotThreads(NodesHotThreadsRequest request) {
return execute(NodesHotThreadsAction.INSTANCE, request);
}
@Override
public void nodesHotThreads(NodesHotThreadsRequest request, ActionListener<NodesHotThreadsResponse> listener) {
execute(NodesHotThreadsAction.INSTANCE, request, listener);
}
@Override
public NodesHotThreadsRequestBuilder prepareNodesHotThreads(String... nodesIds) {
return new NodesHotThreadsRequestBuilder(this).setNodesIds(nodesIds);
}
@Override @Override
public ActionFuture<NodesRestartResponse> nodesRestart(final NodesRestartRequest request) { public ActionFuture<NodesRestartResponse> nodesRestart(final NodesRestartRequest request) {
return execute(NodesRestartAction.INSTANCE, request); return execute(NodesRestartAction.INSTANCE, request);

View File

@ -22,7 +22,9 @@ package org.elasticsearch.common;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import gnu.trove.set.hash.THashSet; import gnu.trove.set.hash.THashSet;
import org.elasticsearch.common.io.FastStringReader;
import java.io.BufferedReader;
import java.util.*; import java.util.*;
/** /**
@ -42,6 +44,27 @@ public class Strings {
private static final char EXTENSION_SEPARATOR = '.'; private static final char EXTENSION_SEPARATOR = '.';
public static void tabify(int tabs, String from, StringBuilder to) throws Exception {
BufferedReader reader = new BufferedReader(new FastStringReader(from));
String line;
while ((line = reader.readLine()) != null) {
for (int i = 0; i < tabs; i++) {
to.append('\t');
}
to.append(line).append('\n');
}
}
public static void spaceify(int spaces, String from, StringBuilder to) throws Exception {
BufferedReader reader = new BufferedReader(new FastStringReader(from));
String line;
while ((line = reader.readLine()) != null) {
for (int i = 0; i < spaces; i++) {
to.append(' ');
}
to.append(line).append('\n');
}
}
/** /**
* Splits a backslash escaped string on the separator. * Splits a backslash escaped string on the separator.

View File

@ -0,0 +1,233 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.jvm;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.TimeValue;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
*/
public class HotThreads {
private int busiestThreads = 3;
private TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
private TimeValue threadElementsSnapshotDelay = new TimeValue(10);
private int threadElementsSnapshotCount = 10;
private String type = "cpu";
public HotThreads interval(TimeValue interval) {
this.interval = interval;
return this;
}
public HotThreads busiestThreads(int busiestThreads) {
this.busiestThreads = busiestThreads;
return this;
}
public HotThreads threadElementsSnapshotDelay(TimeValue threadElementsSnapshotDelay) {
this.threadElementsSnapshotDelay = threadElementsSnapshotDelay;
return this;
}
public HotThreads threadElementsSnapshotCount(int threadElementsSnapshotCount) {
this.threadElementsSnapshotCount = threadElementsSnapshotCount;
return this;
}
public HotThreads type(String type) {
if ("cpu".equals(type) || "wait".equals(type) || "block".equals(type)) {
this.type = type;
} else {
throw new ElasticSearchIllegalArgumentException("type not supported [" + type + "]");
}
return this;
}
public String detect() throws Exception {
StringBuilder sb = new StringBuilder();
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
if (threadBean.isThreadCpuTimeSupported()) {
if (!threadBean.isThreadCpuTimeEnabled()) threadBean.setThreadCpuTimeEnabled(true);
} else {
throw new IllegalStateException("MBean doesn't support thread CPU Time");
}
Map<Long, MyThreadInfo> threadInfos = new HashMap<Long, MyThreadInfo>();
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
threadInfos.put(threadId, new MyThreadInfo(cpu, info));
}
Thread.sleep(interval.millis());
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
MyThreadInfo data = threadInfos.get(threadId);
if (data != null) {
data.setDelta(cpu, info);
}
}
// sort by delta CPU time on thread.
List<MyThreadInfo> hotties = new ArrayList<MyThreadInfo>(threadInfos.values());
// skip that for now
Collections.sort(hotties, new Comparator<MyThreadInfo>() {
public int compare(MyThreadInfo o1, MyThreadInfo o2) {
if ("cpu".equals(type)) {
return (int) (o2.cpuTime - o1.cpuTime);
} else if ("wait".equals(type)) {
return (int) (o2.waitedTime - o1.waitedTime);
} else if ("block".equals(type)) {
return (int) (o2.blockedTime - o1.blockedTime);
}
throw new IllegalArgumentException();
}
});
// for(MyThreadInfo inf : hotties) {
// if(inf.deltaDone) {
// System.out.format("%5.2f %d/%d %d/%d %s%n",
// inf.cpuTime/1E7,
// inf.blockedCount,
// inf.blockedTime,
// inf.waitedCount,
// inf.waitedtime,
// inf.info.getThreadName()
// );
// }
// }
// analyse N stack traces for M busiest threads
long[] ids = new long[busiestThreads];
for (int i = 0; i < busiestThreads; i++) {
MyThreadInfo info = hotties.get(i);
ids[i] = info.info.getThreadId();
}
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
for (int j = 0; j < threadElementsSnapshotCount; j++) {
allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
Thread.sleep(threadElementsSnapshotDelay.millis());
}
for (int t = 0; t < busiestThreads; t++) {
double value = -1;
if ("cpu".equals(type)) {
value = hotties.get(t).cpuTime / 1E7;
} else if ("wait".equals(type)) {
value = hotties.get(t).waitedTime / 1E7;
} else if ("block".equals(type)) {
value = hotties.get(t).blockedTime / 1E7;
}
sb.append(String.format("%n%4.1f%% %s usage by thread '%s'%n", value, type, allInfos[0][t].getThreadName()));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
for (int i = 0; i < threadElementsSnapshotCount; i++) {
if (done[i]) continue;
int maxSim = 1;
boolean[] similars = new boolean[threadElementsSnapshotCount];
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (done[j]) continue;
int similarity = similarity(allInfos[i][t], allInfos[j][t]);
if (similarity > maxSim) {
maxSim = similarity;
similars = new boolean[threadElementsSnapshotCount];
}
if (similarity == maxSim) similars[j] = true;
}
// print out trace maxSim levels of i, and mark similar ones as done
int count = 1;
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (similars[j]) {
done[j] = true;
count++;
}
}
StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(" unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
}
} else {
sb.append(String.format(" %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
}
}
}
}
return sb.toString();
}
private int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) {
StackTraceElement[] s1 = threadInfo.getStackTrace();
StackTraceElement[] s2 = threadInfo0.getStackTrace();
int i = s1.length - 1;
int j = s2.length - 1;
int rslt = 0;
while (i >= 0 && j >= 0 && s1[i].equals(s2[j])) {
rslt++;
i--;
j--;
}
return rslt;
}
class MyThreadInfo {
long cpuTime;
long blockedCount;
long blockedTime;
long waitedCount;
long waitedTime;
boolean deltaDone;
ThreadInfo info;
MyThreadInfo(long cpuTime, ThreadInfo info) {
blockedCount = info.getBlockedCount();
blockedTime = info.getBlockedTime();
waitedCount = info.getWaitedCount();
waitedTime = info.getWaitedTime();
this.cpuTime = cpuTime;
}
void setDelta(long cpuTime, ThreadInfo info) {
if (deltaDone) throw new IllegalStateException("setDelta already called once");
blockedCount = info.getBlockedCount() - blockedCount;
blockedTime = info.getBlockedTime() - blockedTime;
waitedCount = info.getWaitedCount() - waitedCount;
waitedTime = info.getWaitedTime() - waitedTime;
this.cpuTime = cpuTime - this.cpuTime;
deltaDone = true;
this.info = info;
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction; import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.hotthreads.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.node.restart.RestNodesRestartAction; import org.elasticsearch.rest.action.admin.cluster.node.restart.RestNodesRestartAction;
import org.elasticsearch.rest.action.admin.cluster.node.shutdown.RestNodesShutdownAction; import org.elasticsearch.rest.action.admin.cluster.node.shutdown.RestNodesShutdownAction;
@ -97,6 +98,7 @@ public class RestActionModule extends AbstractModule {
bind(RestNodesInfoAction.class).asEagerSingleton(); bind(RestNodesInfoAction.class).asEagerSingleton();
bind(RestNodesStatsAction.class).asEagerSingleton(); bind(RestNodesStatsAction.class).asEagerSingleton();
bind(RestNodesHotThreadsAction.class).asEagerSingleton();
bind(RestNodesShutdownAction.class).asEagerSingleton(); bind(RestNodesShutdownAction.class).asEagerSingleton();
bind(RestNodesRestartAction.class).asEagerSingleton(); bind(RestNodesRestartAction.class).asEagerSingleton();
bind(RestClusterStateAction.class).asEagerSingleton(); bind(RestClusterStateAction.class).asEagerSingleton();

View File

@ -0,0 +1,87 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.node.hotthreads;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
/**
*/
public class RestNodesHotThreadsAction extends BaseRestHandler {
@Inject
public RestNodesHotThreadsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hot_threads", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
String[] nodesIds = RestActions.splitNodes(request.param("nodeId"));
NodesHotThreadsRequest nodesHotThreadsRequest = new NodesHotThreadsRequest(nodesIds);
nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads()));
nodesHotThreadsRequest.type(request.param("type", nodesHotThreadsRequest.type()));
nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval()));
client.admin().cluster().nodesHotThreads(nodesHotThreadsRequest, new ActionListener<NodesHotThreadsResponse>() {
@Override
public void onResponse(NodesHotThreadsResponse response) {
try {
StringBuilder sb = new StringBuilder();
for (NodeHotThreads node : response) {
sb.append("::: ").append(node.node().toString()).append("\n");
Strings.spaceify(3, node.hotThreads(), sb);
sb.append('\n');
}
channel.sendResponse(new StringRestResponse(RestStatus.OK, sb.toString()));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}