From 7dc00ef1f5e5710e61b66c46b1349dc95e21acf5 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Mon, 12 Feb 2018 13:36:33 +0100 Subject: [PATCH] Search option terminate_after does not handle post_filters and aggregations correctly (#28459) * Search option terminate_after does not handle post_filters and aggregations correctly This change fixes the handling of the `terminate_after` option when post_filters (or min_score) are used. `post_filter` should be applied before `terminate_after` in order to terminate the query when enough documents are accepted by the post_filters. This commit also changes the type of exception thrown by `terminate_after` in order to ensure that multi collectors (aggregations) do not try to continue the collection when enough documents have been collected. Closes #28411 --- docs/reference/search/request-body.asciidoc | 8 +++- .../query/EarlyTerminatingCollector.java | 40 +++++++++++++------ .../search/query/QueryCollectorContext.java | 9 +---- .../search/query/QueryPhase.java | 15 ++++--- .../search/query/TopDocsCollectorContext.java | 4 +- .../search/query/QueryPhaseTests.java | 33 +++++++++++++++ .../search/simple/SimpleSearchIT.java | 2 +- 7 files changed, 81 insertions(+), 30 deletions(-) diff --git a/docs/reference/search/request-body.asciidoc b/docs/reference/search/request-body.asciidoc index 6e562731fac..2a51d705d83 100644 --- a/docs/reference/search/request-body.asciidoc +++ b/docs/reference/search/request-body.asciidoc @@ -120,6 +120,12 @@ all clients support GET with body, POST is allowed as well. [float] === Fast check for any matching docs +NOTE: `terminate_after` is always applied **after** the `post_filter` and stops + the query as well as the aggregation executions when enough hits have been + collected on the shard. Though the doc count on aggregations may not reflect + the `hits.total` in the response since aggregations are applied **before** the + post filtering. 
+ In case we only want to know if there are any documents matching a specific query, we can set the `size` to `0` to indicate that we are not interested in the search results. Also we can set `terminate_after` to `1` @@ -128,7 +134,7 @@ matching document was found (per shard). [source,js] -------------------------------------------------- -GET /_search?q=message:elasticsearch&size=0&terminate_after=1 +GET /_search?q=message:number&size=0&terminate_after=1 -------------------------------------------------- // CONSOLE // TEST[setup:twitter] diff --git a/server/src/main/java/org/elasticsearch/search/query/EarlyTerminatingCollector.java b/server/src/main/java/org/elasticsearch/search/query/EarlyTerminatingCollector.java index 2429c1c68e6..8b17437740c 100644 --- a/server/src/main/java/org/elasticsearch/search/query/EarlyTerminatingCollector.java +++ b/server/src/main/java/org/elasticsearch/search/query/EarlyTerminatingCollector.java @@ -27,39 +27,55 @@ import org.apache.lucene.search.FilterLeafCollector; import org.apache.lucene.search.LeafCollector; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link Collector} that early terminates collection after maxCountHits docs have been collected. */ public class EarlyTerminatingCollector extends FilterCollector { + static final class EarlyTerminationException extends RuntimeException { + EarlyTerminationException(String msg) { + super(msg); + } + } + private final int maxCountHits; private int numCollected; - private boolean terminatedEarly = false; + private boolean forceTermination; - EarlyTerminatingCollector(final Collector delegate, int maxCountHits) { + /** + * Ctr + * @param delegate The delegated collector. + * @param maxCountHits The number of documents to collect before termination. 
+ * @param forceTermination Whether the collection should be terminated with an exception ({@link EarlyTerminationException}) + * that is not caught by other {@link Collector} or with a {@link CollectionTerminatedException} otherwise. + */ + EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination) { super(delegate); this.maxCountHits = maxCountHits; + this.forceTermination = forceTermination; } @Override public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { if (numCollected >= maxCountHits) { - throw new CollectionTerminatedException(); + if (forceTermination) { + throw new EarlyTerminationException("early termination [CountBased]"); + } else { + throw new CollectionTerminatedException(); + } } return new FilterLeafCollector(super.getLeafCollector(context)) { @Override public void collect(int doc) throws IOException { - super.collect(doc); - if (++numCollected >= maxCountHits) { - terminatedEarly = true; - throw new CollectionTerminatedException(); + if (++numCollected > maxCountHits) { + if (forceTermination) { + throw new EarlyTerminationException("early termination [CountBased]"); + } else { + throw new CollectionTerminatedException(); + } } + super.collect(doc); }; }; } - - public boolean terminatedEarly() { - return terminatedEarly; - } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java index 2ed806a32ae..ff80dda77fb 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java @@ -171,16 +171,9 @@ abstract class QueryCollectorContext { @Override Collector create(Collector in) throws IOException { assert collector == null; - this.collector = new EarlyTerminatingCollector(in, numHits); + this.collector = new EarlyTerminatingCollector(in, numHits, true); 
return collector; } - - @Override - void postProcess(QuerySearchResult result) throws IOException { - if (collector.terminatedEarly()) { - result.terminatedEarly(true); - } - } }; } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 6d8e2d60687..ca06005448c 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -177,6 +177,13 @@ public class QueryPhase implements SearchPhase { final LinkedList collectors = new LinkedList<>(); // whether the chain contains a collector that filters documents boolean hasFilterCollector = false; + if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { + // add terminate_after before the filter collectors + // it will only be applied on documents accepted by these filter collectors + collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter())); + // this collector can filter documents during the collection + hasFilterCollector = true; + } if (searchContext.parsedPostFilter() != null) { // add post filters before aggregations // it will only be applied to top hits @@ -194,12 +201,6 @@ public class QueryPhase implements SearchPhase { // this collector can filter documents during the collection hasFilterCollector = true; } - if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { - // apply terminate after after all filters collectors - collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter())); - // this collector can filter documents during the collection - hasFilterCollector = true; - } boolean timeoutSet = scrollContext == null && searchContext.timeout() != null && searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false; @@ -263,6 +264,8 @@ public class QueryPhase implements SearchPhase { try { searcher.search(query, 
queryCollector); + } catch (EarlyTerminatingCollector.EarlyTerminationException e) { + queryResult.terminatedEarly(true); } catch (TimeExceededException e) { assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set"; diff --git a/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java b/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java index 18e351e34a7..cf4ff6c77b8 100644 --- a/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java +++ b/server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorContext.java @@ -103,11 +103,11 @@ abstract class TopDocsCollectorContext extends QueryCollectorContext { this.collector = hitCountCollector; this.hitCountSupplier = hitCountCollector::getTotalHits; } else { - this.collector = new EarlyTerminatingCollector(hitCountCollector, 0); + this.collector = new EarlyTerminatingCollector(hitCountCollector, 0, false); this.hitCountSupplier = () -> hitCount; } } else { - this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0); + this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0, false); // for bwc hit count is set to 0, it will be converted to -1 by the coordinating node this.hitCountSupplier = () -> 0; } diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 06d738cfb60..16365d829a8 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -181,6 +181,37 @@ public class QueryPhaseTests extends IndexShardTestCase { dir.close(); } + public void testTerminateAfterWithFilter() throws Exception { + Directory dir = newDirectory(); + final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); + IndexWriterConfig iwc = newIndexWriterConfig() + 
.setIndexSort(sort); + RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); + Document doc = new Document(); + for (int i = 0; i < 10; i++) { + doc.add(new StringField("foo", Integer.toString(i), Store.NO)); + } + w.addDocument(doc); + w.close(); + + IndexReader reader = DirectoryReader.open(dir); + IndexSearcher contextSearcher = new IndexSearcher(reader); + TestSearchContext context = new TestSearchContext(null, indexShard); + context.setTask(new SearchTask(123L, "", "", "", null, Collections.emptyMap())); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + context.terminateAfter(1); + context.setSize(10); + for (int i = 0; i < 10; i++) { + context.parsedPostFilter(new ParsedQuery(new TermQuery(new Term("foo", Integer.toString(i))))); + QueryPhase.execute(context, contextSearcher, checkCancelled -> {}); + assertEquals(1, context.queryResult().topDocs().totalHits); + assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1)); + } + reader.close(); + dir.close(); + } + + public void testMinScoreDisablesCountOptimization() throws Exception { Directory dir = newDirectory(); final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); @@ -346,6 +377,8 @@ public class QueryPhaseTests extends IndexShardTestCase { assertTrue(context.queryResult().terminatedEarly()); assertThat(context.queryResult().topDocs().totalHits, equalTo(1L)); assertThat(context.queryResult().topDocs().scoreDocs.length, equalTo(1)); + assertThat(collector.getTotalHits(), equalTo(1)); + context.queryCollectors().clear(); } { context.setSize(0); diff --git a/server/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java b/server/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java index 3de0d78be5f..6ba6eb5515b 100644 --- a/server/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java +++ b/server/src/test/java/org/elasticsearch/search/simple/SimpleSearchIT.java @@ -236,7 +236,7 @@ public class SimpleSearchIT extends 
ESIntegTestCase { refresh(); SearchResponse searchResponse; - for (int i = 1; i <= max; i++) { + for (int i = 1; i < max; i++) { searchResponse = client().prepareSearch("test") .setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max)) .setTerminateAfter(i).execute().actionGet();