Fix aggregators early termination with breadth-first mode (#44963)

This commit fixes a bug when a deferred aggregator tries to early terminate the collection. In such case the CollectionTerminatedException is not caught and
the search fails on the shard. This change makes sure that we catch the exception in order to continue the deferred collection on the next leaf.

Fixes #44909
This commit is contained in:
Jim Ferenczi 2019-07-30 10:28:29 +02:00 committed by jimczi
parent 5a0bd696fc
commit 43bd8f2ba0
4 changed files with 114 additions and 42 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
@ -168,32 +169,37 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
for (Entry entry : entries) {
assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas it not empty).
scoreIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (scoreIt.docID() < doc) {
scoreIt.advance(doc);
}
// aggregations should only be replayed on matching documents
assert scoreIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
try {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas it not empty).
scoreIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (scoreIt.docID() < doc) {
scoreIt.advance(doc);
}
// aggregations should only be replayed on matching documents
assert scoreIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
collector.postCollection();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.sampler;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
@ -254,25 +255,30 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}
public void replayRelatedMatches(List<ScoreDoc> sd) throws IOException {
final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
leafCollector.setScorer(this);
try {
final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
leafCollector.setScorer(this);
currentScore = 0;
currentDocId = -1;
if (maxDocId < 0) {
return;
}
for (ScoreDoc scoreDoc : sd) {
// Doc ids from TopDocCollector are root-level Reader so
// need rebasing
int rebased = scoreDoc.doc - readerContext.docBase;
if ((rebased >= 0) && (rebased <= maxDocId)) {
currentScore = scoreDoc.score;
currentDocId = rebased;
// We stored the bucket ID in Lucene's shardIndex property
// for convenience.
leafCollector.collect(rebased, scoreDoc.shardIndex);
currentScore = 0;
currentDocId = -1;
if (maxDocId < 0) {
return;
}
for (ScoreDoc scoreDoc : sd) {
// Doc ids from TopDocCollector are root-level Reader so
// need rebasing
int rebased = scoreDoc.doc - readerContext.docBase;
if ((rebased >= 0) && (rebased <= maxDocId)) {
currentScore = scoreDoc.score;
currentDocId = rebased;
// We stored the bucket ID in Lucene's shardIndex property
// for convenience.
leafCollector.collect(rebased, scoreDoc.shardIndex);
}
}
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.global.Global;
@ -411,4 +412,34 @@ public class MaxIT extends AbstractNumericTestCase {
assertThat(count.getName(), equalTo("count"));
assertThat(count.getValue(), equalTo(20L));
}
public void testNestedEarlyTermination() throws Exception {
for (Aggregator.SubAggCollectionMode collectionMode : Aggregator.SubAggCollectionMode.values()) {
SearchResponse searchResponse = client().prepareSearch("idx")
.setTrackTotalHits(false)
.setQuery(matchAllQuery())
.addAggregation(max("max").field("values"))
.addAggregation(count("count").field("values"))
.addAggregation(terms("terms").field("value")
.collectMode(collectionMode)
.subAggregation(max("sub_max").field("invalid")))
.get();
Max max = searchResponse.getAggregations().get("max");
assertThat(max, notNullValue());
assertThat(max.getName(), equalTo("max"));
assertThat(max.getValue(), equalTo(12.0));
ValueCount count = searchResponse.getAggregations().get("count");
assertThat(count.getName(), equalTo("count"));
assertThat(count.getValue(), equalTo(20L));
Terms terms = searchResponse.getAggregations().get("terms");
assertThat(terms.getBuckets().size(), equalTo(10));
for (Terms.Bucket b : terms.getBuckets()) {
InternalMax subMax = b.getAggregations().get("sub_max");
assertThat(subMax.getValue(), equalTo(Double.NEGATIVE_INFINITY));
}
}
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.global.Global;
@ -423,4 +424,32 @@ public class MinIT extends AbstractNumericTestCase {
assertThat(count.getName(), equalTo("count"));
assertThat(count.getValue(), equalTo(20L));
}
public void testNestedEarlyTermination() throws Exception {
SearchResponse searchResponse = client().prepareSearch("idx")
.setTrackTotalHits(false)
.setQuery(matchAllQuery())
.addAggregation(min("min").field("values"))
.addAggregation(count("count").field("values"))
.addAggregation(terms("terms").field("value")
.collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST)
.subAggregation(min("sub_min").field("invalid")))
.get();
Min min = searchResponse.getAggregations().get("min");
assertThat(min, notNullValue());
assertThat(min.getName(), equalTo("min"));
assertThat(min.getValue(), equalTo(2.0));
ValueCount count = searchResponse.getAggregations().get("count");
assertThat(count.getName(), equalTo("count"));
assertThat(count.getValue(), equalTo(20L));
Terms terms = searchResponse.getAggregations().get("terms");
assertThat(terms.getBuckets().size(), equalTo(10));
for (Terms.Bucket b : terms.getBuckets()) {
InternalMin subMin = b.getAggregations().get("sub_min");
assertThat(subMin.getValue(), equalTo(Double.POSITIVE_INFINITY));
}
}
}