[TEST] Add a more restrictive thread leaks filter

Today all threads are allowed to leak a suite. This is tricky since
it essentially allows resource leaks by default where for instance
test private TransportClients will never get closed and consume
resources influencing other tests. It also hides threads that
are not fully under elasticsearchs control like the Lucene
TimeLimitingCollector thread. This commit restricts the threads
that can leak a suite to the threads spawned from testclusters
and fixes sevearl places that leaked threads.

Closes #7833
This commit is contained in:
Simon Willnauer 2014-09-22 22:15:59 +02:00
parent 046a3a02f5
commit 30acba624d
32 changed files with 167 additions and 43 deletions

View File

@ -111,3 +111,7 @@ com.ning.compress.lzf.LZFOutputStream#<init>(java.io.OutputStream)
com.ning.compress.lzf.LZFOutputStream#<init>(java.io.OutputStream, com.ning.compress.BufferRecycler)
com.ning.compress.lzf.LZFUncompressor#<init>(com.ning.compress.DataHandler)
com.ning.compress.lzf.LZFUncompressor#<init>(com.ning.compress.DataHandler, com.ning.compress.BufferRecycler)
@defaultMessage Spawns a new thread which is solely under lucenes control use ThreadPool#estimatedTimeInMillisCounter instead
org.apache.lucene.search.TimeLimitingCollector#getGlobalTimerThread()
org.apache.lucene.search.TimeLimitingCollector#getGlobalCounter()

View File

@ -178,7 +178,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
new ShardSearchRequest(request).types(request.types()).nowInMillis(request.nowInMillis())
.filteringAliases(request.filteringAliases()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()
);
SearchContext.setCurrent(searchContext);
try {

View File

@ -170,7 +170,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {

View File

@ -107,7 +107,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest.request).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API), indexService, indexShard, scriptService,
pageCacheRecycler, bigArrays));
pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
@ -129,7 +129,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest).types(request.types()).nowInMillis(request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API, IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
pageCacheRecycler, bigArrays));
pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));

View File

@ -171,7 +171,7 @@ public class TransportExistsAction extends TransportBroadcastOperationAction<Exi
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,
scriptService, pageCacheRecycler, bigArrays);
scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {

View File

@ -119,7 +119,7 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
.nowInMillis(request.nowInMillis),
null, result.searcher(), indexService, indexShard,
scriptService, pageCacheRecycler,
bigArrays
bigArrays, threadPool.estimatedTimeInMillisCounter()
);
SearchContext.setCurrent(context);

View File

@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
@ -149,8 +150,8 @@ public class Lucene {
/**
* Wraps <code>delegate</code> with a time limited collector with a timeout of <code>timeoutInMillis</code>
*/
public final static TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, TimeLimitingCollector.getGlobalCounter(), timeoutInMillis);
public final static TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, final Counter counter, long timeoutInMillis) {
return new TimeLimitingCollector(delegate, counter, timeoutInMillis);
}
/**

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
@ -687,4 +688,9 @@ public class PercolateContext extends SearchContext {
public SearchContext useSlowScroll(boolean useSlowScroll) {
throw new UnsupportedOperationException();
}
@Override
public Counter timeEstimateCounter() {
throw new UnsupportedOperationException();
}
}

View File

@ -523,7 +523,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays);
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());

View File

@ -24,6 +24,7 @@ import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.util.BigArrays;
@ -609,4 +610,9 @@ public class TopHitsContext extends SearchContext {
public SearchContext useSlowScroll(boolean useSlowScroll) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public Counter timeEstimateCounter() {
throw new UnsupportedOperationException("Not supported");
}
}

View File

@ -132,7 +132,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
if (timeoutSet) {
// TODO: change to use our own counter that uses the scheduler in ThreadPool
// throws TimeLimitingCollector.TimeExceededException when timeout has reached
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeoutInMillis());
collector = Lucene.wrapTimeLimitingCollector(collector, searchContext.timeEstimateCounter(), searchContext.timeoutInMillis());
}
if (terminateAfterSet) {
// throws Lucene.EarlyTerminationException when given count is reached

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
@ -83,6 +84,7 @@ public class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final Counter timeEstimateCounter;
private SearchType searchType;
@ -179,7 +181,7 @@ public class DefaultSearchContext extends SearchContext {
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays) {
BigArrays bigArrays, Counter timeEstimateCounter) {
this.id = id;
this.request = request;
this.searchType = request.searchType();
@ -199,6 +201,7 @@ public class DefaultSearchContext extends SearchContext {
// initialize the filtering alias based on the provided filters
aliasFilter = indexService.aliasesService().aliasFilter(request.filteringAliases());
this.timeEstimateCounter = timeEstimateCounter;
}
@Override
@ -705,4 +708,9 @@ public class DefaultSearchContext extends SearchContext {
this.useSlowScroll = useSlowScroll;
return this;
}
@Override
public Counter timeEstimateCounter() {
return timeEstimateCounter;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Nullable;
@ -354,6 +355,8 @@ public abstract class SearchContext implements Releasable {
public abstract SearchContext useSlowScroll(boolean useSlowScroll);
public abstract Counter timeEstimateCounter();
/**
* The life time of an object that is used during search execution.
*/

