From 937ebd4296d9ee638f29751fc178394e1c9a9f1a Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 20 Sep 2023 12:00:13 +0200 Subject: [PATCH] Prevent concurrent tasks from parallelizing further (#12569) Concurrent search is currently applied once per search call, either when search is called, or when concurrent query rewrite happens. They generally don't happen within one another. There are situations in which we are going to introduce parallelism in places where there could be multiple inner levels of parallelism requested as each task could try to parallelize further. In these cases, with certain executor implementations, like ThreadPoolExecutor, we may deadlock as we are waiting for all tasks to complete but they are waiting for threads to free up to complete their execution. This commit introduces a simple safeguard that makes sure that we only parallelize via the executor at the top-level invokeAll call. When each task tries to parallelize further, we just execute them directly instead of submitting them to the executor. 
Co-authored-by: Adrien Grand --- lucene/CHANGES.txt | 3 + .../lucene/search/AbstractKnnVectorQuery.java | 6 +- .../apache/lucene/search/IndexSearcher.java | 9 +- .../apache/lucene/search/TaskExecutor.java | 52 ++++++- .../lucene/search/TestTaskExecutor.java | 145 ++++++++++++++++-- 5 files changed, 191 insertions(+), 24 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 123f4e41a89..0d7f323d9fa 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -239,6 +239,9 @@ Changes in runtime behavior * GITHUB#12515: Offload sequential search execution to the executor that's optionally provided to the IndexSearcher (Luca Cavanna) +* GITHUB#12569: Prevent concurrent tasks from parallelizing execution further which could cause deadlock + (Luca Cavanna) + Bug Fixes --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index 25b92c26a9b..e28536ba04c 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -24,8 +24,6 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableFuture; import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexReader; @@ -106,9 +104,9 @@ abstract class AbstractKnnVectorQuery extends Query { private TopDocs[] parallelSearch( List leafReaderContexts, Weight filterWeight, TaskExecutor taskExecutor) throws IOException { - List> tasks = new ArrayList<>(); + List> tasks = new ArrayList<>(); for (LeafReaderContext context : leafReaderContexts) { - tasks.add(new FutureTask<>(() -> searchLeaf(context, filterWeight))); + tasks.add(taskExecutor.createTask(() -> searchLeaf(context, filterWeight))); } 
return taskExecutor.invokeAll(tasks).toArray(TopDocs[]::new); } diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java index a52ad604a8d..9aa865f351a 100644 --- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java +++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java @@ -25,8 +25,6 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableFuture; import java.util.function.Function; import java.util.function.Supplier; import org.apache.lucene.index.DirectoryReader; @@ -669,17 +667,16 @@ public class IndexSearcher { "CollectorManager does not always produce collectors with the same score mode"); } } - final List> listTasks = new ArrayList<>(); + final List> listTasks = new ArrayList<>(); for (int i = 0; i < leafSlices.length; ++i) { final LeafReaderContext[] leaves = leafSlices[i].leaves; final C collector = collectors.get(i); - FutureTask task = - new FutureTask<>( + TaskExecutor.Task task = + taskExecutor.createTask( () -> { search(Arrays.asList(leaves), weight, collector); return collector; }); - listTasks.add(task); } List results = taskExecutor.invokeAll(listTasks); diff --git a/lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java b/lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java index 46c6f7803b6..d416d70c2f5 100644 --- a/lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java +++ b/lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java @@ -22,18 +22,31 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; -import java.util.concurrent.RunnableFuture; 
+import java.util.concurrent.FutureTask; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; /** * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search - * across segments as well as query rewrite in some cases. + * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)} + * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)} + * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it + * blocks and waits for all tasks to complete, and then returns a list with the obtained results. + * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)} + * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks. + * This is to prevent deadlock with certain types of pool based executors (e.g. {@link + * java.util.concurrent.ThreadPoolExecutor}). 
*/ class TaskExecutor { + // a static thread local is ok as long as we use a counter, which accounts for multiple + // searchers holding a different TaskExecutor all backed by the same executor + private static final ThreadLocal numberOfRunningTasksInCurrentThread = + ThreadLocal.withInitial(() -> 0); + private final Executor executor; TaskExecutor(Executor executor) { @@ -48,10 +61,17 @@ class TaskExecutor { * @return a list containing the results from the tasks execution * @param the return type of the task execution */ - final List invokeAll(Collection> tasks) throws IOException { - for (Runnable task : tasks) { - executor.execute(task); + final List invokeAll(Collection> tasks) throws IOException { + if (numberOfRunningTasksInCurrentThread.get() > 0) { + for (Task task : tasks) { + task.run(); + } + } else { + for (Runnable task : tasks) { + executor.execute(task); + } } + final List results = new ArrayList<>(); for (Future future : tasks) { try { @@ -64,4 +84,26 @@ class TaskExecutor { } return results; } + + final Task createTask(Callable callable) { + return new Task<>(callable); + } + + static class Task extends FutureTask { + private Task(Callable callable) { + super(callable); + } + + @Override + public void run() { + try { + Integer counter = numberOfRunningTasksInCurrentThread.get(); + numberOfRunningTasksInCurrentThread.set(counter + 1); + super.run(); + } finally { + Integer counter = numberOfRunningTasksInCurrentThread.get(); + numberOfRunningTasksInCurrentThread.set(counter - 1); + } + } + } } diff --git a/lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java b/lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java index 09784e1e8f3..51183214728 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java @@ -17,10 +17,16 @@ package org.apache.lucene.search; import java.io.IOException; +import java.util.Collection; import 
java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.NamedThreadFactory; import org.junit.AfterClass; @@ -44,8 +50,8 @@ public class TestTaskExecutor extends LuceneTestCase { public void testUnwrapIOExceptionFromExecutionException() { TaskExecutor taskExecutor = new TaskExecutor(executorService); - FutureTask task = - new FutureTask<>( + TaskExecutor.Task task = + taskExecutor.createTask( () -> { throw new IOException("io exception"); }); @@ -57,8 +63,8 @@ public class TestTaskExecutor extends LuceneTestCase { public void testUnwrapRuntimeExceptionFromExecutionException() { TaskExecutor taskExecutor = new TaskExecutor(executorService); - FutureTask task = - new FutureTask<>( + TaskExecutor.Task task = + taskExecutor.createTask( () -> { throw new RuntimeException("runtime"); }); @@ -71,8 +77,8 @@ public class TestTaskExecutor extends LuceneTestCase { public void testUnwrapErrorFromExecutionException() { TaskExecutor taskExecutor = new TaskExecutor(executorService); - FutureTask task = - new FutureTask<>( + TaskExecutor.Task task = + taskExecutor.createTask( () -> { throw new OutOfMemoryError("oom"); }); @@ -85,8 +91,8 @@ public class TestTaskExecutor extends LuceneTestCase { public void testUnwrappedExceptions() { TaskExecutor taskExecutor = new TaskExecutor(executorService); - FutureTask task = - new FutureTask<>( + TaskExecutor.Task task = + taskExecutor.createTask( () -> { throw new Exception("exc"); }); @@ -95,4 +101,125 @@ public class TestTaskExecutor extends LuceneTestCase { RuntimeException.class, () -> 
taskExecutor.invokeAll(Collections.singletonList(task))); assertEquals("exc", runtimeException.getCause().getMessage()); } + + public void testInvokeAllFromTaskDoesNotDeadlockSameSearcher() throws IOException { + try (Directory dir = newDirectory(); + RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) { + for (int i = 0; i < 500; i++) { + iw.addDocument(new Document()); + } + try (DirectoryReader reader = iw.getReader()) { + IndexSearcher searcher = + new IndexSearcher(reader, executorService) { + @Override + protected LeafSlice[] slices(List leaves) { + return slices(leaves, 1, 1); + } + }; + + searcher.search( + new MatchAllDocsQuery(), + new CollectorManager() { + @Override + public Collector newCollector() { + return new Collector() { + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) { + return new LeafCollector() { + @Override + public void setScorer(Scorable scorer) throws IOException { + TaskExecutor.Task task = + searcher + .getTaskExecutor() + .createTask( + () -> { + // make sure that we don't miss disabling concurrency one + // level deeper + TaskExecutor.Task anotherTask = + searcher.getTaskExecutor().createTask(() -> null); + searcher + .getTaskExecutor() + .invokeAll(Collections.singletonList(anotherTask)); + return null; + }); + searcher.getTaskExecutor().invokeAll(Collections.singletonList(task)); + } + + @Override + public void collect(int doc) {} + }; + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + } + + @Override + public Void reduce(Collection collectors) { + return null; + } + }); + } + } + } + + public void testInvokeAllFromTaskDoesNotDeadlockMultipleSearchers() throws IOException { + try (Directory dir = newDirectory(); + RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) { + for (int i = 0; i < 500; i++) { + iw.addDocument(new Document()); + } + try (DirectoryReader reader = iw.getReader()) { + IndexSearcher searcher = + new IndexSearcher(reader, 
executorService) { + @Override + protected LeafSlice[] slices(List leaves) { + return slices(leaves, 1, 1); + } + }; + + searcher.search( + new MatchAllDocsQuery(), + new CollectorManager() { + @Override + public Collector newCollector() { + return new Collector() { + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) { + return new LeafCollector() { + @Override + public void setScorer(Scorable scorer) throws IOException { + // the thread local used to prevent deadlock is static, so while each + // searcher has its own + // TaskExecutor, the safeguard is shared among all the searchers that get + // the same executor + IndexSearcher indexSearcher = new IndexSearcher(reader, executorService); + TaskExecutor.Task task = + indexSearcher.getTaskExecutor().createTask(() -> null); + searcher.getTaskExecutor().invokeAll(Collections.singletonList(task)); + } + + @Override + public void collect(int doc) {} + }; + } + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; + } + }; + } + + @Override + public Void reduce(Collection collectors) { + return null; + } + }); + } + } + } }