mirror of https://github.com/apache/lucene.git
LUCENE-2494: use CompletionService in ParallelMultiSearcher instead of simple polling
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@953407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
608f816434
commit
db1cb0bcb5
|
@ -763,6 +763,8 @@ API Changes
|
|||
(Robert Muir)
|
||||
|
||||
Optimizations
|
||||
* LUCENE-2494: Use CompletionService in ParallelMultiSearcher instead of
|
||||
simple polling for resutls. (Edward Drapkin, Simon Willnauer)
|
||||
|
||||
* LUCENE-2086: When resolving deleted terms, do so in term sort order
|
||||
for better performance (Bogdan Ghidireac via Mike McCandless)
|
||||
|
|
|
@ -18,16 +18,17 @@ package org.apache.lucene.search;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -67,18 +68,20 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
*/
|
||||
@Override
|
||||
public int docFreq(final Term term) throws IOException {
|
||||
@SuppressWarnings("unchecked") final Future<Integer>[] searchThreads = new Future[searchables.length];
|
||||
for (int i = 0; i < searchables.length; i++) { // search each searchable
|
||||
final ExecutionHelper<Integer> runner = new ExecutionHelper<Integer>(executor);
|
||||
for(int i = 0; i < searchables.length; i++) {
|
||||
final Searchable searchable = searchables[i];
|
||||
searchThreads[i] = executor.submit(new Callable<Integer>() {
|
||||
runner.submit(new Callable<Integer>() {
|
||||
public Integer call() throws IOException {
|
||||
return Integer.valueOf(searchable.docFreq(term));
|
||||
}
|
||||
});
|
||||
}
|
||||
final CountDocFreq func = new CountDocFreq();
|
||||
foreach(func, Arrays.asList(searchThreads));
|
||||
return func.docFreq;
|
||||
int docFreq = 0;
|
||||
for (Integer num : runner) {
|
||||
docFreq += num.intValue();
|
||||
}
|
||||
return docFreq;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -90,20 +93,25 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
|
||||
final HitQueue hq = new HitQueue(nDocs, false);
|
||||
final Lock lock = new ReentrantLock();
|
||||
@SuppressWarnings("unchecked") final Future<TopDocs>[] searchThreads = new Future[searchables.length];
|
||||
final ExecutionHelper<TopDocs> runner = new ExecutionHelper<TopDocs>(executor);
|
||||
|
||||
for (int i = 0; i < searchables.length; i++) { // search each searchable
|
||||
searchThreads[i] = executor.submit(
|
||||
runner.submit(
|
||||
new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq, i, starts));
|
||||
}
|
||||
|
||||
final CountTotalHits<TopDocs> func = new CountTotalHits<TopDocs>();
|
||||
foreach(func, Arrays.asList(searchThreads));
|
||||
int totalHits = 0;
|
||||
float maxScore = Float.NEGATIVE_INFINITY;
|
||||
for (final TopDocs topDocs : runner) {
|
||||
totalHits += topDocs.totalHits;
|
||||
maxScore = Math.max(maxScore, topDocs.getMaxScore());
|
||||
}
|
||||
|
||||
final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
|
||||
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
|
||||
scoreDocs[i] = hq.pop();
|
||||
|
||||
return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
|
||||
return new TopDocs(totalHits, scoreDocs, maxScore);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,20 +125,22 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
|
||||
final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
|
||||
final Lock lock = new ReentrantLock();
|
||||
@SuppressWarnings("unchecked") final Future<TopFieldDocs>[] searchThreads = new Future[searchables.length];
|
||||
final ExecutionHelper<TopFieldDocs> runner = new ExecutionHelper<TopFieldDocs>(executor);
|
||||
for (int i = 0; i < searchables.length; i++) { // search each searchable
|
||||
searchThreads[i] = executor.submit(
|
||||
runner.submit(
|
||||
new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs, hq, sort, i, starts));
|
||||
}
|
||||
|
||||
final CountTotalHits<TopFieldDocs> func = new CountTotalHits<TopFieldDocs>();
|
||||
foreach(func, Arrays.asList(searchThreads));
|
||||
|
||||
int totalHits = 0;
|
||||
float maxScore = Float.NEGATIVE_INFINITY;
|
||||
for (final TopFieldDocs topFieldDocs : runner) {
|
||||
totalHits += topFieldDocs.totalHits;
|
||||
maxScore = Math.max(maxScore, topFieldDocs.getMaxScore());
|
||||
}
|
||||
final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
|
||||
for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
|
||||
scoreDocs[i] = hq.pop();
|
||||
|
||||
return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
|
||||
return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
|
||||
}
|
||||
|
||||
/** Lower-level search API.
|
||||
|
@ -192,13 +202,17 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
HashMap<Term, Integer> createDocFrequencyMap(Set<Term> terms) throws IOException {
|
||||
final Term[] allTermsArray = terms.toArray(new Term[terms.size()]);
|
||||
final int[] aggregatedDocFreqs = new int[terms.size()];
|
||||
final ArrayList<Future<int[]>> searchThreads = new ArrayList<Future<int[]>>(searchables.length);
|
||||
final ExecutionHelper<int[]> runner = new ExecutionHelper<int[]>(executor);
|
||||
for (Searchable searchable : searchables) {
|
||||
final Future<int[]> future = executor.submit(
|
||||
runner.submit(
|
||||
new DocumentFrequencyCallable(searchable, allTermsArray));
|
||||
searchThreads.add(future);
|
||||
}
|
||||
foreach(new AggregateDocFrequency(aggregatedDocFreqs), searchThreads);
|
||||
final int docFreqLen = aggregatedDocFreqs.length;
|
||||
for (final int[] docFreqs : runner) {
|
||||
for(int i=0; i < docFreqLen; i++){
|
||||
aggregatedDocFreqs[i] += docFreqs[i];
|
||||
}
|
||||
}
|
||||
|
||||
final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
|
||||
for(int i=0; i<allTermsArray.length; i++) {
|
||||
|
@ -206,77 +220,7 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
}
|
||||
return dfMap;
|
||||
}
|
||||
|
||||
/*
|
||||
* apply the function to each element of the list. This method encapsulates the logic how
|
||||
* to wait for concurrently executed searchables.
|
||||
*/
|
||||
private <T> void foreach(Function<T> func, List<Future<T>> list) throws IOException{
|
||||
for (Future<T> future : list) {
|
||||
try{
|
||||
func.apply(future.get());
|
||||
} catch (ExecutionException e) {
|
||||
final Throwable throwable = e.getCause();
|
||||
if (throwable instanceof IOException)
|
||||
throw (IOException) e.getCause();
|
||||
throw new RuntimeException(throwable);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Both functions could be reduced to Int as other values of TopDocs
|
||||
// are not needed. Using sep. functions is more self documenting.
|
||||
/**
|
||||
* A function with one argument
|
||||
* @param <T> the argument type
|
||||
*/
|
||||
private static interface Function<T> {
|
||||
abstract void apply(T t);
|
||||
}
|
||||
|
||||
/**
|
||||
* Counts the total number of hits for all {@link TopDocs} instances
|
||||
* provided.
|
||||
*/
|
||||
private static final class CountTotalHits<T extends TopDocs> implements Function<T> {
|
||||
int totalHits = 0;
|
||||
float maxScore = Float.NEGATIVE_INFINITY;
|
||||
|
||||
public void apply(T t) {
|
||||
totalHits += t.totalHits;
|
||||
maxScore = Math.max(maxScore, t.getMaxScore());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Accumulates the document frequency for a term.
|
||||
*/
|
||||
private static final class CountDocFreq implements Function<Integer>{
|
||||
int docFreq = 0;
|
||||
|
||||
public void apply(Integer t) {
|
||||
docFreq += t.intValue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregates the document frequencies from multiple searchers
|
||||
*/
|
||||
private static final class AggregateDocFrequency implements Function<int[]>{
|
||||
final int[] aggregatedDocFreqs;
|
||||
|
||||
public AggregateDocFrequency(int[] aggregatedDocFreqs){
|
||||
this.aggregatedDocFreqs = aggregatedDocFreqs;
|
||||
}
|
||||
|
||||
public void apply(final int[] docFreqs) {
|
||||
for(int i=0; i<aggregatedDocFreqs.length; i++){
|
||||
aggregatedDocFreqs[i] += docFreqs[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link Callable} to retrieve the document frequencies for a Term array
|
||||
|
@ -294,4 +238,53 @@ public class ParallelMultiSearcher extends MultiSearcher {
|
|||
return searchable.docFreqs(terms);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper class that wraps a {@link CompletionService} and provides an
|
||||
* iterable interface to the completed {@link Callable} instances.
|
||||
*
|
||||
* @param <T>
|
||||
* the type of the {@link Callable} return value
|
||||
*/
|
||||
private static final class ExecutionHelper<T> implements Iterator<T>, Iterable<T> {
|
||||
private final CompletionService<T> service;
|
||||
private int numTasks;
|
||||
|
||||
ExecutionHelper(final Executor executor) {
|
||||
this.service = new ExecutorCompletionService<T>(executor);
|
||||
}
|
||||
|
||||
public boolean hasNext() {
|
||||
return numTasks > 0;
|
||||
}
|
||||
|
||||
public void submit(Callable<T> task) {
|
||||
this.service.submit(task);
|
||||
++numTasks;
|
||||
}
|
||||
|
||||
public T next() {
|
||||
if(!this.hasNext())
|
||||
throw new NoSuchElementException();
|
||||
try {
|
||||
return service.take().get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new ThreadInterruptedException(e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
--numTasks;
|
||||
}
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public Iterator<T> iterator() {
|
||||
// use the shortcut here - this is only used in a privat context
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue