LUCENE-2128: Further parallelization of ParallelMultiSearcher

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@888595 13f79535-47bb-0310-9956-ffa450edef68
Uwe Schindler 2009-12-08 22:14:32 +00:00
parent 6702e09b98
commit 4e88c6cb93
4 changed files with 114 additions and 24 deletions

CHANGES.txt

@@ -45,6 +45,9 @@ Bug fixes
New features
* LUCENE-2128: Parallelized fetching document frequencies during weight
creation. (Israel Tsadok, Simon Willnauer via Uwe Schindler)
* LUCENE-2069: Added Unicode 4 support to CharArraySet. Due to the switch
to Java 5, supplementary characters are now lowercased correctly if the
set is created as case insensitive.
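
For the LUCENE-2069 entry above, here is a minimal sketch of the described behavior, not part of this patch; it assumes the Lucene 3.0 CharArraySet(int startSize, boolean ignoreCase) constructor and uses DESERET LETTER LONG I (U+10400 upper case, U+10428 lower case) purely as an illustrative supplementary character.

import org.apache.lucene.analysis.CharArraySet;

public class SupplementaryCaseDemo {
  public static void main(String[] args) {
    // a case-insensitive set: the second constructor argument is ignoreCase
    CharArraySet set = new CharArraySet(8, true);
    // add the upper-case supplementary character U+10400 (a surrogate pair in Java)
    set.add("\uD801\uDC00");
    // look it up in its lower-case form U+10428; with Unicode 4 aware
    // lowercasing, the case-insensitive lookup is expected to succeed
    System.out.println(set.contains("\uD801\uDC28"));
  }
}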

MultiSearcher.java

@@ -311,20 +311,7 @@ public class MultiSearcher extends Searcher {
rewrittenQuery.extractTerms(terms);
// step3
final Term[] allTermsArray = new Term[terms.size()];
terms.toArray(allTermsArray);
int[] aggregatedDfs = new int[terms.size()];
for (int i = 0; i < searchables.length; i++) {
int[] dfs = searchables[i].docFreqs(allTermsArray);
for(int j=0; j<aggregatedDfs.length; j++){
aggregatedDfs[j] += dfs[j];
}
}
final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
for(int i=0; i<allTermsArray.length; i++) {
dfMap.put(allTermsArray[i], Integer.valueOf(aggregatedDfs[i]));
}
final Map<Term,Integer> dfMap = createDocFrequencyMap(terms);
// step4
final int numDocs = maxDoc();
@@ -332,11 +319,34 @@ public class MultiSearcher extends Searcher {
return rewrittenQuery.weight(cacheSim);
}
/**
* Collects the document frequencies for the given terms from all searchables
* @param terms term set used to collect the document frequencies from all
* searchables
* @return a map with a term as the key and the term's aggregated document
* frequency as the value
* @throws IOException if a searchable throws an {@link IOException}
*/
Map<Term, Integer> createDocFrequencyMap(final Set<Term> terms) throws IOException {
final Term[] allTermsArray = terms.toArray(new Term[terms.size()]);
final int[] aggregatedDfs = new int[allTermsArray.length];
for (Searchable searchable : searchables) {
final int[] dfs = searchable.docFreqs(allTermsArray);
for(int j=0; j<aggregatedDfs.length; j++){
aggregatedDfs[j] += dfs[j];
}
}
final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
for(int i=0; i<allTermsArray.length; i++) {
dfMap.put(allTermsArray[i], Integer.valueOf(aggregatedDfs[i]));
}
return dfMap;
}
/**
* A thread subclass for searching a single searchable
*/
static class MultiSearcherCallableNoSort implements Callable<TopDocs> {
static final class MultiSearcherCallableNoSort implements Callable<TopDocs> {
private final Lock lock;
private final Searchable searchable;
@@ -381,7 +391,7 @@ public class MultiSearcher extends Searcher {
/**
* A thread subclass for searching a single searchable
*/
static class MultiSearcherCallableWithSort implements Callable<TopFieldDocs> {
static final class MultiSearcherCallableWithSort implements Callable<TopFieldDocs> {
private final Lock lock;
private final Searchable searchable;

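MultiSearcher aggregates document frequencies across all sub-searchers so that every sub-searcher scores a term with the same idf; the refactoring above moves that aggregation into createDocFrequencyMap so subclasses can override it. As a worked illustration only (not part of the patch), the sketch below compares the idf a single index would compute locally with the idf obtained from the aggregated counts, using the log(numDocs/(df+1)) + 1 formula of Lucene's default Similarity.

public class AggregatedIdfDemo {
  // idf as computed by Lucene's DefaultSimilarity
  static float idf(int docFreq, int numDocs) {
    return (float) (Math.log(numDocs / (double) (docFreq + 1)) + 1.0);
  }

  public static void main(String[] args) {
    // term "x" appears in 10 docs of a 100-doc index and 5 docs of a 50-doc index
    int df1 = 10, numDocs1 = 100;
    int df2 = 5,  numDocs2 = 50;
    // scoring each index with its local df yields inconsistent weights
    System.out.println("local idf, index 1: " + idf(df1, numDocs1));
    System.out.println("local idf, index 2: " + idf(df2, numDocs2));
    // MultiSearcher instead aggregates df (10 + 5 = 15) and maxDoc (150),
    // so both sub-searchers score with the same idf
    System.out.println("aggregated idf:     " + idf(df1 + df2, numDocs1 + numDocs2));
  }
}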
ParallelMultiSearcher.java

@@ -18,21 +18,22 @@ package org.apache.lucene.search;
*/
import java.io.IOException;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.ThreadInterruptedException;
/** Implements parallel search over a set of <code>Searchables</code>.
@@ -175,6 +176,25 @@ public class ParallelMultiSearcher extends MultiSearcher {
}
}
@Override
HashMap<Term, Integer> createDocFrequencyMap(Set<Term> terms) throws IOException {
final Term[] allTermsArray = terms.toArray(new Term[terms.size()]);
final int[] aggregatedDocFreqs = new int[terms.size()];
final ArrayList<Future<int[]>> searchThreads = new ArrayList<Future<int[]>>(searchables.length);
for (Searchable searchable : searchables) {
final Future<int[]> future = executor.submit(
new DocumentFrequencyCallable(searchable, allTermsArray));
searchThreads.add(future);
}
foreach(new AggregateDocFrequency(aggregatedDocFreqs), searchThreads);
final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
for(int i=0; i<allTermsArray.length; i++) {
dfMap.put(allTermsArray[i], Integer.valueOf(aggregatedDocFreqs[i]));
}
return dfMap;
}
/*
* apply the function to each element of the list. This method encapsulates the logic of how
* to wait for concurrently executed searchables.
@@ -184,9 +204,10 @@ public class ParallelMultiSearcher extends MultiSearcher {
try{
func.apply(future.get());
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException)
final Throwable throwable = e.getCause();
if (throwable instanceof IOException)
throw (IOException) e.getCause();
throw new RuntimeException(e.getCause());
throw new RuntimeException(throwable);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
@@ -216,6 +237,7 @@ public class ParallelMultiSearcher extends MultiSearcher {
maxScore = Math.max(maxScore, t.getMaxScore());
}
}
/**
* Accumulates the document frequency for a term.
*/
@@ -227,4 +249,37 @@ public class ParallelMultiSearcher extends MultiSearcher {
}
}
/**
* Aggregates the document frequencies from multiple searchers
*/
private static final class AggregateDocFrequency implements Function<int[]>{
final int[] aggregatedDocFreqs;
public AggregateDocFrequency(int[] aggregatedDocFreqs){
this.aggregatedDocFreqs = aggregatedDocFreqs;
}
public void apply(final int[] docFreqs) {
for(int i=0; i<aggregatedDocFreqs.length; i++){
aggregatedDocFreqs[i] += docFreqs[i];
}
}
}
/**
* A {@link Callable} to retrieve the document frequencies for a Term array
*/
private static final class DocumentFrequencyCallable implements Callable<int[]> {
private final Searchable searchable;
private final Term[] terms;
public DocumentFrequencyCallable(Searchable searchable, Term[] terms) {
this.searchable = searchable;
this.terms = terms;
}
public int[] call() throws Exception {
return searchable.docFreqs(terms);
}
}
}

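The ParallelMultiSearcher.createDocFrequencyMap override above fans one docFreqs call per Searchable out to the executor and then sums the returned arrays element-wise. Below is a simplified, standalone sketch of that fan-out/aggregate pattern using plain java.util.concurrent; fetchDocFreqs is a hypothetical stand-in for Searchable.docFreqs, not a Lucene API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelDfSketch {
  // stand-in for Searchable.docFreqs(Term[]): each "index" reports a fixed df per term
  static int[] fetchDocFreqs(int perTermDf, int numTerms) {
    int[] dfs = new int[numTerms];
    Arrays.fill(dfs, perTermDf);
    return dfs;
  }

  public static void main(String[] args) throws Exception {
    final int numTerms = 3;
    final int[] perIndexDf = { 10, 5 }; // two "indexes"
    ExecutorService executor = Executors.newFixedThreadPool(perIndexDf.length);
    try {
      // step 1: submit one Callable per index, mirroring DocumentFrequencyCallable
      List<Future<int[]>> futures = new ArrayList<Future<int[]>>();
      for (final int df : perIndexDf) {
        futures.add(executor.submit(new Callable<int[]>() {
          public int[] call() {
            return fetchDocFreqs(df, numTerms);
          }
        }));
      }
      // step 2: wait for each result and sum element-wise, mirroring
      // AggregateDocFrequency applied through foreach()
      int[] aggregated = new int[numTerms];
      for (Future<int[]> future : futures) {
        int[] dfs = future.get();
        for (int i = 0; i < numTerms; i++) {
          aggregated[i] += dfs[i];
        }
      }
      System.out.println(Arrays.toString(aggregated)); // prints [15, 15, 15]
    } finally {
      executor.shutdown();
    }
  }
}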
TestMultiSearcher.java

@@ -35,6 +35,7 @@ import org.apache.lucene.util.Version;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@@ -411,4 +412,25 @@ public class TestMultiSearcher extends LuceneTestCase
MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2});
assertEquals(15, multiSearcher.docFreq(new Term("contents","x")));
}
public void testCreateDocFrequencyMap() throws IOException{
RAMDirectory dir1 = new RAMDirectory();
RAMDirectory dir2 = new RAMDirectory();
Term template = new Term("contents");
String[] contents = {"a", "b", "c"};
HashSet<Term> termsSet = new HashSet<Term>();
for (int i = 0; i < contents.length; i++) {
initIndex(dir1, i+10, i==0, contents[i]);
initIndex(dir2, i+5, i==0, contents[i]);
termsSet.add(template.createTerm(contents[i]));
}
IndexSearcher searcher1 = new IndexSearcher(dir1, true);
IndexSearcher searcher2 = new IndexSearcher(dir2, true);
MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2});
Map<Term,Integer> docFrequencyMap = multiSearcher.createDocFrequencyMap(termsSet);
assertEquals(3, docFrequencyMap.size());
for (int i = 0; i < contents.length; i++) {
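// dir1 indexes i+10 matching docs and dir2 indexes i+5, so the expected aggregated df is (i*2) + 15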
assertEquals(Integer.valueOf((i*2) +15), docFrequencyMap.get(template.createTerm(contents[i])));
}
}
}