mirror of https://github.com/apache/lucene.git
LUCENE-8939: Introduce Shared Count Early Termination In Parallel Search (#823)
This commit introduces a strategy to early terminate for sorted collections during parallel search when requested number of hits have been collected but the total hits threshold has not yet been reached.
This commit is contained in:
parent
ad6d73b481
commit
55d9290433
|
@ -94,6 +94,9 @@ the total hits is not requested.
|
|||
* LUCENE-8968: Improve performance of WITHIN and DISJOINT queries for Shape queries by
|
||||
doing just one pass whenever possible. (Ignacio Vera)
|
||||
|
||||
* LUCENE-8939: Introduce shared count based early termination across multiple slices
|
||||
(Atri Sharma)
|
||||
|
||||
Bug Fixes
|
||||
|
||||
* LUCENE-8755: spatial-extras quad and packed quad prefix trees could throw a
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.search;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Used for defining custom algorithms to allow searches to early terminate
|
||||
*/
|
||||
abstract class HitsThresholdChecker {
|
||||
/**
|
||||
* Implementation of HitsThresholdChecker which allows global hit counting
|
||||
*/
|
||||
private static class GlobalHitsThresholdChecker extends HitsThresholdChecker {
|
||||
private final int totalHitsThreshold;
|
||||
private final AtomicLong globalHitCount;
|
||||
|
||||
public GlobalHitsThresholdChecker(int totalHitsThreshold) {
|
||||
|
||||
if (totalHitsThreshold < 0) {
|
||||
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
|
||||
}
|
||||
|
||||
this.totalHitsThreshold = totalHitsThreshold;
|
||||
this.globalHitCount = new AtomicLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementHitCount() {
|
||||
globalHitCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isThresholdReached(){
|
||||
return globalHitCount.get() > totalHitsThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHitsThreshold() {
|
||||
return totalHitsThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Default implementation of HitsThresholdChecker to be used for single threaded execution
|
||||
*/
|
||||
private static class LocalHitsThresholdChecker extends HitsThresholdChecker {
|
||||
private final int totalHitsThreshold;
|
||||
private int hitCount;
|
||||
|
||||
public LocalHitsThresholdChecker(int totalHitsThreshold) {
|
||||
|
||||
if (totalHitsThreshold < 0) {
|
||||
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
|
||||
}
|
||||
|
||||
this.totalHitsThreshold = totalHitsThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementHitCount() {
|
||||
++hitCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isThresholdReached() {
|
||||
return hitCount > totalHitsThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHitsThreshold() {
|
||||
return totalHitsThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns a threshold checker that is useful for single threaded searches
|
||||
*/
|
||||
public static HitsThresholdChecker create(final int totalHitsThreshold) {
|
||||
return new LocalHitsThresholdChecker(totalHitsThreshold);
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns a threshold checker that is based on a shared counter
|
||||
*/
|
||||
public static HitsThresholdChecker createShared(final int totalHitsThreshold) {
|
||||
return new GlobalHitsThresholdChecker(totalHitsThreshold);
|
||||
}
|
||||
|
||||
public abstract void incrementHitCount();
|
||||
public abstract ScoreMode scoreMode();
|
||||
public abstract int getHitsThreshold();
|
||||
public abstract boolean isThresholdReached();
|
||||
}
|
|
@ -396,9 +396,11 @@ public class IndexSearcher {
|
|||
|
||||
final CollectorManager<TopScoreDocCollector, TopDocs> manager = new CollectorManager<TopScoreDocCollector, TopDocs>() {
|
||||
|
||||
private final HitsThresholdChecker hitsThresholdChecker = (executor == null || leafSlices.length <= 1) ? HitsThresholdChecker.create(TOTAL_HITS_THRESHOLD) :
|
||||
HitsThresholdChecker.createShared(TOTAL_HITS_THRESHOLD);
|
||||
@Override
|
||||
public TopScoreDocCollector newCollector() throws IOException {
|
||||
return TopScoreDocCollector.create(cappedNumHits, after, TOTAL_HITS_THRESHOLD);
|
||||
return TopScoreDocCollector.create(cappedNumHits, after, hitsThresholdChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -524,10 +526,13 @@ public class IndexSearcher {
|
|||
|
||||
final CollectorManager<TopFieldCollector, TopFieldDocs> manager = new CollectorManager<TopFieldCollector, TopFieldDocs>() {
|
||||
|
||||
private final HitsThresholdChecker hitsThresholdChecker = (executor == null || leafSlices.length <= 1) ? HitsThresholdChecker.create(TOTAL_HITS_THRESHOLD) :
|
||||
HitsThresholdChecker.createShared(TOTAL_HITS_THRESHOLD);
|
||||
|
||||
@Override
|
||||
public TopFieldCollector newCollector() throws IOException {
|
||||
// TODO: don't pay the price for accurate hit counts by default
|
||||
return TopFieldCollector.create(rewrittenSort, cappedNumHits, after, TOTAL_HITS_THRESHOLD);
|
||||
return TopFieldCollector.create(rewrittenSort, cappedNumHits, after, hitsThresholdChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.lucene.search;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -96,12 +97,12 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
* document scores and maxScore.
|
||||
*/
|
||||
private static class SimpleFieldCollector extends TopFieldCollector {
|
||||
|
||||
final Sort sort;
|
||||
final FieldValueHitQueue<Entry> queue;
|
||||
|
||||
public SimpleFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, int numHits, int totalHitsThreshold) {
|
||||
super(queue, numHits, totalHitsThreshold, sort.needsScores());
|
||||
public SimpleFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, int numHits,
|
||||
HitsThresholdChecker hitsThresholdChecker) {
|
||||
super(queue, numHits, hitsThresholdChecker, sort.needsScores());
|
||||
this.sort = sort;
|
||||
this.queue = queue;
|
||||
}
|
||||
|
@ -128,13 +129,14 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
++totalHits;
|
||||
hitsThresholdChecker.incrementHitCount();
|
||||
if (queueFull) {
|
||||
if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) {
|
||||
// since docs are visited in doc Id order, if compare is 0, it means
|
||||
// this document is largest than anything else in the queue, and
|
||||
// therefore not competitive.
|
||||
if (canEarlyTerminate) {
|
||||
if (totalHits > totalHitsThreshold) {
|
||||
if (hitsThresholdChecker.isThresholdReached()) {
|
||||
totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
|
||||
throw new CollectionTerminatedException();
|
||||
} else {
|
||||
|
@ -181,15 +183,13 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
int collectedHits;
|
||||
final FieldValueHitQueue<Entry> queue;
|
||||
final FieldDoc after;
|
||||
final int totalHitsThreshold;
|
||||
|
||||
public PagingFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, FieldDoc after, int numHits,
|
||||
int totalHitsThreshold) {
|
||||
super(queue, numHits, totalHitsThreshold, sort.needsScores());
|
||||
HitsThresholdChecker hitsThresholdChecker) {
|
||||
super(queue, numHits, hitsThresholdChecker, sort.needsScores());
|
||||
this.sort = sort;
|
||||
this.queue = queue;
|
||||
this.after = after;
|
||||
this.totalHitsThreshold = totalHitsThreshold;
|
||||
|
||||
FieldComparator<?>[] comparators = queue.comparators;
|
||||
// Tell all comparators their top value:
|
||||
|
@ -221,6 +221,7 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
//System.out.println(" collect doc=" + doc);
|
||||
|
||||
totalHits++;
|
||||
hitsThresholdChecker.incrementHitCount();
|
||||
|
||||
if (queueFull) {
|
||||
// Fastmatch: return if this hit is no better than
|
||||
|
@ -230,7 +231,7 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
// this document is largest than anything else in the queue, and
|
||||
// therefore not competitive.
|
||||
if (canEarlyTerminate) {
|
||||
if (totalHits > totalHitsThreshold) {
|
||||
if (hitsThresholdChecker.isThresholdReached()) {
|
||||
totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
|
||||
throw new CollectionTerminatedException();
|
||||
} else {
|
||||
|
@ -282,7 +283,7 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];
|
||||
|
||||
final int numHits;
|
||||
final int totalHitsThreshold;
|
||||
final HitsThresholdChecker hitsThresholdChecker;
|
||||
final FieldComparator.RelevanceComparator firstComparator;
|
||||
final boolean canSetMinScore;
|
||||
final int numComparators;
|
||||
|
@ -297,17 +298,18 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
// internal versions. If someone will define a constructor with any other
|
||||
// visibility, then anyone will be able to extend the class, which is not what
|
||||
// we want.
|
||||
private TopFieldCollector(FieldValueHitQueue<Entry> pq, int numHits, int totalHitsThreshold, boolean needsScores) {
|
||||
private TopFieldCollector(FieldValueHitQueue<Entry> pq, int numHits,
|
||||
HitsThresholdChecker hitsThresholdChecker, boolean needsScores) {
|
||||
super(pq);
|
||||
this.needsScores = needsScores;
|
||||
this.numHits = numHits;
|
||||
this.totalHitsThreshold = totalHitsThreshold;
|
||||
this.hitsThresholdChecker = hitsThresholdChecker;
|
||||
this.numComparators = pq.getComparators().length;
|
||||
FieldComparator<?> fieldComparator = pq.getComparators()[0];
|
||||
int reverseMul = pq.reverseMul[0];
|
||||
if (fieldComparator.getClass().equals(FieldComparator.RelevanceComparator.class)
|
||||
&& reverseMul == 1 // if the natural sort is preserved (sort by descending relevance)
|
||||
&& totalHitsThreshold != Integer.MAX_VALUE) {
|
||||
&& hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) {
|
||||
firstComparator = (FieldComparator.RelevanceComparator) fieldComparator;
|
||||
scoreMode = ScoreMode.TOP_SCORES;
|
||||
canSetMinScore = true;
|
||||
|
@ -324,7 +326,7 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
}
|
||||
|
||||
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
|
||||
if (canSetMinScore && totalHits > totalHitsThreshold && queueFull) {
|
||||
if (canSetMinScore && hitsThresholdChecker.isThresholdReached() && queueFull) {
|
||||
assert bottom != null && firstComparator != null;
|
||||
float minScore = firstComparator.value(bottom.slot);
|
||||
scorer.setMinCompetitiveScore(minScore);
|
||||
|
@ -382,8 +384,19 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
* @return a {@link TopFieldCollector} instance which will sort the results by
|
||||
* the sort criteria.
|
||||
*/
|
||||
public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
|
||||
int totalHitsThreshold) {
|
||||
public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after, int totalHitsThreshold) {
|
||||
if (totalHitsThreshold < 0) {
|
||||
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
|
||||
}
|
||||
|
||||
return create(sort, numHits, after, HitsThresholdChecker.create(totalHitsThreshold));
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as above with an additional parameter to allow passing in the threshold checker
|
||||
*/
|
||||
static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
|
||||
HitsThresholdChecker hitsThresholdChecker) {
|
||||
|
||||
if (sort.fields.length == 0) {
|
||||
throw new IllegalArgumentException("Sort must contain at least one field");
|
||||
|
@ -393,14 +406,14 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
|
||||
}
|
||||
|
||||
if (totalHitsThreshold < 0) {
|
||||
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
|
||||
if (hitsThresholdChecker == null) {
|
||||
throw new IllegalArgumentException("hitsThresholdChecker should not be null");
|
||||
}
|
||||
|
||||
FieldValueHitQueue<Entry> queue = FieldValueHitQueue.create(sort.fields, numHits);
|
||||
|
||||
if (after == null) {
|
||||
return new SimpleFieldCollector(sort, queue, numHits, totalHitsThreshold);
|
||||
return new SimpleFieldCollector(sort, queue, numHits, hitsThresholdChecker);
|
||||
} else {
|
||||
if (after.fields == null) {
|
||||
throw new IllegalArgumentException("after.fields wasn't set; you must pass fillFields=true for the previous search");
|
||||
|
@ -410,10 +423,36 @@ public abstract class TopFieldCollector extends TopDocsCollector<Entry> {
|
|||
throw new IllegalArgumentException("after.fields has " + after.fields.length + " values but sort has " + sort.getSort().length);
|
||||
}
|
||||
|
||||
return new PagingFieldCollector(sort, queue, after, numHits, totalHitsThreshold);
|
||||
return new PagingFieldCollector(sort, queue, after, numHits, hitsThresholdChecker);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a CollectorManager which uses a shared hit counter to maintain number of hits
|
||||
*/
|
||||
public static CollectorManager<TopFieldCollector, TopFieldDocs> createSharedManager(Sort sort, int numHits, FieldDoc after,
|
||||
int totalHitsThreshold) {
|
||||
return new CollectorManager<TopFieldCollector, TopFieldDocs>() {
|
||||
|
||||
private final HitsThresholdChecker hitsThresholdChecker = HitsThresholdChecker.createShared(totalHitsThreshold);
|
||||
|
||||
@Override
|
||||
public TopFieldCollector newCollector() throws IOException {
|
||||
return create(sort, numHits, after, hitsThresholdChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopFieldDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
|
||||
final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
|
||||
int i = 0;
|
||||
for (TopFieldCollector collector : collectors) {
|
||||
topDocs[i++] = collector.topDocs();
|
||||
}
|
||||
return TopDocs.merge(sort, numHits, topDocs);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Populate {@link ScoreDoc#score scores} of the given {@code topDocs}.
|
||||
* @param topDocs the top docs to populate
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.lucene.search;
|
|||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
|
||||
|
@ -48,8 +49,8 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
|
||||
private static class SimpleTopScoreDocCollector extends TopScoreDocCollector {
|
||||
|
||||
SimpleTopScoreDocCollector(int numHits, int totalHitsThreshold) {
|
||||
super(numHits, totalHitsThreshold);
|
||||
SimpleTopScoreDocCollector(int numHits, HitsThresholdChecker hitsThresholdChecker) {
|
||||
super(numHits, hitsThresholdChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,8 +72,10 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
assert score >= 0; // NOTE: false for NaN
|
||||
|
||||
totalHits++;
|
||||
hitsThresholdChecker.incrementHitCount();
|
||||
|
||||
if (score <= pqTop.score) {
|
||||
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO && totalHits > totalHitsThreshold) {
|
||||
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO && hitsThresholdChecker.isThresholdReached()) {
|
||||
// we just reached totalHitsThreshold, we can start setting the min
|
||||
// competitive score now
|
||||
updateMinCompetitiveScore(scorer);
|
||||
|
@ -97,8 +100,8 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
private final ScoreDoc after;
|
||||
private int collectedHits;
|
||||
|
||||
PagingTopScoreDocCollector(int numHits, ScoreDoc after, int totalHitsThreshold) {
|
||||
super(numHits, totalHitsThreshold);
|
||||
PagingTopScoreDocCollector(int numHits, ScoreDoc after, HitsThresholdChecker hitsThresholdChecker) {
|
||||
super(numHits, hitsThresholdChecker);
|
||||
this.after = after;
|
||||
this.collectedHits = 0;
|
||||
}
|
||||
|
@ -130,10 +133,11 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
assert score >= 0; // NOTE: false for NaN
|
||||
|
||||
totalHits++;
|
||||
hitsThresholdChecker.incrementHitCount();
|
||||
|
||||
if (score > after.score || (score == after.score && doc <= afterDoc)) {
|
||||
// hit was collected on a previous page
|
||||
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO && totalHits > totalHitsThreshold) {
|
||||
if (totalHitsRelation == TotalHits.Relation.EQUAL_TO && hitsThresholdChecker.isThresholdReached()) {
|
||||
// we just reached totalHitsThreshold, we can start setting the min
|
||||
// competitive score now
|
||||
updateMinCompetitiveScore(scorer);
|
||||
|
@ -191,32 +195,65 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
* objects.
|
||||
*/
|
||||
public static TopScoreDocCollector create(int numHits, ScoreDoc after, int totalHitsThreshold) {
|
||||
return create(numHits, after, HitsThresholdChecker.create(totalHitsThreshold));
|
||||
}
|
||||
|
||||
static TopScoreDocCollector create(int numHits, ScoreDoc after, HitsThresholdChecker hitsThresholdChecker) {
|
||||
|
||||
if (numHits <= 0) {
|
||||
throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
|
||||
}
|
||||
|
||||
if (totalHitsThreshold < 0) {
|
||||
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
|
||||
if (hitsThresholdChecker == null) {
|
||||
throw new IllegalArgumentException("hitsThresholdChecker must be non null");
|
||||
}
|
||||
|
||||
if (after == null) {
|
||||
return new SimpleTopScoreDocCollector(numHits, totalHitsThreshold);
|
||||
return new SimpleTopScoreDocCollector(numHits, hitsThresholdChecker);
|
||||
} else {
|
||||
return new PagingTopScoreDocCollector(numHits, after, totalHitsThreshold);
|
||||
return new PagingTopScoreDocCollector(numHits, after, hitsThresholdChecker);
|
||||
}
|
||||
}
|
||||
|
||||
final int totalHitsThreshold;
|
||||
/**
|
||||
* Create a CollectorManager which uses a shared hit counter to maintain number of hits
|
||||
*/
|
||||
public static CollectorManager<TopScoreDocCollector, TopDocs> createSharedManager(int numHits, FieldDoc after,
|
||||
int totalHitsThreshold) {
|
||||
return new CollectorManager<TopScoreDocCollector, TopDocs>() {
|
||||
|
||||
private final HitsThresholdChecker hitsThresholdChecker = HitsThresholdChecker.createShared(totalHitsThreshold);
|
||||
|
||||
@Override
|
||||
public TopScoreDocCollector newCollector() throws IOException {
|
||||
return TopScoreDocCollector.create(numHits, after, hitsThresholdChecker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopDocs reduce(Collection<TopScoreDocCollector> collectors) throws IOException {
|
||||
final TopDocs[] topDocs = new TopDocs[collectors.size()];
|
||||
int i = 0;
|
||||
for (TopScoreDocCollector collector : collectors) {
|
||||
topDocs[i++] = collector.topDocs();
|
||||
}
|
||||
return TopDocs.merge(numHits, topDocs);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
ScoreDoc pqTop;
|
||||
final HitsThresholdChecker hitsThresholdChecker;
|
||||
|
||||
// prevents instantiation
|
||||
TopScoreDocCollector(int numHits, int totalHitsThreshold) {
|
||||
TopScoreDocCollector(int numHits, HitsThresholdChecker hitsThresholdChecker) {
|
||||
super(new HitQueue(numHits, true));
|
||||
this.totalHitsThreshold = totalHitsThreshold;
|
||||
assert hitsThresholdChecker != null;
|
||||
|
||||
// HitQueue implements getSentinelObject to return a ScoreDoc, so we know
|
||||
// that at this point top() is already initialized.
|
||||
pqTop = pq.top();
|
||||
this.hitsThresholdChecker = hitsThresholdChecker;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -230,11 +267,11 @@ public abstract class TopScoreDocCollector extends TopDocsCollector<ScoreDoc> {
|
|||
|
||||
@Override
|
||||
public ScoreMode scoreMode() {
|
||||
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
|
||||
return hitsThresholdChecker.scoreMode();
|
||||
}
|
||||
|
||||
protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
|
||||
if (totalHits > totalHitsThreshold
|
||||
if (hitsThresholdChecker.isThresholdReached()
|
||||
&& pqTop != null
|
||||
&& pqTop.score != Float.NEGATIVE_INFINITY) { // -Infinity is the score of sentinels
|
||||
// since we tie-break on doc id and collect in doc id order, we can require
|
||||
|
|
|
@ -19,6 +19,10 @@ package org.apache.lucene.search;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
|
@ -29,6 +33,7 @@ import org.apache.lucene.index.NoMergePolicy;
|
|||
import org.apache.lucene.index.RandomIndexWriter;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.NamedThreadFactory;
|
||||
|
||||
public class TestTopDocsCollector extends LuceneTestCase {
|
||||
|
||||
|
@ -96,6 +101,31 @@ public class TestTopDocsCollector extends LuceneTestCase {
|
|||
searcher.search(q, tdc);
|
||||
return tdc;
|
||||
}
|
||||
|
||||
private TopDocsCollector<ScoreDoc> doSearchWithThreshold(int numResults, int thresHold) throws IOException {
|
||||
Query q = new MatchAllDocsQuery();
|
||||
IndexSearcher searcher = newSearcher(reader);
|
||||
TopDocsCollector<ScoreDoc> tdc = TopScoreDocCollector.create(numResults, thresHold);
|
||||
searcher.search(q, tdc);
|
||||
return tdc;
|
||||
}
|
||||
|
||||
private TopDocs doConcurrentSearchWithThreshold(int numResults, int thresHold) throws IOException {
|
||||
Query q = new MatchAllDocsQuery();
|
||||
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(),
|
||||
new NamedThreadFactory("TestTopDocsCollector"));
|
||||
IndexSearcher searcher = new IndexSearcher(reader, service);
|
||||
|
||||
CollectorManager collectorManager = TopScoreDocCollector.createSharedManager(numResults,
|
||||
null, Integer.MAX_VALUE);
|
||||
|
||||
TopDocs tdc = (TopDocs) searcher.search(q, collectorManager);
|
||||
|
||||
service.shutdown();
|
||||
|
||||
return tdc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
|
@ -274,6 +304,29 @@ public class TestTopDocsCollector extends LuceneTestCase {
|
|||
dir.close();
|
||||
}
|
||||
|
||||
public void testSharedCountCollectorManager() throws Exception {
|
||||
Query q = new MatchAllDocsQuery();
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE));
|
||||
Document doc = new Document();
|
||||
w.addDocuments(Arrays.asList(doc, doc, doc, doc));
|
||||
w.flush();
|
||||
w.addDocuments(Arrays.asList(doc, doc));
|
||||
w.flush();
|
||||
IndexReader reader = DirectoryReader.open(w);
|
||||
assertEquals(2, reader.leaves().size());
|
||||
w.close();
|
||||
|
||||
TopDocsCollector collector = doSearchWithThreshold(5, 10);
|
||||
TopDocs tdc = doConcurrentSearchWithThreshold(5, 10);
|
||||
TopDocs tdc2 = collector.topDocs();
|
||||
|
||||
CheckHits.checkEqual(q, tdc.scoreDocs, tdc2.scoreDocs);
|
||||
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testTotalHits() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE));
|
||||
|
|
|
@ -20,6 +20,10 @@ package org.apache.lucene.search;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field.Store;
|
||||
|
@ -37,6 +41,7 @@ import org.apache.lucene.search.BooleanClause.Occur;
|
|||
import org.apache.lucene.search.FieldValueHitQueue.Entry;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.NamedThreadFactory;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
|
||||
import static org.apache.lucene.search.SortField.FIELD_SCORE;
|
||||
|
@ -108,6 +113,37 @@ public class TestTopFieldCollector extends LuceneTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSharedHitcountCollector() throws Exception {
|
||||
|
||||
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(),
|
||||
new NamedThreadFactory("TestTopFieldCollector"));
|
||||
|
||||
IndexSearcher concurrentSearcher = new IndexSearcher(ir, service);
|
||||
|
||||
// Two Sort criteria to instantiate the multi/single comparators.
|
||||
Sort[] sort = new Sort[] {new Sort(SortField.FIELD_DOC), new Sort() };
|
||||
for(int i = 0; i < sort.length; i++) {
|
||||
Query q = new MatchAllDocsQuery();
|
||||
TopDocsCollector<Entry> tdc = TopFieldCollector.create(sort[i], 10, Integer.MAX_VALUE);
|
||||
|
||||
is.search(q, tdc);
|
||||
|
||||
CollectorManager tsdc = TopFieldCollector.createSharedManager(sort[i], 10, null, Integer.MAX_VALUE);
|
||||
|
||||
TopDocs td = tdc.topDocs();
|
||||
TopDocs td2 = (TopDocs) concurrentSearcher.search(q, tsdc);
|
||||
ScoreDoc[] sd = td.scoreDocs;
|
||||
for(int j = 0; j < sd.length; j++) {
|
||||
assertTrue(Float.isNaN(sd[j].score));
|
||||
}
|
||||
|
||||
CheckHits.checkEqual(q, td.scoreDocs, td2.scoreDocs);
|
||||
}
|
||||
|
||||
service.shutdown();
|
||||
}
|
||||
|
||||
public void testSortWithoutTotalHitTracking() throws Exception {
|
||||
Sort sort = new Sort(SortField.FIELD_DOC);
|
||||
for(int i = 0; i < 2; i++) {
|
||||
|
|
Loading…
Reference in New Issue