diff --git a/src/main/java/org/elasticsearch/common/network/MulticastChannel.java b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java index a4c4179f3c0..0a501b1086d 100644 --- a/src/main/java/org/elasticsearch/common/network/MulticastChannel.java +++ b/src/main/java/org/elasticsearch/common/network/MulticastChannel.java @@ -152,6 +152,7 @@ public abstract class MulticastChannel implements Closeable { protected abstract void close(Listener listener); + public static final String SHARED_CHANNEL_NAME = "#shared#"; /** * A shared channel that keeps a static map of Config -> Shared channels, and closes shared * channel once their reference count has reached 0. It also handles de-registering relevant @@ -172,7 +173,7 @@ public abstract class MulticastChannel implements Closeable { } else { MultiListener multiListener = new MultiListener(); multiListener.add(listener); - shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config)); + shared = new Shared(multiListener, new Plain(multiListener, SHARED_CHANNEL_NAME, config)); sharedChannels.put(config, shared); } return new Delegate(listener, shared); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 21ab6d111e6..5a99423f3fd 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -719,6 +719,7 @@ public final class InternalTestCluster extends TestCluster { } } + public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; static class TransportClientFactory extends ClientFactory { private boolean sniff; @@ -733,7 +734,7 @@ public final class InternalTestCluster extends TestCluster { public Client client(Node node, String clusterName, Random random) { TransportAddress addr = ((InternalNode) node).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder().put("client.transport.nodes_sampler_interval", "1s") - .put("name", "transport_client_" + node.settings().get("name")) + .put("name", TRANSPORT_CLIENT_PREFIX + node.settings().get("name")) .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) .put(ClusterName.SETTING, clusterName).put("client.transport.sniff", sniff).build()); client.addTransportAddress(addr); diff --git a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java index f32f1209b2c..a2d67b2b31a 100644 --- a/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java +++ b/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java @@ -19,8 +19,13 @@ package org.elasticsearch.threadpool; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.network.MulticastChannel; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; @@ -28,22 +33,35 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.TestCluster; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.hamcrest.Matchers; import org.junit.Test; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; import static org.hamcrest.Matchers.*; /** */ -@ClusterScope(scope= Scope.TEST, numDataNodes =2) +@ClusterScope(scope= Scope.TEST, numDataNodes = 0, numClientNodes = 0) public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest { @Override @@ -51,8 +69,54 @@ public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest { return ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").put(super.nodeSettings(nodeOrdinal)).build(); } + @Test + public void verifyThreadNames() throws Exception { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + Set preNodeStartThreadNames = Sets.newHashSet(); + for (long l : threadBean.getAllThreadIds()) { + preNodeStartThreadNames.add(threadBean.getThreadInfo(l).getThreadName()); + } + logger.info("pre node threads are {}", preNodeStartThreadNames); + String node = internalCluster().startNode(); + logger.info("do some indexing, flushing, optimize, and searches"); + int numDocs = randomIntBetween(2, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; ++i) { + builders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder() + .startObject() + .field("str_value", "s" + i) + .field("str_values", new String[]{"s" + (i * 2), "s" + (i * 2 + 1)}) + .field("l_value", i) + .field("l_values", new int[] {i * 2, i * 2 + 1}) + .field("d_value", i) + .field("d_values", new double[]{i * 2, i * 2 + 1}) + .endObject()); + } + indexRandom(true, builders); + int numSearches = randomIntBetween(2, 100); + for (int i = 0; i < numSearches; i++) { + assertAllSuccessful(client().prepareSearch("idx").setQuery(QueryBuilders.termQuery("str_value", "s" + i)).get()); + assertAllSuccessful(client().prepareSearch("idx").setQuery(QueryBuilders.termQuery("l_value", i)).get()); + } + Set threadNames = Sets.newHashSet(); + for (long l : threadBean.getAllThreadIds()) { + threadNames.add(threadBean.getThreadInfo(l).getThreadName()); + } + logger.info("post node threads are {}", threadNames); + threadNames.removeAll(preNodeStartThreadNames); + logger.info("post node *new* threads are {}", threadNames); + for (String threadName : threadNames) { + // ignore some shared threads we know that are created within the same VM, like the shared discovery one + if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]")) { + continue; + } + assertThat(threadName, anyOf(containsString("[" + node + "]"), containsString("[" + InternalTestCluster.TRANSPORT_CLIENT_PREFIX + node + "]"))); + } + } + @Test(timeout = 20000) public void testUpdatingThreadPoolSettings() throws Exception { + internalCluster().startNodesAsync(2).get(); ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); // Check that settings are changed assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L));