mirror of https://github.com/apache/lucene.git
LUCENE-6294: Generalize how IndexSearcher parallelizes collection execution.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1662751 13f79535-47bb-0310-9956-ffa450edef68
parent 5f45083f67
commit 60ecddfb9e
@@ -48,6 +48,9 @@ New Features
 * LUCENE-6227: Added BooleanClause.Occur.FILTER to filter documents without
   participating in scoring (on the contrary to MUST). (Adrien Grand)
 
+* LUCENE-6294: Added oal.search.CollectorManager to allow for parallelization
+  of the document collection process on IndexSearcher. (Adrien Grand)
+
 Bug Fixes
 
 * LUCENE-6249: StandardQueryParser doesn't support pure negative clauses.
@@ -0,0 +1,56 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A manager of collectors. This class is useful to parallelize execution of
+ * search requests and has two main methods:
+ * <ul>
+ *   <li>{@link #newCollector()} which must return a NEW collector which
+ *       will be used to collect a certain set of leaves.</li>
+ *   <li>{@link #reduce(Collection)} which will be used to reduce the
+ *       results of individual collections into a meaningful result.
+ *       This method is only called after all leaves have been fully
+ *       collected.</li>
+ * </ul>
+ *
+ * @see IndexSearcher#search(Query, CollectorManager)
+ * @lucene.experimental
+ */
+public interface CollectorManager<C extends Collector, T> {
+
+  /**
+   * Return a new {@link Collector}. This must return a different instance on
+   * each call.
+   */
+  C newCollector() throws IOException;
+
+  /**
+   * Reduce the results of individual collectors into a meaningful result.
+   * For instance a {@link TopDocsCollector} would compute the
+   * {@link TopDocsCollector#topDocs() top docs} of each collector and then
+   * merge them using {@link TopDocs#merge(int, TopDocs[])}.
+   * This method must be called after collection is finished on all provided
+   * collectors.
+   */
+  T reduce(Collection<C> collectors) throws IOException;
+
+}
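The interface is deliberately small; to make the contract concrete, here is a minimal caller-side sketch (not part of this commit) that counts matching documents, assuming an existing IndexSearcher named searcher and a Query named query. It mirrors the IndexSearcher.count(Query) implementation added further down in this change.

CollectorManager<TotalHitCountCollector, Integer> manager =
    new CollectorManager<TotalHitCountCollector, Integer>() {
      @Override
      public TotalHitCountCollector newCollector() throws IOException {
        // a NEW collector for each set of leaves, as the contract above requires
        return new TotalHitCountCollector();
      }
      @Override
      public Integer reduce(Collection<TotalHitCountCollector> collectors) throws IOException {
        // called once, after every leaf has been fully collected
        int total = 0;
        for (TotalHitCountCollector collector : collectors) {
          total += collector.getTotalHits();
        }
        return total;
      }
    };
int totalHits = searcher.search(query, manager);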
@@ -20,6 +20,8 @@ package org.apache.lucene.search;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -209,6 +211,30 @@ public class IndexSearcher {
     return similarity;
   }
 
+  /**
+   * Count how many documents match the given query.
+   */
+  public int count(Query query) throws IOException {
+    final CollectorManager<TotalHitCountCollector, Integer> collectorManager = new CollectorManager<TotalHitCountCollector, Integer>() {
+
+      @Override
+      public TotalHitCountCollector newCollector() throws IOException {
+        return new TotalHitCountCollector();
+      }
+
+      @Override
+      public Integer reduce(Collection<TotalHitCountCollector> collectors) throws IOException {
+        int total = 0;
+        for (TotalHitCountCollector collector : collectors) {
+          total += collector.getTotalHits();
+        }
+        return total;
+      }
+
+    };
+    return search(query, collectorManager);
+  }
+
   /** Finds the top <code>n</code>
    * hits for <code>query</code> where all results are after a previous
    * result (<code>after</code>).
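As a quick usage sketch (not part of the commit): count(query) should report the same number as the totalHits of a regular top-N search for the same query, which is exactly what the test added at the end of this commit asserts. The searcher variable below is assumed to exist.

Query query = new TermQuery(new Term("foo", "bar"));
int numMatches = searcher.count(query);
// equivalent result, but count() never keeps or scores any top hit:
int sameNumber = searcher.search(query, 1).totalHits;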
@@ -228,47 +254,30 @@ public class IndexSearcher {
     }
-    numHits = Math.min(numHits, limit);
-
-    if (executor == null) {
-      final TopScoreDocCollector collector = TopScoreDocCollector.create(numHits, after);
-      search(query, collector);
-      return collector.topDocs();
-    } else {
-      final TopScoreDocCollector[] collectors = new TopScoreDocCollector[leafSlices.length];
-      boolean needsScores = false;
-      for (int i = 0; i < leafSlices.length; ++i) {
-        collectors[i] = TopScoreDocCollector.create(numHits, after);
-        needsScores |= collectors[i].needsScores();
+    final int cappedNumHits = Math.min(numHits, limit);
+
+    final CollectorManager<TopScoreDocCollector, TopDocs> manager = new CollectorManager<TopScoreDocCollector, TopDocs>() {
+
+      @Override
+      public TopScoreDocCollector newCollector() throws IOException {
+        return TopScoreDocCollector.create(cappedNumHits, after);
       }
 
-      final Weight weight = createNormalizedWeight(query, needsScores);
-      final List<Future<TopDocs>> topDocsFutures = new ArrayList<>(leafSlices.length);
-      for (int i = 0; i < leafSlices.length; ++i) {
-        final LeafReaderContext[] leaves = leafSlices[i].leaves;
-        final TopScoreDocCollector collector = collectors[i];
-        topDocsFutures.add(executor.submit(new Callable<TopDocs>() {
-          @Override
-          public TopDocs call() throws Exception {
-            search(Arrays.asList(leaves), weight, collector);
-            return collector.topDocs();
-          }
-        }));
-      }
-
-      final TopDocs[] topDocs = new TopDocs[leafSlices.length];
-      for (int i = 0; i < topDocs.length; ++i) {
-        try {
-          topDocs[i] = topDocsFutures.get(i).get();
-        } catch (InterruptedException e) {
-          throw new ThreadInterruptedException(e);
-        } catch (ExecutionException e) {
-          throw new RuntimeException(e);
+      @Override
+      public TopDocs reduce(Collection<TopScoreDocCollector> collectors) throws IOException {
+        final TopDocs[] topDocs = new TopDocs[collectors.size()];
+        int i = 0;
+        for (TopScoreDocCollector collector : collectors) {
+          topDocs[i++] = collector.topDocs();
         }
+        return TopDocs.merge(cappedNumHits, topDocs);
       }
 
-      return TopDocs.merge(numHits, topDocs);
-    }
+    };
+
+    return search(query, manager);
   }
 
 
   /** Finds the top <code>n</code>
    * hits for <code>query</code>.
    *
@@ -324,7 +333,7 @@ public class IndexSearcher {
   }
 
   /** Finds the top <code>n</code>
-   * hits for <code>query</code> where all results are after a previous
+   * hits for <code>query</code> where all results are after a previous
    * result (<code>after</code>).
    * <p>
    * By passing the bottom result from a previous page as <code>after</code>,
@@ -339,7 +348,7 @@ public class IndexSearcher {
   }
 
   /** Finds the top <code>n</code>
-   * hits for <code>query</code> where all results are after a previous
+   * hits for <code>query</code> where all results are after a previous
    * result (<code>after</code>), allowing control over
    * whether hit scores and max score should be computed.
    * <p>
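The two javadoc fragments above describe the search-after paging pattern: pass the bottom result of the previous page as "after" so each call only returns the next page. A hedged sketch of a deep-paging loop (searcher and query assumed to exist, page size chosen arbitrarily):

ScoreDoc after = null;
while (true) {
  TopDocs page = searcher.searchAfter(after, query, 10);
  if (page.scoreDocs.length == 0) {
    break;                                              // no more hits
  }
  // ... process page.scoreDocs ...
  after = page.scoreDocs[page.scoreDocs.length - 1];    // bottom of this page
}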
@@ -371,39 +380,72 @@ public class IndexSearcher {
       throw new IllegalArgumentException("after.doc exceeds the number of documents in the reader: after.doc="
           + after.doc + " limit=" + limit);
     }
-    numHits = Math.min(numHits, limit);
+    final int cappedNumHits = Math.min(numHits, limit);
 
-    final boolean fillFields = true;
+    final CollectorManager<TopFieldCollector, TopFieldDocs> manager = new CollectorManager<TopFieldCollector, TopFieldDocs>() {
+
+      @Override
+      public TopFieldCollector newCollector() throws IOException {
+        final boolean fillFields = true;
+        return TopFieldCollector.create(sort, cappedNumHits, after, fillFields, doDocScores, doMaxScore);
+      }
+
+      @Override
+      public TopFieldDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
+        final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
+        int i = 0;
+        for (TopFieldCollector collector : collectors) {
+          topDocs[i++] = collector.topDocs();
+        }
+        return TopDocs.merge(sort, cappedNumHits, topDocs);
+      }
+
+    };
+
+    return search(query, manager);
+  }
+
+  /**
+   * Lower-level search API.
+   * Search all leaves using the given {@link CollectorManager}. In contrast
+   * to {@link #search(Query, Collector)}, this method will use the searcher's
+   * {@link ExecutorService} in order to parallelize execution of the collection
+   * on the configured {@link #leafSlices}.
+   * @see CollectorManager
+   * @lucene.experimental
+   */
+  public <C extends Collector, T> T search(Query query, CollectorManager<C, T> collectorManager) throws IOException {
     if (executor == null) {
-      final TopFieldCollector collector = TopFieldCollector.create(sort, numHits, after, fillFields, doDocScores, doMaxScore);
+      final C collector = collectorManager.newCollector();
       search(query, collector);
-      return collector.topDocs();
+      return collectorManager.reduce(Collections.singletonList(collector));
     } else {
-      final TopFieldCollector[] collectors = new TopFieldCollector[leafSlices.length];
+      final List<C> collectors = new ArrayList<>(leafSlices.length);
       boolean needsScores = false;
       for (int i = 0; i < leafSlices.length; ++i) {
-        collectors[i] = TopFieldCollector.create(sort, numHits, after, fillFields, doDocScores, doMaxScore);
-        needsScores |= collectors[i].needsScores();
+        final C collector = collectorManager.newCollector();
+        collectors.add(collector);
+        needsScores |= collector.needsScores();
       }
 
       final Weight weight = createNormalizedWeight(query, needsScores);
-      final List<Future<TopFieldDocs>> topDocsFutures = new ArrayList<>(leafSlices.length);
+      final List<Future<C>> topDocsFutures = new ArrayList<>(leafSlices.length);
       for (int i = 0; i < leafSlices.length; ++i) {
         final LeafReaderContext[] leaves = leafSlices[i].leaves;
-        final TopFieldCollector collector = collectors[i];
-        topDocsFutures.add(executor.submit(new Callable<TopFieldDocs>() {
+        final C collector = collectors.get(i);
+        topDocsFutures.add(executor.submit(new Callable<C>() {
           @Override
-          public TopFieldDocs call() throws Exception {
+          public C call() throws Exception {
             search(Arrays.asList(leaves), weight, collector);
-            return collector.topDocs();
+            return collector;
           }
         }));
       }
 
-      final TopFieldDocs[] topDocs = new TopFieldDocs[leafSlices.length];
-      for (int i = 0; i < topDocs.length; ++i) {
+      final List<C> collectedCollectors = new ArrayList<>();
+      for (Future<C> future : topDocsFutures) {
         try {
-          topDocs[i] = topDocsFutures.get(i).get();
+          collectedCollectors.add(future.get());
         } catch (InterruptedException e) {
           throw new ThreadInterruptedException(e);
         } catch (ExecutionException e) {
@@ -411,7 +453,7 @@ public class IndexSearcher {
         }
       }
 
-      return TopDocs.merge(sort, numHits, topDocs);
+      return collectorManager.reduce(collectors);
     }
   }
 
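The new search(Query, CollectorManager) shown above is what actually fans collection out over the executor: one newCollector() per leaf slice, one collection task per slice submitted to the executor, then a single reduce() over all collectors. A hedged setup sketch (reader and query assumed to exist; thread-pool sizing is arbitrary) of how a caller would opt into this parallel path:

ExecutorService executor = Executors.newFixedThreadPool(4);
try {
  IndexSearcher searcher = new IndexSearcher(reader, executor);
  TopDocs topDocs = searcher.search(query, 10);  // after this change, served by the CollectorManager path
  int count = searcher.count(query);             // likewise reduced from per-slice collectors
} finally {
  executor.shutdown();  // the searcher does not own or shut down the executor
}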
@@ -17,6 +17,7 @@ package org.apache.lucene.search;
  * limitations under the License.
  */
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -24,7 +25,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.Term;
@@ -131,5 +134,26 @@ public class TestIndexSearcher extends LuceneTestCase {
       IOUtils.close(r, dir);
     }
   }
 
+  public void testCount() throws IOException {
+    Directory dir = newDirectory();
+    RandomIndexWriter w = new RandomIndexWriter(random(), dir);
+    final int numDocs = atLeast(100);
+    for (int i = 0; i < numDocs; ++i) {
+      Document doc = new Document();
+      if (random().nextBoolean()) {
+        doc.add(new StringField("foo", "bar", Store.NO));
+      }
+      w.addDocument(doc);
+    }
+    w.commit();
+    final IndexReader reader = w.getReader();
+    w.close();
+    final IndexSearcher searcher = newSearcher(reader);
+    final Query query = new TermQuery(new Term("foo", "bar"));
+    assertEquals(searcher.count(query), searcher.search(query, 1).totalHits);
+    reader.close();
+    dir.close();
+  }
+
 }