View File

@ -23,6 +23,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
@ -150,6 +151,10 @@ public class ThreadPool extends AbstractComponent {
return estimatedTimeThread.estimatedTimeInMillis();
}
public Counter estimatedTimeInMillisCounter() {
return estimatedTimeThread.counter;
}
public ThreadPoolInfo info() {
List<Info> infos = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
@ -264,6 +269,7 @@ public class ThreadPool extends AbstractComponent {
while (!retiredExecutors.isEmpty()) {
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit);
}
estimatedTimeThread.join(unit.toMillis(timeout));
return result;
}
@ -506,15 +512,15 @@ public class ThreadPool extends AbstractComponent {
static class EstimatedTimeThread extends Thread {
final long interval;
final TimeCounter counter;
volatile boolean running = true;
volatile long estimatedTimeInMillis;
EstimatedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.estimatedTimeInMillis = System.currentTimeMillis();
this.counter = new TimeCounter();
setDaemon(true);
}
@ -534,6 +540,19 @@ public class ThreadPool extends AbstractComponent {
}
}
}
private class TimeCounter extends Counter {
@Override
public long addAndGet(long delta) {
throw new UnsupportedOperationException();
}
@Override
public long get() {
return estimatedTimeInMillis;
}
}
}
static class ExecutorHolder {

View File

@ -50,8 +50,8 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
}
@After
public void cleanup() {
threadPool.shutdownNow();
public void cleanup() throws InterruptedException {
terminate(threadPool);
}
@Override

View File

@ -94,8 +94,9 @@ public class TransportClientTests extends ElasticsearchIntegrationTest {
@Test
public void testThatTransportClientSettingCannotBeChanged() {
TransportClient client = new TransportClient(settingsBuilder().put(Client.CLIENT_TYPE_SETTING, "anything"));
Settings settings = client.injector.getInstance(Settings.class);
assertThat(settings.get(Client.CLIENT_TYPE_SETTING), is("transport"));
try (TransportClient client = new TransportClient(settingsBuilder().put(Client.CLIENT_TYPE_SETTING, "anything"))) {
Settings settings = client.injector.getInstance(Settings.class);
assertThat(settings.get(Client.CLIENT_TYPE_SETTING), is("transport"));
}
}
}

View File

@ -60,7 +60,7 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
@Test
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
@ -85,11 +85,12 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}
@Test
public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
@ -114,11 +115,12 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}
@Test
public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
@ -143,11 +145,12 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}
@Test
public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8);
@ -172,12 +175,13 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
terminate(executor);
}
@Test
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName()));
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
@ -233,9 +237,7 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
block.countDown();
Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
timer.shutdownNow();
executor.shutdownNow();
assertTrue(terminate(timer, executor));
}
static class AwaitingJob extends PrioritizedRunnable {

View File

@ -146,10 +146,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.close();
store.close();
if (threadPool != null) {
threadPool.shutdownNow();
}
terminate(threadPool);
}
private Document testDocumentWithTextField() {

View File

@ -46,7 +46,9 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -97,6 +99,12 @@ public class TemplateQueryParserTest extends ElasticsearchTestCase {
context = new QueryParseContext(index, queryParserService);
}
@After
public void tearDown() throws Exception {
super.tearDown();
terminate(injector.getInstance(ThreadPool.class));
}
@Test
public void testParser() throws IOException {
String templateString = "{\"template\": {"

View File

@ -55,7 +55,7 @@ import static org.hamcrest.Matchers.equalTo;
public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase {
@Test
public void testCustomInjection() {
public void testCustomInjection() throws InterruptedException {
Settings settings = ImmutableSettings.builder().put("name", "testCustomInjection").build();
IndexQueryParserModule queryParserModule = new IndexQueryParserModule(settings);
@ -95,6 +95,6 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase {
PluginJsonFilterParser myJsonFilterParser = (PluginJsonFilterParser) indexQueryParserService.filterParser("my");
assertThat(myJsonFilterParser.names()[0], equalTo("my"));
injector.getInstance(ThreadPool.class).shutdownNow();
terminate(injector.getInstance(ThreadPool.class));
}
}

View File

@ -55,7 +55,7 @@ import static org.hamcrest.Matchers.equalTo;
public class IndexQueryParserPluginTests extends ElasticsearchTestCase {
@Test
public void testCustomInjection() {
public void testCustomInjection() throws InterruptedException {
Settings settings = ImmutableSettings.builder().put("name", "testCustomInjection").build();
IndexQueryParserModule queryParserModule = new IndexQueryParserModule(settings);
@ -104,6 +104,6 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase {
PluginJsonFilterParser myJsonFilterParser = (PluginJsonFilterParser) indexQueryParserService.filterParser("my");
assertThat(myJsonFilterParser.names()[0], equalTo("my"));
injector.getInstance(ThreadPool.class).shutdownNow();
terminate(injector.getInstance(ThreadPool.class));
}
}

View File

@ -35,6 +35,7 @@ import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -157,6 +158,7 @@ public class IndexActionTests extends ElasticsearchIntegrationTest {
for (int i=0;i<docCount;i++) {
assertThat(createdCounts.get(i), lessThanOrEqualTo(1));
}
terminate(threadPool);
}
@Test

View File

@ -26,21 +26,26 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
public class NativeScriptTests extends ElasticsearchTestCase {
@Test
public void testNativeScript() {
public void testNativeScript() throws InterruptedException {
Settings settings = ImmutableSettings.settingsBuilder()
.put("script.native.my.type", MyNativeScriptFactory.class.getName())
.put("name", "testNativeScript")
.build();
Injector injector = new ModulesBuilder().add(
new ThreadPoolModule(settings),
new SettingsModule(settings),
new ScriptModule(settings)).createInjector();
@ -48,6 +53,7 @@ public class NativeScriptTests extends ElasticsearchTestCase {
ExecutableScript executable = scriptService.executable("native", "my", ScriptService.ScriptType.INLINE, null);
assertThat(executable.run().toString(), equalTo("test"));
terminate(injector.getInstance(ThreadPool.class));
}
static class MyNativeScriptFactory implements NativeScriptFactory {

View File

@ -47,7 +47,7 @@ public class SearchTimeoutTests extends ElasticsearchIntegrationTest {
SearchResponse searchResponse = client().prepareSearch("test")
.setTimeout("10ms")
.setQuery(filteredQuery(matchAllQuery(), scriptFilter("Thread.sleep(100); return true;")))
.setQuery(filteredQuery(matchAllQuery(), scriptFilter("Thread.sleep(500); return true;")))
.execute().actionGet();
assertThat(searchResponse.isTimedOut(), equalTo(true));
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.*;
import java.io.Closeable;
@ -54,6 +55,7 @@ import java.lang.reflect.Modifier;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -64,7 +66,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllS
* Base testcase for randomized unit testing with Elasticsearch
*/
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(Scope.NONE)
@ThreadLeakScope(Scope.SUITE)
@TimeoutSuite(millis = 20 * TimeUnits.MINUTE) // timeout the suite after 20min and fail the test.
@Listeners(LoggingListener.class)
public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
@ -523,4 +525,25 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
return System.getProperty(TESTS_BACKWARDS_COMPATIBILITY_VERSION);
}
public static boolean terminate(ExecutorService... services) throws InterruptedException {
boolean terminated = true;
for (ExecutorService service : services) {
if (service != null) {
service.shutdown();
service.shutdownNow();
terminated &= service.awaitTermination(10, TimeUnit.SECONDS);
}
}
return terminated;
}
public static boolean terminate(ThreadPool service) throws InterruptedException {
if (service != null) {
service.shutdown();
service.shutdownNow();
return service.awaitTermination(10, TimeUnit.SECONDS);
}
return true;
}
}

View File

@ -20,13 +20,38 @@
package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.ThreadFilter;
import org.elasticsearch.common.network.MulticastChannel;
import org.elasticsearch.test.hamcrest.RegexMatcher;
import org.elasticsearch.tribe.TribeTests;
import java.util.regex.Pattern;
/**
* Simple thread filter for randomized runner
* This filter rejectes all threads that are known to leak across
* tests / suites ie. the global test cluster threads etc.
* It will cause threads leaking from threadpools / executors in unittests
* to fail the test.
*/
public final class ElasticsearchThreadFilter implements ThreadFilter {
private final Pattern nodePrefix = Pattern.compile("\\[(" +
"(" + Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX) + ")?(" +
Pattern.quote(ElasticsearchIntegrationTest.GLOBAL_CLUSTER_NODE_PREFIX) + "|" +
Pattern.quote(ElasticsearchIntegrationTest.SUITE_CLUSTER_NODE_PREFIX) + "|" +
Pattern.quote(ElasticsearchIntegrationTest.TEST_CLUSTER_NODE_PREFIX) + "|" +
Pattern.quote(TribeTests.SECOND_CLUSTER_NODE_PREFIX) + ")"
+ ")\\d+\\]");
@Override
public boolean reject(Thread t) {
return true;
String threadName = t.getName();
if (threadName.contains("[" + MulticastChannel.SHARED_CHANNEL_NAME + "]")
|| threadName.contains("[" + ElasticsearchSingleNodeTest.nodeName() + "]")
|| threadName.contains("Keep-Alive-Timer")) {
return true;
}
return nodePrefix.matcher(t.getName()).find();
}
}

View File

@ -273,7 +273,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put("logger.prefix", System.getProperty("es.logger.level"));
}
defaultSettings = builder.build();
executor = EsExecutors.newCached(1, TimeUnit.MINUTES, EsExecutors.daemonThreadFactory("test_" + clusterName));
executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
this.hasFilterCache = random.nextBoolean();
}

View File

@ -22,6 +22,7 @@ import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
@ -597,4 +598,9 @@ public class TestSearchContext extends SearchContext {
public SearchContext useSlowScroll(boolean useSlowScroll) {
return null;
}
@Override
public Counter timeEstimateCounter() {
throw new UnsupportedOperationException();
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -92,9 +93,10 @@ public class ThreadPoolSerializationTests extends ElasticsearchTestCase {
}
@Test
public void testThatNegativeSettingAllowsToStart() {
public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build();
ThreadPool threadPool = new ThreadPool(settings, null);
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
terminate(threadPool);
}
}

View File

@ -224,7 +224,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
threadPool.shutdownNow();
terminate(threadPool);
latch.await();
}

View File

@ -101,7 +101,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
super.tearDown();
serviceA.close();
serviceB.close();
threadPool.shutdown();
terminate(threadPool);
}
@Test

View File

@ -25,6 +25,8 @@ import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.*;
@ -63,8 +65,10 @@ public class ResourceWatcherServiceTests extends ElasticsearchTestCase {
assertThat(service.highMonitor.interval.millis(), is(timeValueSeconds(10).millis()));
assertThat(service.mediumMonitor.interval.millis(), is(timeValueSeconds(20).millis()));
assertThat(service.lowMonitor.interval.millis(), is(timeValueSeconds(30).millis()));
terminate(threadPool);
}
@Test
public void testHandle() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
@ -104,5 +108,6 @@ public class ResourceWatcherServiceTests extends ElasticsearchTestCase {
assertThat(service.highMonitor.watchers.size(), is(0));
handle.resume();
assertThat(service.highMonitor.watchers.size(), is(1));
terminate(threadPool);
}
}