Simplify task executor for concurrent operations (#12499)

This commit removes the QueueSizeBasedExecutor (package private) in favour of simply offloading concurrent execution to the provided executor. In need of specific behaviour, it can all be included in the executor itself.

This removes an instanceof check that determines which type of executor wrapper is used, which means that some tasks may be executed on the caller thread depending on queue size, whenever a rejection happens, or always for the last slice. This behaviour is not configurable in any way, and is too rigid. Rather than making this pluggable, I propose to make Lucene less opinionated about concurrent tasks execution and require that users include their own execution strategy directly in the executor that they provide to the index searcher.

Relates to #12498
This commit is contained in:
Luca Cavanna 2023-08-21 21:54:37 +02:00 committed by GitHub
parent fb8183332b
commit bb62720526
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 15 additions and 274 deletions

View File

@ -155,6 +155,9 @@ Improvements
---------------------
* GITHUB#12374: Add CachingLeafSlicesSupplier to compute the LeafSlices for concurrent segment search (Sorabh Hamirwasia)
* GITHUB#12499: Simplify task executor for concurrent operations by offloading concurrent operations to the
provided executor unconditionally. (Luca Cavanna)
Optimizations
---------------------

View File

@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.lucene.index.DirectoryReader;
@ -226,18 +225,11 @@ public class IndexSearcher {
* @lucene.experimental
*/
public IndexSearcher(IndexReaderContext context, Executor executor) {
this(context, executor, getSliceExecutionControlPlane(executor));
}
// Package private for testing
IndexSearcher(IndexReaderContext context, Executor executor, TaskExecutor taskExecutor) {
assert context.isTopLevel
: "IndexSearcher's ReaderContext must be topLevel for reader" + context.reader();
assert (taskExecutor == null) == (executor == null);
reader = context.reader();
this.executor = executor;
this.taskExecutor = taskExecutor;
this.taskExecutor = executor == null ? null : new TaskExecutor(executor);
this.readerContext = context;
leafContexts = context.leaves();
leafSlicesSupplier =
@ -1002,19 +994,6 @@ public class IndexSearcher {
}
}
/** Return the SliceExecutionControlPlane instance to be used for this IndexSearcher instance */
private static TaskExecutor getSliceExecutionControlPlane(Executor executor) {
if (executor == null) {
return null;
}
if (executor instanceof ThreadPoolExecutor) {
return new QueueSizeBasedExecutor((ThreadPoolExecutor) executor);
}
return new TaskExecutor(executor);
}
/**
* Supplier for {@link LeafSlice} slices which computes and caches the value on first invocation
* and returns cached value on subsequent invocation. If the passed in provider for slice

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Derivative of TaskExecutor that controls the number of active threads that are used for a single
* query. At any point, no more than (maximum pool size of the executor * LIMITING_FACTOR) tasks
* should be active. If the limit is exceeded, further segments are searched on the caller thread
*/
class QueueSizeBasedExecutor extends TaskExecutor {
private static final double LIMITING_FACTOR = 1.5;
private final ThreadPoolExecutor threadPoolExecutor;
QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
super(threadPoolExecutor);
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
return super.shouldExecuteOnCallerThread(index, numTasks)
|| threadPoolExecutor.getQueue().size()
>= (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR);
}
}

View File

@ -24,7 +24,6 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import org.apache.lucene.util.ThreadInterruptedException;
@ -39,21 +38,17 @@ class TaskExecutor {
this.executor = Objects.requireNonNull(executor, "Executor is null");
}
/**
* Execute all the tasks provided as an argument, wait for them to complete and return the
* obtained results.
*
* @param tasks the tasks to execute
* @return a list containing the results from the tasks execution
* @param <T> the return type of the task execution
*/
final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) {
int i = 0;
for (Runnable task : tasks) {
if (shouldExecuteOnCallerThread(i, tasks.size())) {
task.run();
} else {
try {
executor.execute(task);
} catch (
@SuppressWarnings("unused")
RejectedExecutionException e) {
task.run();
}
}
++i;
}
final List<T> results = new ArrayList<>();
for (Future<T> future : tasks) {
@ -67,9 +62,4 @@ class TaskExecutor {
}
return results;
}
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
// Execute last task on caller thread
return index == numTasks - 1;
}
}

