LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)

This commit introduces a mechanism to control allocation of threads to slices planned for a query.
The default implementation uses the size of backlog queue of the executor to determine if a slice should be allocated a new thread
This commit is contained in:
Atri Sharma 2020-04-01 20:42:26 +05:30 committed by GitHub
parent 1f5705ff5c
commit 9ed71a6efe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 267 additions and 25 deletions

View File

@ -26,12 +26,11 @@ import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@ -123,6 +122,9 @@ public class IndexSearcher {
// These are only used for multi-threaded search
private final Executor executor;
// Used internally for load balancing threads executing for the query
private final SliceExecutor sliceExecutor;
// the default Similarity
private static final Similarity defaultSimilarity = new BM25Similarity();
@ -208,9 +210,17 @@ public class IndexSearcher {
* @lucene.experimental
*/
public IndexSearcher(IndexReaderContext context, Executor executor) {
this(context, executor, getSliceExecutionControlPlane(executor));
}
// Package private for testing
IndexSearcher(IndexReaderContext context, Executor executor, SliceExecutor sliceExecutor) {
assert context.isTopLevel: "IndexSearcher's ReaderContext must be topLevel for reader" + context.reader();
assert (sliceExecutor == null) == (executor==null);
reader = context.reader();
this.executor = executor;
this.sliceExecutor = sliceExecutor;
this.readerContext = context;
leafContexts = context.leaves();
this.leafSlices = executor == null ? null : slices(leafContexts);
@ -662,36 +672,21 @@ public class IndexSearcher {
}
query = rewrite(query);
final Weight weight = createWeight(query, scoreMode, 1);
final List<Future<C>> topDocsFutures = new ArrayList<>(leafSlices.length);
for (int i = 0; i < leafSlices.length - 1; ++i) {
final List<FutureTask<C>> listTasks = new ArrayList<>();
for (int i = 0; i < leafSlices.length; ++i) {
final LeafReaderContext[] leaves = leafSlices[i].leaves;
final C collector = collectors.get(i);
FutureTask<C> task = new FutureTask<>(() -> {
search(Arrays.asList(leaves), weight, collector);
return collector;
});
boolean executedOnCallerThread = false;
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
// Execute on caller thread
search(Arrays.asList(leaves), weight, collector);
topDocsFutures.add(CompletableFuture.completedFuture(collector));
executedOnCallerThread = true;
}
// Do not add the task's future if it was not used
if (executedOnCallerThread == false) {
topDocsFutures.add(task);
}
listTasks.add(task);
}
final LeafReaderContext[] leaves = leafSlices[leafSlices.length - 1].leaves;
final C collector = collectors.get(leafSlices.length - 1);
// execute the last on the caller thread
search(Arrays.asList(leaves), weight, collector);
topDocsFutures.add(CompletableFuture.completedFuture(collector));
sliceExecutor.invokeAll(listTasks);
final List<C> collectedCollectors = new ArrayList<>();
for (Future<C> future : topDocsFutures) {
for (Future<C> future : listTasks) {
try {
collectedCollectors.add(future.get());
} catch (InterruptedException e) {
@ -878,7 +873,7 @@ public class IndexSearcher {
@Override
public String toString() {
return "IndexSearcher(" + reader + "; executor=" + executor + ")";
return "IndexSearcher(" + reader + "; executor=" + executor + "; sliceExecutionControlPlane " + sliceExecutor + ")";
}
/**
@ -943,4 +938,19 @@ public class IndexSearcher {
super("maxClauseCount is set to " + maxClauseCount);
}
}
/**
* Return the SliceExecutionControlPlane instance to be used for this IndexSearcher instance
*/
private static SliceExecutor getSliceExecutionControlPlane(Executor executor) {
if (executor == null) {
return null;
}
if (executor instanceof ThreadPoolExecutor) {
return new QueueSizeBasedExecutor((ThreadPoolExecutor) executor);
}
return new SliceExecutor(executor);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.util.Collection;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Derivative of SliceExecutor that controls the number of active threads
* that are used for a single query. At any point, no more than (maximum pool size of the executor * LIMITING_FACTOR)
* tasks should be active. If the limit is exceeded, further segments are searched on the caller thread
*/
class QueueSizeBasedExecutor extends SliceExecutor {
private static final double LIMITING_FACTOR = 1.5;
private final ThreadPoolExecutor threadPoolExecutor;
public QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
super(threadPoolExecutor);
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public void invokeAll(Collection<? extends Runnable> tasks) {
int i = 0;
for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = false;
// Execute last task on caller thread
if (i == tasks.size() - 1) {
shouldExecuteOnCallerThread = true;
}
if (threadPoolExecutor.getQueue().size() >=
(threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR)) {
shouldExecuteOnCallerThread = true;
}
processTask(task, shouldExecuteOnCallerThread);
++i;
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
/**
* Executor which is responsible
* for execution of slices based on the current status
* of the system and current system load
*/
class SliceExecutor {
private final Executor executor;
public SliceExecutor(Executor executor) {
this.executor = executor;
}
public void invokeAll(Collection<? extends Runnable> tasks) {
if (tasks == null) {
throw new IllegalArgumentException("Tasks is null");
}
if (executor == null) {
throw new IllegalArgumentException("Executor is null");
}
int i = 0;
for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = false;
// Execute last task on caller thread
if (i == tasks.size() - 1) {
shouldExecuteOnCallerThread = true;
}
processTask(task, shouldExecuteOnCallerThread);
++i;
};
}
// Helper method to execute a single task
protected void processTask(final Runnable task,
final boolean shouldExecuteOnCallerThread) {
if (task == null) {
throw new IllegalArgumentException("Input is null");
}
if (!shouldExecuteOnCallerThread) {
try {
executor.execute(task);
return;
} catch (RejectedExecutionException e) {
// Execute on caller thread
}
}
task.run();
}
}

View File

@ -22,8 +22,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@ -34,8 +36,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
@ -61,12 +63,17 @@ public class TestIndexSearcher extends LuceneTestCase {
super.setUp();
dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
Random random = random();
for (int i = 0; i < 100; i++) {
Document doc = new Document();
doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), Field.Store.NO));
doc.add(new SortedDocValuesField("field2", new BytesRef(Boolean.toString(i % 2 == 0))));
iw.addDocument(doc);
if (random.nextBoolean()) {
iw.commit();
}
}
reader = iw.getReader();
iw.close();
@ -347,4 +354,89 @@ public class TestIndexSearcher extends LuceneTestCase {
throw new RejectedExecutionException();
}
}
public void testQueueSizeBasedSliceExecutor() throws Exception {
ThreadPoolExecutor service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestIndexSearcher"));
runSliceExecutorTest(service, false);
TestUtil.shutdownExecutorService(service);
}
public void testRandomBlockingSliceExecutor() throws Exception {
ThreadPoolExecutor service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestIndexSearcher"));
runSliceExecutorTest(service, true);
TestUtil.shutdownExecutorService(service);
}
private void runSliceExecutorTest(ThreadPoolExecutor service, boolean useRandomSliceExecutor) throws Exception {
SliceExecutor sliceExecutor = useRandomSliceExecutor == true ? new RandomBlockingSliceExecutor(service) :
new QueueSizeBasedExecutor(service);
IndexSearcher searcher = new IndexSearcher(reader.getContext(), service, sliceExecutor);
Query queries[] = new Query[] {
new MatchAllDocsQuery(),
new TermQuery(new Term("field", "1"))
};
Sort sorts[] = new Sort[] {
null,
new Sort(new SortField("field2", SortField.Type.STRING))
};
ScoreDoc afters[] = new ScoreDoc[] {
null,
new FieldDoc(0, 0f, new Object[] { new BytesRef("boo!") })
};
for (ScoreDoc after : afters) {
for (Query query : queries) {
for (Sort sort : sorts) {
searcher.search(query, Integer.MAX_VALUE);
searcher.searchAfter(after, query, Integer.MAX_VALUE);
if (sort != null) {
TopDocs topDocs = searcher.search(query, Integer.MAX_VALUE, sort);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.search(query, Integer.MAX_VALUE, sort, true);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.search(query, Integer.MAX_VALUE, sort, false);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, true);
assertTrue(topDocs.totalHits.value > 0);
topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, false);
assertTrue(topDocs.totalHits.value > 0);
}
}
}
}
}
private class RandomBlockingSliceExecutor extends SliceExecutor {
public RandomBlockingSliceExecutor(Executor executor) {
super(executor);
}
@Override
public void invokeAll(Collection<? extends Runnable> tasks){
for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = random().nextBoolean();
processTask(task, shouldExecuteOnCallerThread);
}
}
}
}