expose snapshots parameter

This commit is contained in:
Shay Banon 2012-08-02 15:24:25 +03:00
parent 8be5c72200
commit de3fb50c3b
4 changed files with 126 additions and 94 deletions

View File

@ -34,6 +34,7 @@ public class NodesHotThreadsRequest extends NodesOperationRequest {
int threads = 3;
String type = "cpu";
TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
int snapshots = 10;
/**
* Get hot threads from nodes based on the nodes ids specified. If none are passed, hot
@ -70,12 +71,22 @@ public class NodesHotThreadsRequest extends NodesOperationRequest {
return this.interval;
}
public int snapshots() {
return this.snapshots;
}
public NodesHotThreadsRequest snapshots(int snapshots) {
this.snapshots = snapshots;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
threads = in.readInt();
type = in.readString();
interval = TimeValue.readTimeValue(in);
snapshots = in.readInt();
}
@Override
@ -84,5 +95,6 @@ public class NodesHotThreadsRequest extends NodesOperationRequest {
out.writeInt(threads);
out.writeString(type);
interval.writeTo(out);
out.writeInt(snapshots);
}
}

View File

@ -95,7 +95,8 @@ public class TransportNodesHotThreadsAction extends TransportNodesOperationActio
HotThreads hotThreads = new HotThreads()
.busiestThreads(request.request.threads)
.type(request.request.type)
.interval(request.request.interval);
.interval(request.request.interval)
.threadElementsSnapshotCount(request.request.snapshots);
try {
return new NodeHotThreads(clusterService.state().nodes().localNode(), hotThreads.detect());
} catch (Exception e) {

View File

@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
*/
public class HotThreads {
private static final Object mutex = new Object();
private int busiestThreads = 3;
private TimeValue interval = new TimeValue(500, TimeUnit.MILLISECONDS);
private TimeValue threadElementsSnapshotDelay = new TimeValue(10);
@ -68,51 +70,62 @@ public class HotThreads {
}
public String detect() throws Exception {
synchronized (mutex) {
return innerDetect();
}
}
private String innerDetect() throws Exception {
StringBuilder sb = new StringBuilder();
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
if (threadBean.isThreadCpuTimeSupported()) {
if (!threadBean.isThreadCpuTimeEnabled()) threadBean.setThreadCpuTimeEnabled(true);
} else {
throw new IllegalStateException("MBean doesn't support thread CPU Time");
}
Map<Long, MyThreadInfo> threadInfos = new HashMap<Long, MyThreadInfo>();
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
threadInfos.put(threadId, new MyThreadInfo(cpu, info));
}
Thread.sleep(interval.millis());
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
MyThreadInfo data = threadInfos.get(threadId);
if (data != null) {
data.setDelta(cpu, info);
}
}
// sort by delta CPU time on thread.
List<MyThreadInfo> hotties = new ArrayList<MyThreadInfo>(threadInfos.values());
// skip that for now
Collections.sort(hotties, new Comparator<MyThreadInfo>() {
public int compare(MyThreadInfo o1, MyThreadInfo o2) {
if ("cpu".equals(type)) {
return (int) (o2.cpuTime - o1.cpuTime);
} else if ("wait".equals(type)) {
return (int) (o2.waitedTime - o1.waitedTime);
} else if ("block".equals(type)) {
return (int) (o2.blockedTime - o1.blockedTime);
boolean enabledCpu = false;
try {
if (threadBean.isThreadCpuTimeSupported()) {
if (!threadBean.isThreadCpuTimeEnabled()) {
enabledCpu = true;
threadBean.setThreadCpuTimeEnabled(true);
}
throw new IllegalArgumentException();
} else {
throw new IllegalStateException("MBean doesn't support thread CPU Time");
}
});
Map<Long, MyThreadInfo> threadInfos = new HashMap<Long, MyThreadInfo>();
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
threadInfos.put(threadId, new MyThreadInfo(cpu, info));
}
Thread.sleep(interval.millis());
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
MyThreadInfo data = threadInfos.get(threadId);
if (data != null) {
data.setDelta(cpu, info);
}
}
// sort by delta CPU time on thread.
List<MyThreadInfo> hotties = new ArrayList<MyThreadInfo>(threadInfos.values());
// skip that for now
Collections.sort(hotties, new Comparator<MyThreadInfo>() {
public int compare(MyThreadInfo o1, MyThreadInfo o2) {
if ("cpu".equals(type)) {
return (int) (o2.cpuTime - o1.cpuTime);
} else if ("wait".equals(type)) {
return (int) (o2.waitedTime - o1.waitedTime);
} else if ("block".equals(type)) {
return (int) (o2.blockedTime - o1.blockedTime);
}
throw new IllegalArgumentException();
}
});
// for(MyThreadInfo inf : hotties) {
// if(inf.deltaDone) {
// System.out.format("%5.2f %d/%d %d/%d %s%n",
@ -125,66 +138,71 @@ public class HotThreads {
// );
// }
// }
// analyse N stack traces for M busiest threads
long[] ids = new long[busiestThreads];
for (int i = 0; i < busiestThreads; i++) {
MyThreadInfo info = hotties.get(i);
ids[i] = info.info.getThreadId();
}
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
for (int j = 0; j < threadElementsSnapshotCount; j++) {
allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
Thread.sleep(threadElementsSnapshotDelay.millis());
}
for (int t = 0; t < busiestThreads; t++) {
double value = -1;
if ("cpu".equals(type)) {
value = hotties.get(t).cpuTime / 1E7;
} else if ("wait".equals(type)) {
value = hotties.get(t).waitedTime / 1E7;
} else if ("block".equals(type)) {
value = hotties.get(t).blockedTime / 1E7;
// analyse N stack traces for M busiest threads
long[] ids = new long[busiestThreads];
for (int i = 0; i < busiestThreads; i++) {
MyThreadInfo info = hotties.get(i);
ids[i] = info.info.getThreadId();
}
sb.append(String.format("%n%4.1f%% %s usage by thread '%s'%n", value, type, allInfos[0][t].getThreadName()));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
for (int i = 0; i < threadElementsSnapshotCount; i++) {
if (done[i]) continue;
int maxSim = 1;
boolean[] similars = new boolean[threadElementsSnapshotCount];
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (done[j]) continue;
int similarity = similarity(allInfos[i][t], allInfos[j][t]);
if (similarity > maxSim) {
maxSim = similarity;
similars = new boolean[threadElementsSnapshotCount];
}
if (similarity == maxSim) similars[j] = true;
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
for (int j = 0; j < threadElementsSnapshotCount; j++) {
allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
Thread.sleep(threadElementsSnapshotDelay.millis());
}
for (int t = 0; t < busiestThreads; t++) {
double value = -1;
if ("cpu".equals(type)) {
value = hotties.get(t).cpuTime / 1E7;
} else if ("wait".equals(type)) {
value = hotties.get(t).waitedTime / 1E7;
} else if ("block".equals(type)) {
value = hotties.get(t).blockedTime / 1E7;
}
// print out trace maxSim levels of i, and mark similar ones as done
int count = 1;
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (similars[j]) {
done[j] = true;
count++;
sb.append(String.format("%n%4.1f%% %s usage by thread '%s'%n", value, type, allInfos[0][t].getThreadName()));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
for (int i = 0; i < threadElementsSnapshotCount; i++) {
if (done[i]) continue;
int maxSim = 1;
boolean[] similars = new boolean[threadElementsSnapshotCount];
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (done[j]) continue;
int similarity = similarity(allInfos[i][t], allInfos[j][t]);
if (similarity > maxSim) {
maxSim = similarity;
similars = new boolean[threadElementsSnapshotCount];
}
if (similarity == maxSim) similars[j] = true;
}
}
StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(" unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
// print out trace maxSim levels of i, and mark similar ones as done
int count = 1;
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (similars[j]) {
done[j] = true;
count++;
}
}
} else {
sb.append(String.format(" %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(" unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
}
} else {
sb.append(String.format(" %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(" %s%n", show[l]));
}
}
}
}
return sb.toString();
} finally {
if (enabledCpu) {
threadBean.setThreadCpuTimeEnabled(false);
}
}
return sb.toString();
}
private int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) {

View File

@ -58,6 +58,7 @@ public class RestNodesHotThreadsAction extends BaseRestHandler {
nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads()));
nodesHotThreadsRequest.type(request.param("type", nodesHotThreadsRequest.type()));
nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval()));
nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots()));
client.admin().cluster().nodesHotThreads(nodesHotThreadsRequest, new ActionListener<NodesHotThreadsResponse>() {
@Override
public void onResponse(NodesHotThreadsResponse response) {