diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java index 5e5e60760d7..9a0b0a83d49 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; @@ -168,32 +169,37 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { for (Entry entry : entries) { assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0"; - final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); - DocIdSetIterator scoreIt = null; - if (needsScores) { - Scorer scorer = weight.scorer(entry.context); - // We don't need to check if the scorer is null - // since we are sure that there are documents to replay (entry.docDeltas it not empty). - scoreIt = scorer.iterator(); - leafCollector.setScorer(scorer); - } - final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); - final PackedLongValues.Iterator buckets = entry.buckets.iterator(); - int doc = 0; - for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { - doc += docDeltaIterator.next(); - final long bucket = buckets.next(); - final long rebasedBucket = hash.find(bucket); - if (rebasedBucket != -1) { - if (needsScores) { - if (scoreIt.docID() < doc) { - scoreIt.advance(doc); - } - // aggregations should only be replayed on matching documents - assert scoreIt.docID() == doc; - } - leafCollector.collect(doc, rebasedBucket); + try { + final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context); + DocIdSetIterator scoreIt = null; + if (needsScores) { + Scorer scorer = weight.scorer(entry.context); + // We don't need to check if the scorer is null + // since we are sure that there are documents to replay (entry.docDeltas it not empty). + scoreIt = scorer.iterator(); + leafCollector.setScorer(scorer); } + final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator(); + final PackedLongValues.Iterator buckets = entry.buckets.iterator(); + int doc = 0; + for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) { + doc += docDeltaIterator.next(); + final long bucket = buckets.next(); + final long rebasedBucket = hash.find(bucket); + if (rebasedBucket != -1) { + if (needsScores) { + if (scoreIt.docID() < doc) { + scoreIt.advance(doc); + } + // aggregations should only be replayed on matching documents + assert scoreIt.docID() == doc; + } + leafCollector.collect(doc, rebasedBucket); + } + } + } catch (CollectionTerminatedException e) { + // collection was terminated prematurely + // continue with the following leaf } } collector.postCollection(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java index 4fac9b5957d..47174089cd5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreDoc; @@ -254,25 +255,30 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme } public void replayRelatedMatches(List sd) throws IOException { - final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext); - leafCollector.setScorer(this); + try { + final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext); + leafCollector.setScorer(this); - currentScore = 0; - currentDocId = -1; - if (maxDocId < 0) { - return; - } - for (ScoreDoc scoreDoc : sd) { - // Doc ids from TopDocCollector are root-level Reader so - // need rebasing - int rebased = scoreDoc.doc - readerContext.docBase; - if ((rebased >= 0) && (rebased <= maxDocId)) { - currentScore = scoreDoc.score; - currentDocId = rebased; - // We stored the bucket ID in Lucene's shardIndex property - // for convenience. - leafCollector.collect(rebased, scoreDoc.shardIndex); + currentScore = 0; + currentDocId = -1; + if (maxDocId < 0) { + return; } + for (ScoreDoc scoreDoc : sd) { + // Doc ids from TopDocCollector are root-level Reader so + // need rebasing + int rebased = scoreDoc.doc - readerContext.docBase; + if ((rebased >= 0) && (rebased <= maxDocId)) { + currentScore = scoreDoc.score; + currentDocId = rebased; + // We stored the bucket ID in Lucene's shardIndex property + // for convenience. + leafCollector.collect(rebased, scoreDoc.shardIndex); + } + } + } catch (CollectionTerminatedException e) { + // collection was terminated prematurely + // continue with the following leaf } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxIT.java index cc2da2d386b..61786ab6dcd 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MaxIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.global.Global; @@ -411,4 +412,34 @@ public class MaxIT extends AbstractNumericTestCase { assertThat(count.getName(), equalTo("count")); assertThat(count.getValue(), equalTo(20L)); } + + public void testNestedEarlyTermination() throws Exception { + for (Aggregator.SubAggCollectionMode collectionMode : Aggregator.SubAggCollectionMode.values()) { + SearchResponse searchResponse = client().prepareSearch("idx") + .setTrackTotalHits(false) + .setQuery(matchAllQuery()) + .addAggregation(max("max").field("values")) + .addAggregation(count("count").field("values")) + .addAggregation(terms("terms").field("value") + .collectMode(collectionMode) + .subAggregation(max("sub_max").field("invalid"))) + .get(); + + Max max = searchResponse.getAggregations().get("max"); + assertThat(max, notNullValue()); + assertThat(max.getName(), equalTo("max")); + assertThat(max.getValue(), equalTo(12.0)); + + ValueCount count = searchResponse.getAggregations().get("count"); + assertThat(count.getName(), equalTo("count")); + assertThat(count.getValue(), equalTo(20L)); + + Terms terms = searchResponse.getAggregations().get("terms"); + assertThat(terms.getBuckets().size(), equalTo(10)); + for (Terms.Bucket b : terms.getBuckets()) { + InternalMax subMax = b.getAggregations().get("sub_max"); + assertThat(subMax.getValue(), equalTo(Double.NEGATIVE_INFINITY)); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinIT.java index 69468c86991..129afecd32f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/MinIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.global.Global; @@ -423,4 +424,32 @@ public class MinIT extends AbstractNumericTestCase { assertThat(count.getName(), equalTo("count")); assertThat(count.getValue(), equalTo(20L)); } + + public void testNestedEarlyTermination() throws Exception { + SearchResponse searchResponse = client().prepareSearch("idx") + .setTrackTotalHits(false) + .setQuery(matchAllQuery()) + .addAggregation(min("min").field("values")) + .addAggregation(count("count").field("values")) + .addAggregation(terms("terms").field("value") + .collectMode(Aggregator.SubAggCollectionMode.BREADTH_FIRST) + .subAggregation(min("sub_min").field("invalid"))) + .get(); + + Min min = searchResponse.getAggregations().get("min"); + assertThat(min, notNullValue()); + assertThat(min.getName(), equalTo("min")); + assertThat(min.getValue(), equalTo(2.0)); + + ValueCount count = searchResponse.getAggregations().get("count"); + assertThat(count.getName(), equalTo("count")); + assertThat(count.getValue(), equalTo(20L)); + + Terms terms = searchResponse.getAggregations().get("terms"); + assertThat(terms.getBuckets().size(), equalTo(10)); + for (Terms.Bucket b : terms.getBuckets()) { + InternalMin subMin = b.getAggregations().get("sub_min"); + assertThat(subMin.getValue(), equalTo(Double.POSITIVE_INFINITY)); + } + } }