mirror of https://github.com/apache/lucene.git
LUCENE-2819: prevent 'collateral damage' from threads in tests, warn about thread resource leaks
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1050697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8584e1016f
commit
623fd7bcd7
|
@ -69,13 +69,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
protected IndexWriter writer;
|
||||
protected int mergeThreadCount;
|
||||
|
||||
public ConcurrentMergeScheduler() {
|
||||
if (allInstances != null) {
|
||||
// Only for testing
|
||||
addMyself();
|
||||
}
|
||||
}
|
||||
|
||||
/** Sets the max # simultaneous merge threads that should
|
||||
* be running at once. This must be <= {@link
|
||||
* #setMaxMergeCount}. */
|
||||
|
@ -431,7 +424,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
if (!suppressExceptions) {
|
||||
// suppressExceptions is normally only set during
|
||||
// testing.
|
||||
anyExceptions = true;
|
||||
handleMergeException(exc);
|
||||
}
|
||||
}
|
||||
|
@ -471,48 +463,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
throw new MergePolicy.MergeException(exc, dir);
|
||||
}
|
||||
|
||||
static boolean anyExceptions = false;
|
||||
|
||||
/** Used for testing */
|
||||
public static boolean anyUnhandledExceptions() {
|
||||
if (allInstances == null) {
|
||||
throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
|
||||
}
|
||||
synchronized(allInstances) {
|
||||
final int count = allInstances.size();
|
||||
// Make sure all outstanding threads are done so we see
|
||||
// any exceptions they may produce:
|
||||
for(int i=0;i<count;i++)
|
||||
allInstances.get(i).sync();
|
||||
boolean v = anyExceptions;
|
||||
anyExceptions = false;
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
||||
public static void clearUnhandledExceptions() {
|
||||
synchronized(allInstances) {
|
||||
anyExceptions = false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Used for testing */
|
||||
private void addMyself() {
|
||||
synchronized(allInstances) {
|
||||
final int size = allInstances.size();
|
||||
int upto = 0;
|
||||
for(int i=0;i<size;i++) {
|
||||
final ConcurrentMergeScheduler other = allInstances.get(i);
|
||||
if (!(other.closed && 0 == other.mergeThreadCount()))
|
||||
// Keep this one for now: it still has threads or
|
||||
// may spawn new threads
|
||||
allInstances.set(upto++, other);
|
||||
}
|
||||
allInstances.subList(upto, allInstances.size()).clear();
|
||||
allInstances.add(this);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean suppressExceptions;
|
||||
|
||||
/** Used for testing */
|
||||
|
@ -524,10 +474,4 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
|
|||
void clearSuppressExceptions() {
|
||||
suppressExceptions = false;
|
||||
}
|
||||
|
||||
/** Used for testing */
|
||||
private static List<ConcurrentMergeScheduler> allInstances;
|
||||
public static void setTestMode() {
|
||||
allInstances = new ArrayList<ConcurrentMergeScheduler>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,6 +105,5 @@ public class TestMergeSchedulerExternal extends LuceneTestCase {
|
|||
assertTrue(mergeCalled);
|
||||
assertTrue(excCalled);
|
||||
dir.close();
|
||||
assertTrue(ConcurrentMergeScheduler.anyUnhandledExceptions());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,6 +277,7 @@ public class TestMultiSearcher extends LuceneTestCase
|
|||
indexSearcher2.close();
|
||||
ramDirectory1.close();
|
||||
ramDirectory2.close();
|
||||
searcher.close();
|
||||
}
|
||||
|
||||
/* uncomment this when the highest score is always normalized to 1.0, even when it was < 1.0
|
||||
|
|
|
@ -18,16 +18,34 @@ package org.apache.lucene.search;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.util._TestUtil;
|
||||
|
||||
/**
|
||||
* Unit tests for the ParallelMultiSearcher
|
||||
*/
|
||||
public class TestParallelMultiSearcher extends TestMultiSearcher {
|
||||
List<ExecutorService> pools = new ArrayList<ExecutorService>();
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
for (ExecutorService exec : pools)
|
||||
exec.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
pools.clear();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MultiSearcher getMultiSearcherInstance(Searcher[] searchers)
|
||||
throws IOException {
|
||||
return new ParallelMultiSearcher(searchers);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(_TestUtil.nextInt(random, 2, 8));
|
||||
pools.add(exec);
|
||||
return new ParallelMultiSearcher(exec, searchers);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,10 @@ import java.util.BitSet;
|
|||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.Document;
|
||||
|
@ -51,6 +55,7 @@ import org.apache.lucene.store.LockObtainFailedException;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.DocIdBitSet;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util._TestUtil;
|
||||
|
||||
/**
|
||||
* Unit tests for sorting code.
|
||||
|
@ -611,7 +616,8 @@ public class TestSort extends LuceneTestCase implements Serializable {
|
|||
// Don't close the multiSearcher. it would close the full searcher too!
|
||||
|
||||
// Do the same for a ParallelMultiSearcher
|
||||
Searcher parallelSearcher=new ParallelMultiSearcher (full);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(_TestUtil.nextInt(random, 2, 8));
|
||||
Searcher parallelSearcher=new ParallelMultiSearcher (exec, full);
|
||||
|
||||
sort.setSort (new SortField ("int", SortField.INT),
|
||||
new SortField ("string", SortField.STRING),
|
||||
|
@ -622,7 +628,8 @@ public class TestSort extends LuceneTestCase implements Serializable {
|
|||
new SortField ("string", SortField.STRING),
|
||||
new SortField ("float", SortField.FLOAT, true) );
|
||||
assertMatches (parallelSearcher, queryG, sort, "ZYXW");
|
||||
// Don't close the parallelSearcher. it would close the full searcher too!
|
||||
parallelSearcher.close();
|
||||
exec.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// test sorts using a series of fields
|
||||
|
@ -690,8 +697,11 @@ public class TestSort extends LuceneTestCase implements Serializable {
|
|||
|
||||
// test a variety of sorts using a parallel multisearcher
|
||||
public void testParallelMultiSort() throws Exception {
|
||||
Searcher searcher = new ParallelMultiSearcher (searchX, searchY);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(_TestUtil.nextInt(random, 2, 8));
|
||||
Searcher searcher = new ParallelMultiSearcher (exec, searchX, searchY);
|
||||
runMultiSorts(searcher, false);
|
||||
searcher.close();
|
||||
exec.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// test that the relevancy scores are the same even if
|
||||
|
|
|
@ -336,6 +336,7 @@ public abstract class LuceneTestCase extends Assert {
|
|||
|
||||
@AfterClass
|
||||
public static void afterClassLuceneTestCaseJ4() {
|
||||
threadCleanup("test class");
|
||||
String codecDescription;
|
||||
CodecProvider cp = CodecProvider.getDefault();
|
||||
|
||||
|
@ -434,13 +435,13 @@ public abstract class LuceneTestCase extends Assert {
|
|||
savedUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
|
||||
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
testsFailed = true;
|
||||
uncaughtExceptions.add(new UncaughtExceptionEntry(t, e));
|
||||
if (savedUncaughtExceptionHandler != null)
|
||||
savedUncaughtExceptionHandler.uncaughtException(t, e);
|
||||
}
|
||||
});
|
||||
|
||||
ConcurrentMergeScheduler.setTestMode();
|
||||
savedBoolMaxClauseCount = BooleanQuery.getMaxClauseCount();
|
||||
}
|
||||
|
||||
|
@ -468,7 +469,10 @@ public abstract class LuceneTestCase extends Assert {
|
|||
public void tearDown() throws Exception {
|
||||
assertTrue("ensure your setUp() calls super.setUp()!!!", setup);
|
||||
setup = false;
|
||||
Thread.setDefaultUncaughtExceptionHandler(savedUncaughtExceptionHandler);
|
||||
BooleanQuery.setMaxClauseCount(savedBoolMaxClauseCount);
|
||||
if (!getClass().getName().startsWith("org.apache.solr"))
|
||||
threadCleanup("test method: '" + getName() + "'");
|
||||
try {
|
||||
|
||||
if (!uncaughtExceptions.isEmpty()) {
|
||||
|
@ -496,19 +500,61 @@ public abstract class LuceneTestCase extends Assert {
|
|||
// isolated in distinct test methods
|
||||
assertSaneFieldCaches(getTestLabel());
|
||||
|
||||
if (ConcurrentMergeScheduler.anyUnhandledExceptions()) {
|
||||
// Clear the failure so that we don't just keep
|
||||
// failing subsequent test cases
|
||||
ConcurrentMergeScheduler.clearUnhandledExceptions();
|
||||
fail("ConcurrentMergeScheduler hit unhandled exceptions");
|
||||
}
|
||||
} finally {
|
||||
purgeFieldCache(FieldCache.DEFAULT);
|
||||
}
|
||||
|
||||
Thread.setDefaultUncaughtExceptionHandler(savedUncaughtExceptionHandler);
|
||||
}
|
||||
|
||||
private final static int THREAD_STOP_GRACE_MSEC = 1000;
|
||||
// jvm-wide list of 'rogue threads' we found, so they only get reported once.
|
||||
private final static IdentityHashMap<Thread,Boolean> rogueThreads = new IdentityHashMap<Thread,Boolean>();
|
||||
|
||||
private static void threadCleanup(String context) {
|
||||
// we will only actually fail() after all cleanup has happened!
|
||||
boolean shouldFail = false;
|
||||
|
||||
// educated guess
|
||||
Thread[] stillRunning = new Thread[Thread.activeCount()+1];
|
||||
int threadCount = 0;
|
||||
int rogueCount = 0;
|
||||
|
||||
if ((threadCount = Thread.enumerate(stillRunning)) > 1) {
|
||||
while (threadCount == stillRunning.length) {
|
||||
// truncated response
|
||||
stillRunning = new Thread[stillRunning.length*2];
|
||||
threadCount = Thread.enumerate(stillRunning);
|
||||
}
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
Thread t = stillRunning[i];
|
||||
// TODO: turn off our exception handler for these leftover threads... does this work?
|
||||
if (t != Thread.currentThread())
|
||||
t.setUncaughtExceptionHandler(null);
|
||||
if (t.isAlive() &&
|
||||
!rogueThreads.containsKey(t) &&
|
||||
t != Thread.currentThread() &&
|
||||
// TODO: TimeLimitingCollector starts a thread statically.... WTF?!
|
||||
!t.getName().equals("TimeLimitedCollector timer thread")) {
|
||||
System.err.println("WARNING: " + context + " left thread running: " + t);
|
||||
rogueThreads.put(t, true);
|
||||
shouldFail = true;
|
||||
rogueCount++;
|
||||
// try to stop the thread:
|
||||
t.interrupt();
|
||||
try {
|
||||
t.join(THREAD_STOP_GRACE_MSEC);
|
||||
} catch (InterruptedException e) { e.printStackTrace(); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldFail && !testsFailed /* don't be loud if the test failed, maybe it didnt join() etc */) {
|
||||
// TODO: we can't fail until we fix contrib and solr
|
||||
//fail("test '" + getName() + "' left " + rogueCount + " thread(s) running");
|
||||
System.err.println("RESOURCE LEAK: " + context + " left " + rogueCount + " thread(s) running");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that FieldCacheSanityChecker does not detect any
|
||||
* problems with FieldCache.DEFAULT.
|
||||
|
|
Loading…
Reference in New Issue