View File

@ -19,19 +19,12 @@ package org.apache.lucene.search;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@ -255,7 +248,7 @@ public class TestIndexSearcher extends LuceneTestCase {
IOUtils.close(r, dir);
}
public void testOneSegmentExecutesOnTheCallerThread() throws IOException {
public void testMultipleSegmentsExecuteOnTheExecutor() throws IOException {
List<LeafReaderContext> leaves = reader.leaves();
AtomicInteger numExecutions = new AtomicInteger(0);
IndexSearcher searcher =
@ -278,188 +271,7 @@ public class TestIndexSearcher extends LuceneTestCase {
if (leaves.size() <= 1) {
assertEquals(0, numExecutions.get());
} else {
assertEquals(leaves.size() - 1, numExecutions.get());
}
}
public void testRejectedExecution() throws IOException {
ExecutorService service = new RejectingMockExecutor();
IndexSearcher searcher =
new IndexSearcher(reader, service) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
ArrayList<LeafSlice> slices = new ArrayList<>();
for (LeafReaderContext ctx : leaves) {
slices.add(new LeafSlice(Arrays.asList(ctx)));
}
return slices.toArray(new LeafSlice[0]);
}
};
// To ensure that failing ExecutorService still allows query to run
// successfully
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10);
assert topDocs.scoreDocs.length == 10;
service.shutdown();
}
private static class RejectingMockExecutor implements ExecutorService {
@Override
public void shutdown() {}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
}
@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitTermination(final long l, final TimeUnit timeUnit)
throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(final Callable<T> tCallable) {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(final Runnable runnable, final T t) {
throw new UnsupportedOperationException();
}
@Override
public Future<?> submit(final Runnable runnable) {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables)
throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(
final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit)
throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(final Collection<? extends Callable<T>> callables)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(
final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
@Override
public void execute(final Runnable runnable) {
throw new RejectedExecutionException();
}
}
public void testQueueSizeBasedSliceExecutor() throws Exception {
ThreadPoolExecutor service =
new ThreadPoolExecutor(
4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestIndexSearcher"));
runSliceExecutorTest(service, false);
TestUtil.shutdownExecutorService(service);
}
public void testRandomBlockingSliceExecutor() throws Exception {
ThreadPoolExecutor service =
new ThreadPoolExecutor(
4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestIndexSearcher"));
runSliceExecutorTest(service, true);
TestUtil.shutdownExecutorService(service);
}
private void runSliceExecutorTest(ThreadPoolExecutor service, boolean useRandomSliceExecutor)
throws Exception {
TaskExecutor taskExecutor =
useRandomSliceExecutor == true
? new RandomBlockingTaskExecutor(service)
: new QueueSizeBasedExecutor(service);
IndexSearcher searcher = new IndexSearcher(reader.getContext(), service, taskExecutor);
Query[] queries = new Query[] {new MatchAllDocsQuery(), new TermQuery(new Term("field", "1"))};
Sort[] sorts = new Sort[] {null, new Sort(new SortField("field2", SortField.Type.STRING))};
ScoreDoc[] afters =
new ScoreDoc[] {null, new FieldDoc(0, 0f, new Object[] {newBytesRef("boo!")})};
for (ScoreDoc after : afters) {
for (Query query : queries) {
for (Sort sort : sorts) {
searcher.search(query, Integer.MAX_VALUE);
searcher.searchAfter(after, query, Integer.MAX_VALUE);
if (sort != null) {
TopDocs topDocs = searcher.search(query, Integer.MAX_VALUE, sort);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.search(query, Integer.MAX_VALUE, sort, true);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.search(query, Integer.MAX_VALUE, sort, false);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, true);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, false);
assertTrue(topDocs.totalHits.value > 0);
}
}
}
}
}
private static class RandomBlockingTaskExecutor extends TaskExecutor {
RandomBlockingTaskExecutor(Executor executor) {
super(executor);
}
@Override
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
return random().nextBoolean();
assertEquals(leaves.size(), numExecutions.get());
}
}
}