From 42377db084f96e75fcbc28b76c45d96315ac2959 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 17 Jan 2014 10:28:36 +0100 Subject: [PATCH] Check ThreadInfo[] for null element if thread are not alive. If a thread is not alive getting ThreadMXBean#getThreadInfo(long[], int) places null elemnents in the returned array which are not repected in the HotTheards API. Closes #4775 --- .../elasticsearch/monitor/jvm/HotThreads.java | 47 +++++-- .../action/admin/HotThreadsTest.java | 128 ++++++++++++++++++ 2 files changed, 162 insertions(+), 13 deletions(-) create mode 100644 src/test/java/org/elasticsearch/action/admin/HotThreadsTest.java diff --git a/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java b/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java index 03d9991adbc..1cbe26ef811 100644 --- a/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java +++ b/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java @@ -151,6 +151,9 @@ public class HotThreads { } ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][]; for (int j = 0; j < threadElementsSnapshotCount; j++) { + // NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist, + // null will be set in the corresponding element in the returned array. A thread is alive if it has + // been started and has not yet died. allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE); Thread.sleep(threadElementsSnapshotDelay.millis()); } @@ -163,8 +166,22 @@ public class HotThreads { } else if ("block".equals(type)) { time = hotties.get(t).blockedTime; } + String threadName = null; + if (allInfos[0][t] == null) { + for (ThreadInfo[] info : allInfos) { + if (info != null && info[t] != null) { + threadName = info[t].getThreadName(); + break; + } + } + if (threadName == null) { + continue; // thread is not alive yet or died before the first snapshot - ignore it! + } + } else { + threadName = allInfos[0][t].getThreadName(); + } double percent = (((double) time) / interval.nanos()) * 100; - sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, allInfos[0][t].getThreadName())); + sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, threadName)); // for each snapshot (2nd array index) find later snapshot for same thread with max number of // identical StackTraceElements (starting from end of each) boolean[] done = new boolean[threadElementsSnapshotCount]; @@ -189,16 +206,18 @@ public class HotThreads { count++; } } - StackTraceElement[] show = allInfos[i][t].getStackTrace(); - if (count == 1) { - sb.append(String.format(Locale.ROOT, " unique snapshot%n")); - for (int l = 0; l < show.length; l++) { - sb.append(String.format(Locale.ROOT, " %s%n", show[l])); - } - } else { - sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim)); - for (int l = show.length - maxSim; l < show.length; l++) { - sb.append(String.format(Locale.ROOT, " %s%n", show[l])); + if (allInfos[i][t] != null) { + final StackTraceElement[] show = allInfos[i][t].getStackTrace(); + if (count == 1) { + sb.append(String.format(Locale.ROOT, " unique snapshot%n")); + for (int l = 0; l < show.length; l++) { + sb.append(String.format(Locale.ROOT, " %s%n", show[l])); + } + } else { + sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim)); + for (int l = show.length - maxSim; l < show.length; l++) { + sb.append(String.format(Locale.ROOT, " %s%n", show[l])); + } } } } @@ -211,9 +230,11 @@ public class HotThreads { } } + private static final StackTraceElement[] EMPTY = new StackTraceElement[0]; + private int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) { - StackTraceElement[] s1 = threadInfo.getStackTrace(); - StackTraceElement[] s2 = threadInfo0.getStackTrace(); + StackTraceElement[] s1 = threadInfo == null ? EMPTY : threadInfo.getStackTrace(); + StackTraceElement[] s2 = threadInfo0 == null ? EMPTY : threadInfo0.getStackTrace(); int i = s1.length - 1; int j = s2.length - 1; int rslt = 0; diff --git a/src/test/java/org/elasticsearch/action/admin/HotThreadsTest.java b/src/test/java/org/elasticsearch/action/admin/HotThreadsTest.java new file mode 100644 index 00000000000..32532416d5c --- /dev/null +++ b/src/test/java/org/elasticsearch/action/admin/HotThreadsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.index.query.FilterBuilders.andFilter; +import static org.elasticsearch.index.query.FilterBuilders.notFilter; +import static org.elasticsearch.index.query.FilterBuilders.queryFilter; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; + +/** + */ +public class HotThreadsTest extends ElasticsearchIntegrationTest { + + @Test + public void testHotThreadsDontFail() throws ExecutionException, InterruptedException { + /** + * This test just checks if nothing crashes or gets stuck etc. + */ + createIndex("test"); + final int iters = atLeast(2); + + for (int i = 0; i < iters; i++) { + final String type; + NodesHotThreadsRequestBuilder nodesHotThreadsRequestBuilder = client().admin().cluster().prepareNodesHotThreads(); + if (randomBoolean()) { + TimeValue timeValue = new TimeValue(rarely() ? randomIntBetween(500, 5000) : randomIntBetween(20, 500)); + nodesHotThreadsRequestBuilder.setInterval(timeValue); + } + if (randomBoolean()) { + nodesHotThreadsRequestBuilder.setThreads(randomIntBetween(1, 100)); + } + if (randomBoolean()) { + switch (randomIntBetween(0, 2)) { + case 2: + type = "cpu"; + break; + case 1: + type = "wait"; + break; + default: + type = "block"; + break; + } + assertThat(type, notNullValue()); + nodesHotThreadsRequestBuilder.setType(type); + } else { + type = null; + } + final CountDownLatch latch = new CountDownLatch(1); + nodesHotThreadsRequestBuilder.execute(new ActionListener() { + @Override + public void onResponse(NodesHotThreadsResponse nodeHotThreads) { + try { + assertThat(nodeHotThreads, notNullValue()); + Map nodesMap = nodeHotThreads.getNodesMap(); + assertThat(nodesMap.size(), equalTo(cluster().size())); + for (NodeHotThreads ht : nodeHotThreads) { + assertNotNull(ht.getHotThreads()); + //logger.info(ht.getHotThreads()); + } + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Throwable e) { + logger.error("FAILED", e); + latch.countDown(); + fail(); + } + }); + + indexRandom(true, + client().prepareIndex("test", "type1", "1").setSource("field1", "value1"), + client().prepareIndex("test", "type1", "2").setSource("field1", "value2"), + client().prepareIndex("test", "type1", "3").setSource("field1", "value3")); + ensureSearchable(); + if (randomBoolean()) { + optimize(); + } + while(latch.getCount() > 0) { + assertHitCount( + client().prepareSearch() + .setQuery(matchAllQuery()) + .setPostFilter( + andFilter( + queryFilter(matchAllQuery()), + notFilter(andFilter(queryFilter(termQuery("field1", "value1")), + queryFilter(termQuery("field1", "value2")))))).get(), + 3l); + } + latch.await(); + } + } +}