Shared executor for LuceneTestCase#newSearcher callers (#12588)

Until now, LuceneTestCase#newSearcher randomly associates the returned
IndexSearcher instance with an executor that is ad-hoc created, which
gets shut down when the index reader is closed.

This has made us catch a couple of cases where we were not properly
closing readers in tests. Most recently, we have been seeing test
failures (OOM - unable to create thread) due to too many executor
instances created as part of the same test. This is to be attributed to
creating too many searcher instance, each one getting its separate
executor, which all get shutdown at the end of the entire suite. The
main offender for this is QueryUtils which creates a new searcher for
each leaf reader, and the top-level reader gets closed in the
AfterClass, hence all the executors will stay around for the entire
duration of the test suite that relies on QueryUtils.

This commit eagerly creates an executor in an additional before class
method for LuceneTestCase, and associates that with each searcher that
is supposed to get a non null executor.

Note that the executor is shutdown in the after class to ensure that
no threads leak in tests.

This has the additional advantage that it removes the need to close the
executor as part of an index reader close listener, which also requires
the reader to have an associated reader cache helper.
This commit is contained in:
Luca Cavanna 2023-09-25 17:00:59 +02:00 committed by GitHub
parent ce464c7d6d
commit f559cac755
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 20 deletions

View File

@ -92,6 +92,7 @@ import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -1875,6 +1876,32 @@ public abstract class LuceneTestCase extends Assert {
System.clearProperty(ConcurrentMergeScheduler.DEFAULT_CPU_CORE_COUNT_PROPERTY); System.clearProperty(ConcurrentMergeScheduler.DEFAULT_CPU_CORE_COUNT_PROPERTY);
} }
private static ExecutorService executor;
@BeforeClass
public static void setUpExecutorService() {
int threads = TestUtil.nextInt(random(), 1, 2);
executor =
new ThreadPoolExecutor(
threads,
threads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("LuceneTestCase"));
// uncomment to intensify LUCENE-3840
// executor.prestartAllCoreThreads();
if (VERBOSE) {
System.out.println("NOTE: Created shared ExecutorService with " + threads + " threads");
}
}
@AfterClass
public static void shutdownExecutorService() {
TestUtil.shutdownExecutorService(executor);
executor = null;
}
/** Create a new searcher over the reader. This searcher might randomly use threads. */ /** Create a new searcher over the reader. This searcher might randomly use threads. */
public static IndexSearcher newSearcher(IndexReader r) { public static IndexSearcher newSearcher(IndexReader r) {
return newSearcher(r, true); return newSearcher(r, true);
@ -1938,30 +1965,14 @@ public abstract class LuceneTestCase extends Assert {
ret.setSimilarity(classEnvRule.similarity); ret.setSimilarity(classEnvRule.similarity);
return ret; return ret;
} else { } else {
int threads = 0; final ExecutorService ex;
final ThreadPoolExecutor ex; if (random.nextBoolean()) {
if (r.getReaderCacheHelper() == null || random.nextBoolean()) {
ex = null; ex = null;
} else { } else {
threads = TestUtil.nextInt(random, 1, 8); ex = executor;
ex =
new ThreadPoolExecutor(
threads,
threads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("LuceneTestCase"));
// uncomment to intensify LUCENE-3840
// ex.prestartAllCoreThreads();
}
if (ex != null) {
if (VERBOSE) { if (VERBOSE) {
System.out.println( System.out.println("NOTE: newSearcher using shared ExecutorService");
"NOTE: newSearcher using ExecutorService with " + threads + " threads");
} }
r.getReaderCacheHelper()
.addClosedListener(cacheKey -> TestUtil.shutdownExecutorService(ex));
} }
IndexSearcher ret; IndexSearcher ret;
int maxDocPerSlice = random.nextBoolean() ? 1 : 1 + random.nextInt(1000); int maxDocPerSlice = random.nextBoolean() ? 1 : 1 + random.nextInt(1000);