Revert "Parallelize knn query rewrite across slices rather than segments (#12325)" (#12385)

This reverts commit 10bebde269.

Based on a recent discussion in
https://github.com/apache/lucene/pull/12183#discussion_r1235739084 we
agreed it makes more sense to parallelize knn query vector rewrite
across leaves rather than leaf slices.
This commit is contained in:
Luca Cavanna 2023-06-26 10:41:18 +02:00 committed by GitHub
parent cb195bd96e
commit 7f10dca1e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 40 deletions

View File

@ -19,12 +19,12 @@ package org.apache.lucene.search;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.index.FieldInfo;
@ -81,12 +81,11 @@ abstract class AbstractKnnVectorQuery extends Query {
filterWeight = null;
}
SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
// in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
Executor executor = indexSearcher.getExecutor();
TopDocs[] perLeafResults =
(sliceExecutor == null)
(executor == null)
? sequentialSearch(reader.leaves(), filterWeight)
: parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
: parallelSearch(reader.leaves(), filterWeight, executor);
// Merge sort the results
TopDocs topK = TopDocs.merge(k, perLeafResults);
@ -110,40 +109,27 @@ abstract class AbstractKnnVectorQuery extends Query {
}
private TopDocs[] parallelSearch(
IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
int segmentsCount = 0;
for (IndexSearcher.LeafSlice slice : slices) {
segmentsCount += slice.leaves.length;
tasks.add(
new FutureTask<>(
() -> {
TopDocs[] results = new TopDocs[slice.leaves.length];
int i = 0;
for (LeafReaderContext context : slice.leaves) {
results[i++] = searchLeaf(context, filterWeight);
}
return results;
}));
}
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
List<FutureTask<TopDocs>> tasks =
leafReaderContexts.stream()
.map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
.toList();
SliceExecutor sliceExecutor = new SliceExecutor(executor);
sliceExecutor.invokeAll(tasks);
TopDocs[] topDocs = new TopDocs[segmentsCount];
int i = 0;
for (FutureTask<TopDocs[]> task : tasks) {
try {
for (TopDocs docs : task.get()) {
topDocs[i++] = docs;
}
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
return topDocs;
return tasks.stream()
.map(
task -> {
try {
return task.get();
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
})
.toArray(TopDocs[]::new);
}
private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {

View File

@ -962,10 +962,6 @@ public class IndexSearcher {
return executor;
}
SliceExecutor getSliceExecutor() {
return sliceExecutor;
}
/**
* Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
* typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to