[TEST] verify all threads created by node and client have the node name

closes #6516
This commit is contained in:
Shay Banon 2014-06-16 16:40:20 +02:00
parent 612f4618e7
commit 0427e49b5d
3 changed files with 69 additions and 3 deletions

View File

@ -152,6 +152,7 @@ public abstract class MulticastChannel implements Closeable {
protected abstract void close(Listener listener); protected abstract void close(Listener listener);
public static final String SHARED_CHANNEL_NAME = "#shared#";
/** /**
* A shared channel that keeps a static map of Config -> Shared channels, and closes shared * A shared channel that keeps a static map of Config -> Shared channels, and closes shared
* channel once their reference count has reached 0. It also handles de-registering relevant * channel once their reference count has reached 0. It also handles de-registering relevant
@ -172,7 +173,7 @@ public abstract class MulticastChannel implements Closeable {
} else { } else {
MultiListener multiListener = new MultiListener(); MultiListener multiListener = new MultiListener();
multiListener.add(listener); multiListener.add(listener);
shared = new Shared(multiListener, new Plain(multiListener, "#shared#", config)); shared = new Shared(multiListener, new Plain(multiListener, SHARED_CHANNEL_NAME, config));
sharedChannels.put(config, shared); sharedChannels.put(config, shared);
} }
return new Delegate(listener, shared); return new Delegate(listener, shared);

View File

@ -719,6 +719,7 @@ public final class InternalTestCluster extends TestCluster {
} }
} }
public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_";
static class TransportClientFactory extends ClientFactory { static class TransportClientFactory extends ClientFactory {
private boolean sniff; private boolean sniff;
@ -733,7 +734,7 @@ public final class InternalTestCluster extends TestCluster {
public Client client(Node node, String clusterName, Random random) { public Client client(Node node, String clusterName, Random random) {
TransportAddress addr = ((InternalNode) node).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportAddress addr = ((InternalNode) node).injector().getInstance(TransportService.class).boundAddress().publishAddress();
TransportClient client = new TransportClient(settingsBuilder().put("client.transport.nodes_sampler_interval", "1s") TransportClient client = new TransportClient(settingsBuilder().put("client.transport.nodes_sampler_interval", "1s")
.put("name", "transport_client_" + node.settings().get("name")) .put("name", TRANSPORT_CLIENT_PREFIX + node.settings().get("name"))
.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
.put(ClusterName.SETTING, clusterName).put("client.transport.sniff", sniff).build()); .put(ClusterName.SETTING, clusterName).put("client.transport.sniff", sniff).build());
client.addTransportAddress(addr); client.addTransportAddress(addr);

View File

@ -19,8 +19,13 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.network.MulticastChannel;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -28,22 +33,35 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.threadpool.ThreadPool.Names;
import org.hamcrest.Matchers;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
/** /**
*/ */
@ClusterScope(scope= Scope.TEST, numDataNodes =2) @ClusterScope(scope= Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest { public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest {
@Override @Override
@ -51,8 +69,54 @@ public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest {
return ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").put(super.nodeSettings(nodeOrdinal)).build(); return ImmutableSettings.settingsBuilder().put("threadpool.search.type", "cached").put(super.nodeSettings(nodeOrdinal)).build();
} }
@Test
public void verifyThreadNames() throws Exception {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
Set<String> preNodeStartThreadNames = Sets.newHashSet();
for (long l : threadBean.getAllThreadIds()) {
preNodeStartThreadNames.add(threadBean.getThreadInfo(l).getThreadName());
}
logger.info("pre node threads are {}", preNodeStartThreadNames);
String node = internalCluster().startNode();
logger.info("do some indexing, flushing, optimize, and searches");
int numDocs = randomIntBetween(2, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; ++i) {
builders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder()
.startObject()
.field("str_value", "s" + i)
.field("str_values", new String[]{"s" + (i * 2), "s" + (i * 2 + 1)})
.field("l_value", i)
.field("l_values", new int[] {i * 2, i * 2 + 1})
.field("d_value", i)
.field("d_values", new double[]{i * 2, i * 2 + 1})
.endObject());
}
indexRandom(true, builders);
int numSearches = randomIntBetween(2, 100);
for (int i = 0; i < numSearches; i++) {
assertAllSuccessful(client().prepareSearch("idx").setQuery(QueryBuilders.termQuery("str_value", "s" + i)).get());
assertAllSuccessful(client().prepareSearch("idx").setQuery(QueryBuilders.termQuery("l_value", i)).get());
}
Set<String> threadNames = Sets.newHashSet();
for (long l : threadBean.getAllThreadIds()) {
threadNames.add(threadBean.getThreadInfo(l).getThreadName());
}
logger.info("post node threads are {}", threadNames);
threadNames.removeAll(preNodeStartThreadNames);
logger.info("post node *new* threads are {}", threadNames);
for (String threadName : threadNames) {
// ignore some shared threads we know that are created within the same VM, like the shared discovery one
if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]")) {
continue;
}
assertThat(threadName, anyOf(containsString("[" + node + "]"), containsString("[" + InternalTestCluster.TRANSPORT_CLIENT_PREFIX + node + "]")));
}
}
@Test(timeout = 20000) @Test(timeout = 20000)
public void testUpdatingThreadPoolSettings() throws Exception { public void testUpdatingThreadPoolSettings() throws Exception {
internalCluster().startNodesAsync(2).get();
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
// Check that settings are changed // Check that settings are changed
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L));