Fix agg profiling when using breadth_first collect mode
Previous to this change the nesting of aggregation profiling results would be incorrect when the request contains a terms aggregation and the collect mode is (implicitly or explicitly) set to `breadth_first`. This was because the aggregation profiling has to make the assumption that the `preCollection()` method of children aggregations is always called in the `preCollection()` method of their parent aggregation. When the collect mode is `breadth_first` the `preCollection` of the children aggregations was delayed until the documents were replayed. This change moves the `preCollection()` of deferred aggregations to run during the `preCollection()` of the parent aggregation. This should have no adverse impact on the breadth_first mode as there is no allocation of memory in any of the aggregations. We also apply the same logic to the diversified sampler aggregation as we did to the terms aggregation to move the `preCollection()` of the child aggregations method to be called during the `preCollection()` of the parent aggregation. This commit also includes a fix so that the `ProfilingLeafBucketCollector` propagates the scorer to its delegate so the diversified sampler agg works when profiling is enabled.
This commit is contained in:
parent
b521638f52
commit
f5fbb3eb8b
|
@ -25,11 +25,9 @@ import org.apache.lucene.search.Scorer;
|
|||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.util.packed.PackedInts;
|
||||
import org.apache.lucene.util.packed.PackedLongValues;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.LongHash;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
||||
import org.elasticsearch.search.aggregations.BucketCollector;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
||||
|
@ -119,6 +117,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
|
|||
|
||||
@Override
|
||||
public void preCollection() throws IOException {
|
||||
collector.preCollection();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,7 +144,6 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
|
|||
}
|
||||
this.selectedBuckets = hash;
|
||||
|
||||
collector.preCollection();
|
||||
boolean needsScores = collector.needsScores();
|
||||
Weight weight = null;
|
||||
if (needsScores) {
|
||||
|
|
|
@ -48,7 +48,7 @@ import java.util.List;
|
|||
* {@link BestDocsDeferringCollector#createTopDocsCollector(int)} is designed to
|
||||
* be overridden and allows subclasses to choose a custom collector
|
||||
* implementation for determining the top N matches.
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
public class BestDocsDeferringCollector extends DeferringBucketCollector implements Releasable {
|
||||
|
@ -61,7 +61,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
|
|||
|
||||
/**
|
||||
* Sole constructor.
|
||||
*
|
||||
*
|
||||
* @param shardSize
|
||||
* The number of top-scoring docs to collect for each bucket
|
||||
*/
|
||||
|
@ -111,6 +111,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
|
|||
|
||||
@Override
|
||||
public void preCollection() throws IOException {
|
||||
deferred.preCollection();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,7 +126,6 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
|
|||
}
|
||||
|
||||
private void runDeferredAggs() throws IOException {
|
||||
deferred.preCollection();
|
||||
|
||||
List<ScoreDoc> allDocs = new ArrayList<>(shardSize);
|
||||
for (int i = 0; i < perBucketSamples.size(); i++) {
|
||||
|
@ -135,14 +135,14 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
|
|||
}
|
||||
perBucketSample.getMatches(allDocs);
|
||||
}
|
||||
|
||||
|
||||
// Sort the top matches by docID for the benefit of deferred collector
|
||||
ScoreDoc[] docsArr = allDocs.toArray(new ScoreDoc[allDocs.size()]);
|
||||
Arrays.sort(docsArr, new Comparator<ScoreDoc>() {
|
||||
@Override
|
||||
public int compare(ScoreDoc o1, ScoreDoc o2) {
|
||||
if(o1.doc == o2.doc){
|
||||
return o1.shardIndex - o2.shardIndex;
|
||||
return o1.shardIndex - o2.shardIndex;
|
||||
}
|
||||
return o1.doc - o2.doc;
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
|
|||
currentScore = scoreDoc.score;
|
||||
currentDocId = rebased;
|
||||
// We stored the bucket ID in Lucene's shardIndex property
|
||||
// for convenience.
|
||||
// for convenience.
|
||||
leafCollector.collect(rebased, scoreDoc.shardIndex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.search.profile.aggregation;
|
||||
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -40,4 +41,9 @@ public class ProfilingLeafBucketCollector extends LeafBucketCollector {
|
|||
profileBreakdown.stopAndRecordTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScorer(Scorer scorer) throws IOException {
|
||||
delegate.setScorer(scorer);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.elasticsearch.search.profile.aggregation;
|
|||
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
|
||||
import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedOrdinalsSamplerAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator;
|
||||
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregator;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregator;
|
||||
|
@ -37,6 +39,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
|
||||
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
|
||||
|
@ -187,6 +190,129 @@ public class AggregationProfilerIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testMultiLevelProfileBreadthFirst() {
|
||||
SearchResponse response = client().prepareSearch("idx").setProfile(true)
|
||||
.addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L).subAggregation(terms("terms")
|
||||
.collectMode(SubAggCollectionMode.BREADTH_FIRST).field(TAG_FIELD).subAggregation(avg("avg").field(NUMBER_FIELD))))
|
||||
.get();
|
||||
assertSearchResponse(response);
|
||||
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
|
||||
assertThat(profileResults, notNullValue());
|
||||
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
|
||||
for (ProfileShardResult profileShardResult : profileResults.values()) {
|
||||
assertThat(profileShardResult, notNullValue());
|
||||
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
|
||||
assertThat(aggProfileResults, notNullValue());
|
||||
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
|
||||
assertThat(aggProfileResultsList, notNullValue());
|
||||
assertThat(aggProfileResultsList.size(), equalTo(1));
|
||||
ProfileResult histoAggResult = aggProfileResultsList.get(0);
|
||||
assertThat(histoAggResult, notNullValue());
|
||||
assertThat(histoAggResult.getQueryName(),
|
||||
equalTo("org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator"));
|
||||
assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
|
||||
assertThat(histoAggResult.getTime(), greaterThan(0L));
|
||||
Map<String, Long> histoBreakdown = histoAggResult.getTimeBreakdown();
|
||||
assertThat(histoBreakdown, notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
|
||||
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(1));
|
||||
|
||||
ProfileResult termsAggResult = histoAggResult.getProfiledChildren().get(0);
|
||||
assertThat(termsAggResult, notNullValue());
|
||||
assertThat(termsAggResult.getQueryName(), equalTo(GlobalOrdinalsStringTermsAggregator.WithHash.class.getName()));
|
||||
assertThat(termsAggResult.getLuceneDescription(), equalTo("terms"));
|
||||
assertThat(termsAggResult.getTime(), greaterThan(0L));
|
||||
Map<String, Long> termsBreakdown = termsAggResult.getTimeBreakdown();
|
||||
assertThat(termsBreakdown, notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
|
||||
assertThat(termsAggResult.getProfiledChildren().size(), equalTo(1));
|
||||
|
||||
ProfileResult avgAggResult = termsAggResult.getProfiledChildren().get(0);
|
||||
assertThat(avgAggResult, notNullValue());
|
||||
assertThat(avgAggResult.getQueryName(), equalTo(AvgAggregator.class.getName()));
|
||||
assertThat(avgAggResult.getLuceneDescription(), equalTo("avg"));
|
||||
assertThat(avgAggResult.getTime(), greaterThan(0L));
|
||||
Map<String, Long> avgBreakdown = termsAggResult.getTimeBreakdown();
|
||||
assertThat(avgBreakdown, notNullValue());
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
|
||||
assertThat(avgBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
|
||||
assertThat(avgAggResult.getProfiledChildren().size(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDiversifiedAggProfile() {
|
||||
SearchResponse response = client().prepareSearch("idx").setProfile(true)
|
||||
.addAggregation(diversifiedSampler("diversify").shardSize(10).field(STRING_FIELD).maxDocsPerValue(2)
|
||||
.subAggregation(max("max").field(NUMBER_FIELD)))
|
||||
.get();
|
||||
assertSearchResponse(response);
|
||||
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
|
||||
assertThat(profileResults, notNullValue());
|
||||
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
|
||||
for (ProfileShardResult profileShardResult : profileResults.values()) {
|
||||
assertThat(profileShardResult, notNullValue());
|
||||
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
|
||||
assertThat(aggProfileResults, notNullValue());
|
||||
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
|
||||
assertThat(aggProfileResultsList, notNullValue());
|
||||
assertThat(aggProfileResultsList.size(), equalTo(1));
|
||||
ProfileResult diversifyAggResult = aggProfileResultsList.get(0);
|
||||
assertThat(diversifyAggResult, notNullValue());
|
||||
assertThat(diversifyAggResult.getQueryName(),
|
||||
equalTo(DiversifiedOrdinalsSamplerAggregator.class.getName()));
|
||||
assertThat(diversifyAggResult.getLuceneDescription(), equalTo("diversify"));
|
||||
assertThat(diversifyAggResult.getTime(), greaterThan(0L));
|
||||
Map<String, Long> histoBreakdown = diversifyAggResult.getTimeBreakdown();
|
||||
assertThat(histoBreakdown, notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
|
||||
assertThat(histoBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
|
||||
assertThat(diversifyAggResult.getProfiledChildren().size(), equalTo(1));
|
||||
|
||||
ProfileResult maxAggResult = diversifyAggResult.getProfiledChildren().get(0);
|
||||
assertThat(maxAggResult, notNullValue());
|
||||
assertThat(maxAggResult.getQueryName(), equalTo(MaxAggregator.class.getName()));
|
||||
assertThat(maxAggResult.getLuceneDescription(), equalTo("max"));
|
||||
assertThat(maxAggResult.getTime(), greaterThan(0L));
|
||||
Map<String, Long> termsBreakdown = maxAggResult.getTimeBreakdown();
|
||||
assertThat(termsBreakdown, notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.INITIALIZE.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.COLLECT.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.BUILD_AGGREGATION.toString()), greaterThan(0L));
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), notNullValue());
|
||||
assertThat(termsBreakdown.get(AggregationTimingType.REDUCE.toString()), equalTo(0L));
|
||||
assertThat(maxAggResult.getProfiledChildren().size(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
public void testComplexProfile() {
|
||||
SearchResponse response = client().prepareSearch("idx").setProfile(true)
|
||||
.addAggregation(histogram("histo").field(NUMBER_FIELD).interval(1L)
|
||||
|
|
Loading…
Reference in New Issue