mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 17:38:44 +00:00
Search option terminate_after does not handle post_filters and aggregations correctly (#28459)
* Search option terminate_after does not handle post_filters and aggregations correctly This change fixes the handling of the `terminate_after` option when post_filters (or min_score) are used. `post_filter` should be applied before `terminate_after` in order to terminate the query when enough document are accepted by the post_filters. This commit also changes the type of exception thrown by `terminate_after` in order to ensure that multi collectors (aggregations) do not try to continue the collection when enough documents have been collected. Closes #28411
This commit is contained in:
parent
55448b2630
commit
7dc00ef1f5
@ -120,6 +120,12 @@ all clients support GET with body, POST is allowed as well.
|
||||
[float]
|
||||
=== Fast check for any matching docs
|
||||
|
||||
NOTE: `terminate_after` is always applied **after** the `post_filter` and stops
|
||||
the query as well as the aggregation executions when enough hits have been
|
||||
collected on the shard. Though the doc count on aggregations may not reflect
|
||||
the `hits.total` in the response since aggregations are applied **before** the
|
||||
post filtering.
|
||||
|
||||
In case we only want to know if there are any documents matching a
|
||||
specific query, we can set the `size` to `0` to indicate that we are not
|
||||
interested in the search results. Also we can set `terminate_after` to `1`
|
||||
@ -128,7 +134,7 @@ matching document was found (per shard).
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
GET /_search?q=message:elasticsearch&size=0&terminate_after=1
|
||||
GET /_search?q=message:number&size=0&terminate_after=1
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
@ -27,39 +27,55 @@ import org.apache.lucene.search.FilterLeafCollector;
|
||||
import org.apache.lucene.search.LeafCollector;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
|
||||
*/
|
||||
public class EarlyTerminatingCollector extends FilterCollector {
|
||||
static final class EarlyTerminationException extends RuntimeException {
|
||||
EarlyTerminationException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private final int maxCountHits;
|
||||
private int numCollected;
|
||||
private boolean terminatedEarly = false;
|
||||
private boolean forceTermination;
|
||||
|
||||
EarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
|
||||
/**
|
||||
* Ctr
|
||||
* @param delegate The delegated collector.
|
||||
* @param maxCountHits The number of documents to collect before termination.
|
||||
* @param forceTermination Whether the collection should be terminated with an exception ({@link EarlyTerminationException})
|
||||
* that is not caught by other {@link Collector} or with a {@link CollectionTerminatedException} otherwise.
|
||||
*/
|
||||
EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination) {
|
||||
super(delegate);
|
||||
this.maxCountHits = maxCountHits;
|
||||
this.forceTermination = forceTermination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
|
||||
if (numCollected >= maxCountHits) {
|
||||
if (forceTermination) {
|
||||
throw new EarlyTerminationException("early termination [CountBased]");
|
||||
} else {
|
||||
throw new CollectionTerminatedException();
|
||||
}
|
||||
}
|
||||
return new FilterLeafCollector(super.getLeafCollector(context)) {
|
||||
@Override
|
||||
public void collect(int doc) throws IOException {
|
||||
super.collect(doc);
|
||||
if (++numCollected >= maxCountHits) {
|
||||
terminatedEarly = true;
|
||||
if (++numCollected > maxCountHits) {
|
||||
if (forceTermination) {
|
||||
throw new EarlyTerminationException("early termination [CountBased]");
|
||||
} else {
|
||||
throw new CollectionTerminatedException();
|
||||
}
|
||||
}
|
||||
super.collect(doc);
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
public boolean terminatedEarly() {
|
||||
return terminatedEarly;
|
||||
}
|
||||
}
|
||||
|
@ -171,16 +171,9 @@ abstract class QueryCollectorContext {
|
||||
@Override
|
||||
Collector create(Collector in) throws IOException {
|
||||
assert collector == null;
|
||||
this.collector = new EarlyTerminatingCollector(in, numHits);
|
||||
this.collector = new EarlyTerminatingCollector(in, numHits, true);
|
||||
return collector;
|
||||
}
|
||||
|
||||
@Override
|
||||
void postProcess(QuerySearchResult result) throws IOException {
|
||||
if (collector.terminatedEarly()) {
|
||||
result.terminatedEarly(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -177,6 +177,13 @@ public class QueryPhase implements SearchPhase {
|
||||
final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
|
||||
// whether the chain contains a collector that filters documents
|
||||
boolean hasFilterCollector = false;
|
||||
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
|
||||
// add terminate_after before the filter collectors
|
||||
// it will only be applied on documents accepted by these filter collectors
|
||||
collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
|
||||
// this collector can filter documents during the collection
|
||||
hasFilterCollector = true;
|
||||
}
|
||||
if (searchContext.parsedPostFilter() != null) {
|
||||
// add post filters before aggregations
|
||||
// it will only be applied to top hits
|
||||
@ -194,12 +201,6 @@ public class QueryPhase implements SearchPhase {
|
||||
// this collector can filter documents during the collection
|
||||
hasFilterCollector = true;
|
||||
}
|
||||
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
|
||||
// apply terminate after after all filters collectors
|
||||
collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
|
||||
// this collector can filter documents during the collection
|
||||
hasFilterCollector = true;
|
||||
}
|
||||
|
||||
boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
|
||||
searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
|
||||
@ -263,6 +264,8 @@ public class QueryPhase implements SearchPhase {
|
||||
|
||||
try {
|
||||
searcher.search(query, queryCollector);
|
||||
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
|
||||
queryResult.terminatedEarly(true);
|
||||
} catch (TimeExceededException e) {
|
||||
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
|
||||
|
||||
|
@ -103,11 +103,11 @@ abstract class TopDocsCollectorContext extends QueryCollectorContext {
|
||||
this.collector = hitCountCollector;
|
||||
this.hitCountSupplier = hitCountCollector::getTotalHits;
|
||||
} else {
|
||||
this.collector = new EarlyTerminatingCollector(hitCountCollector, 0);
|
||||
this.collector = new EarlyTerminatingCollector(hitCountCollector, 0, false);
|
||||
this.hitCountSupplier = () -> hitCount;
|
||||
}
|
||||
} else {
|
||||
this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0);
|
||||
this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0, false);
|
||||
// for bwc hit count is set to 0, it will be converted to -1 by the coordinating node
|
||||
this.hitCountSupplier = () -> 0;
|
||||
}
|
||||
|
@ -181,6 +181,37 @@ public class QueryPhaseTests extends IndexShardTestCase {
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testTerminateAfterWithFilter() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
|
||||
IndexWriterConfig iwc = newIndexWriterConfig()
|
||||
.setIndexSort(sort);
|
||||
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
|
||||
Document doc = new Document();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
doc.add(new StringField("foo", Integer.toString(i), Store.NO));
|
||||
}
|
||||
w.addDocument(doc);
|
||||
w.close();
|
||||
|
||||
IndexReader reader = DirectoryReader.open(dir);
|
||||
IndexSearcher contextSearcher = new IndexSearcher(reader);
|
||||
TestSearchContext context = new TestSearchContext(null, indexShard);
|
||||
context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
|
||||
context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery()));
|
||||
context.terminateAfter(1);
|
||||
context.setSize(10);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
context.parsedPostFilter(new ParsedQuery(new TermQuery(new Term("foo", Integer.toString(i)))));
|
||||
QueryPhase.execute(context, contextSearcher, checkCancelled -> {});
|
||||
assertEquals(1, context.queryResult().topDocs().totalHits);
|
||||
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
|
||||
}
|
||||
reader.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
|
||||
public void testMinScoreDisablesCountOptimization() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
|
||||
@ -346,6 +377,8 @@ public class QueryPhaseTests extends IndexShardTestCase {
|
||||
assertTrue(context.queryResult().terminatedEarly());
|
||||
assertThat(context.queryResult().topDocs().totalHits, equalTo(1L));
|
||||
assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1));
|
||||
assertThat(collector.getTotalHits(), equalTo(1));
|
||||
context.queryCollectors().clear();
|
||||
}
|
||||
{
|
||||
context.setSize(0);
|
||||
|
@ -236,7 +236,7 @@ public class SimpleSearchIT extends ESIntegTestCase {
|
||||
refresh();
|
||||
|
||||
SearchResponse searchResponse;
|
||||
for (int i = 1; i <= max; i++) {
|
||||
for (int i = 1; i < max; i++) {
|
||||
searchResponse = client().prepareSearch("test")
|
||||
.setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max))
|
||||
.setTerminateAfter(i).execute().actionGet();
|
||||
|
Loading…
x
Reference in New Issue
Block a user