TaskExecutor should not fork unnecessarily (#13472)

When an executor is provided to the IndexSearcher constructor, the searcher now executes tasks on the thread that invoked a search as well as its configured executor. Users should reduce the executor's thread-count by
1 to retain the previous level of parallelism. Moreover, it is now possible to start searches from the same executor that is configured in the IndexSearcher without risk of deadlocking. A separate executor for starting searches is no longer required.

Previously, a separate executor was required to prevent deadlock, and all the tasks were offloaded to it unconditionally. This wasted resources in some scenarios due to unnecessary forking, while the caller thread had to wait for all tasks to be completed anyway. The caller thread can now actively contribute to the execution as well.
This commit is contained in:
Armin Braun 2024-07-04 11:15:26 +02:00 committed by GitHub
parent f4cd4b46fc
commit 62e08f5f4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 90 additions and 64 deletions

View File

@ -277,6 +277,15 @@ Optimizations
* GITHUB#12941: Don't preserve auxiliary buffer contents in LSBRadixSorter if it grows. (Stefan Vodita)
Changes in runtime behavior
---------------------
* GITHUB#13472: When an executor is provided to the IndexSearcher constructor, the searcher now executes tasks on the
thread that invoked a search as well as its configured executor. Users should reduce the executor's thread-count by 1
to retain the previous level of parallelism. Moreover, it is now possible to start searches from the same executor
that is configured in the IndexSearcher without risk of deadlocking. A separate executor for starting searches is no
longer required. (Armin Braun)
Bug Fixes
---------------------

View File

@ -30,6 +30,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@ -37,20 +38,13 @@ import org.apache.lucene.util.ThreadInterruptedException;
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
* across segments as well as query rewrite in some cases. Exposes a single {@link
* #invokeAll(Collection)} method that takes a collection of {@link Callable}s and executes them
* concurrently/ Once all tasks are submitted to the executor, it blocks and wait for all tasks to
* be completed, and then returns a list with the obtained results. Ensures that the underlying
* executor is only used for top-level {@link #invokeAll(Collection)} calls, and not for potential
* {@link #invokeAll(Collection)} calls made from one of the tasks. This is to prevent deadlock with
* certain types of pool based executors (e.g. {@link java.util.concurrent.ThreadPoolExecutor}).
* concurrently. Once all but one task have been submitted to the executor, it tries to run as many
* tasks as possible on the calling thread, then waits for all tasks that have been executed in
* parallel on the executor to be completed and then returns a list with the obtained results.
*
* @lucene.experimental
*/
public final class TaskExecutor {
// a static thread local is ok as long as we use a counter, which accounts for multiple
// searchers holding a different TaskExecutor all backed by the same executor
private static final ThreadLocal<Integer> numberOfRunningTasksInCurrentThread =
ThreadLocal.withInitial(() -> 0);
private final Executor executor;
/**
@ -84,26 +78,21 @@ public final class TaskExecutor {
/**
* Holds all the sub-tasks that a certain operation gets split into as it gets parallelized and
* exposes the ability to invoke such tasks and wait for them all to complete their execution and
* provide their results. Ensures that each task does not get parallelized further: this is
* important to avoid a deadlock in situations where one executor thread waits on other executor
* threads to complete before it can progress. This happens in situations where for instance
* {@link Query#createWeight(IndexSearcher, ScoreMode, float)} is called as part of searching each
* slice, like {@link TopFieldCollector#populateScores(ScoreDoc[], IndexSearcher, Query)} does.
* Additionally, if one task throws an exception, all other tasks from the same group are
* cancelled, to avoid needless computation as their results would not be exposed anyways. Creates
* one {@link FutureTask} for each {@link Callable} provided
* provide their results. Additionally, if one task throws an exception, all other tasks from the
* same group are cancelled, to avoid needless computation as their results would not be exposed
* anyways. Creates one {@link FutureTask} for each {@link Callable} provided
*
* @param <T> the return type of all the callables
*/
private static final class TaskGroup<T> {
private final Collection<RunnableFuture<T>> futures;
private final List<RunnableFuture<T>> futures;
TaskGroup(Collection<Callable<T>> callables) {
List<RunnableFuture<T>> tasks = new ArrayList<>(callables.size());
for (Callable<T> callable : callables) {
tasks.add(createTask(callable));
}
this.futures = Collections.unmodifiableCollection(tasks);
this.futures = Collections.unmodifiableList(tasks);
}
RunnableFuture<T> createTask(Callable<T> callable) {
@ -112,15 +101,10 @@ public final class TaskExecutor {
() -> {
if (startedOrCancelled.compareAndSet(false, true)) {
try {
Integer counter = numberOfRunningTasksInCurrentThread.get();
numberOfRunningTasksInCurrentThread.set(counter + 1);
return callable.call();
} catch (Throwable t) {
cancelAll();
throw t;
} finally {
Integer counter = numberOfRunningTasksInCurrentThread.get();
numberOfRunningTasksInCurrentThread.set(counter - 1);
}
}
// task is cancelled hence it has no results to return. That's fine: they would be
@ -144,32 +128,45 @@ public final class TaskExecutor {
}
List<T> invokeAll(Executor executor) throws IOException {
boolean runOnCallerThread = numberOfRunningTasksInCurrentThread.get() > 0;
for (Runnable runnable : futures) {
if (runOnCallerThread) {
runnable.run();
} else {
executor.execute(runnable);
final int count = futures.size();
// taskId provides the first index of an un-executed task in #futures
final AtomicInteger taskId = new AtomicInteger(0);
// we fork execution count - 1 tasks to execute at least one task on the current thread to
// minimize needless forking and blocking of the current thread
if (count > 1) {
final Runnable work =
() -> {
int id = taskId.getAndIncrement();
if (id < count) {
futures.get(id).run();
}
};
for (int j = 0; j < count - 1; j++) {
executor.execute(work);
}
}
// try to execute as many tasks as possible on the current thread to minimize context
// switching in case of long running concurrent
// tasks as well as dead-locking if the current thread is part of #executor for executors that
// have limited or no parallelism
int id;
while ((id = taskId.getAndIncrement()) < count) {
futures.get(id).run();
if (id >= count - 1) {
// save redundant CAS in case this was the last task
break;
}
}
Throwable exc = null;
List<T> results = new ArrayList<>(futures.size());
for (Future<T> future : futures) {
List<T> results = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Future<T> future = futures.get(i);
try {
results.add(future.get());
} catch (InterruptedException e) {
var newException = new ThreadInterruptedException(e);
if (exc == null) {
exc = newException;
} else {
exc.addSuppressed(newException);
}
exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
} catch (ExecutionException e) {
if (exc == null) {
exc = e.getCause();
} else {
exc.addSuppressed(e.getCause());
}
exc = IOUtils.useOrSuppress(exc, e.getCause());
}
}
assert assertAllFuturesCompleted() : "Some tasks are still running?";

View File

@ -266,7 +266,7 @@ public class TestIndexSearcher extends LuceneTestCase {
IOUtils.close(r, dir);
}
public void testSlicesAllOffloadedToTheExecutor() throws IOException {
public void testSlicesOffloadedToTheExecutor() throws IOException {
List<LeafReaderContext> leaves = reader.leaves();
AtomicInteger numExecutions = new AtomicInteger(0);
IndexSearcher searcher =
@ -286,7 +286,7 @@ public class TestIndexSearcher extends LuceneTestCase {
}
};
searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(leaves.size(), numExecutions.get());
assertEquals(leaves.size() - 1, numExecutions.get());
}
public void testNullExecutorNonNullTaskExecutor() {

View File

@ -21,10 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@ -111,7 +108,20 @@ public class TestTaskExecutor extends LuceneTestCase {
assertEquals("exc", runtimeException.getCause().getMessage());
}
public void testInvokeAllFromTaskDoesNotDeadlockSameSearcher() throws IOException {
public void testInvokeAllFromTaskDoesNotDeadlockSameSearcher() throws Exception {
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(executorService);
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(Runnable::run);
executorService
.submit(
() -> {
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(executorService);
return null;
})
.get();
}
private static void doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(Executor executor)
throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 500; i++) {
@ -119,7 +129,7 @@ public class TestTaskExecutor extends LuceneTestCase {
}
try (DirectoryReader reader = iw.getReader()) {
IndexSearcher searcher =
new IndexSearcher(reader, executorService) {
new IndexSearcher(reader, executor) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
return slices(leaves, 1, 1);
@ -172,7 +182,20 @@ public class TestTaskExecutor extends LuceneTestCase {
}
}
public void testInvokeAllFromTaskDoesNotDeadlockMultipleSearchers() throws IOException {
public void testInvokeAllFromTaskDoesNotDeadlockMultipleSearchers() throws Exception {
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(executorService);
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(Runnable::run);
executorService
.submit(
() -> {
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(executorService);
return null;
})
.get();
}
private static void doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(Executor executor)
throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 500; i++) {
@ -180,7 +203,7 @@ public class TestTaskExecutor extends LuceneTestCase {
}
try (DirectoryReader reader = iw.getReader()) {
IndexSearcher searcher =
new IndexSearcher(reader, executorService) {
new IndexSearcher(reader, executor) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
return slices(leaves, 1, 1);
@ -202,7 +225,7 @@ public class TestTaskExecutor extends LuceneTestCase {
// searcher has its own
// TaskExecutor, the safeguard is shared among all the searchers that get
// the same executor
IndexSearcher indexSearcher = new IndexSearcher(reader, executorService);
IndexSearcher indexSearcher = new IndexSearcher(reader, executor);
indexSearcher
.getTaskExecutor()
.invokeAll(Collections.singletonList(() -> null));
@ -234,11 +257,8 @@ public class TestTaskExecutor extends LuceneTestCase {
TaskExecutor taskExecutor =
new TaskExecutor(
command -> {
executorService.execute(
() -> {
tasksStarted.incrementAndGet();
command.run();
});
tasksStarted.incrementAndGet();
command.run();
});
AtomicInteger tasksExecuted = new AtomicInteger(0);
List<Callable<Void>> callables = new ArrayList<>();
@ -251,14 +271,14 @@ public class TestTaskExecutor extends LuceneTestCase {
for (int i = 0; i < tasksWithNormalExit; i++) {
callables.add(
() -> {
tasksExecuted.incrementAndGet();
return null;
throw new AssertionError(
"must not be called since the first task failing cancels all subsequent tasks");
});
}
expectThrows(RuntimeException.class, () -> taskExecutor.invokeAll(callables));
assertEquals(1, tasksExecuted.get());
// the callables are technically all run, but the cancelled ones will be no-op
assertEquals(100, tasksStarted.get());
assertEquals(tasksWithNormalExit, tasksStarted.get());
}
/**
@ -308,7 +328,7 @@ public class TestTaskExecutor extends LuceneTestCase {
}
public void testCancelTasksOnException() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
TaskExecutor taskExecutor = new TaskExecutor(Runnable::run);
final int numTasks = random().nextInt(10, 50);
final int throwingTask = random().nextInt(numTasks);
boolean error = random().nextBoolean();