Histogram aggs: add empty buckets only in the final reduce step (#35921)

Empty buckets don't need to be added when performing an incremental reduction step, they can be added later in the final reduction step. This will allow us to later remove the max buckets limit when performing non final reduction.
This commit is contained in:
Luca Cavanna 2018-11-30 20:33:09 +01:00 committed by GitHub
parent d7652963b1
commit 0ebc17743a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 55 deletions

View File

@ -444,26 +444,22 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
@Override @Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext); List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
// adding empty buckets if needed if (minDocCount == 0) {
if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext);
addEmptyBuckets(reducedBuckets, reduceContext); }
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else if (InternalOrder.isKeyAsc(order) == false){
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
} }
if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
// nothing to do, data are already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets
// maintains order
} else if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else {
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
format, keyed, pipelineAggregators(), getMetaData()); format, keyed, pipelineAggregators(), getMetaData());
} }

View File

@ -421,26 +421,22 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
@Override @Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext); List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
// adding empty buckets if needed if (minDocCount == 0) {
if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext);
addEmptyBuckets(reducedBuckets, reduceContext); }
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else if (InternalOrder.isKeyAsc(order) == false){
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
} }
if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
// nothing to do, data are already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets
// maintains order
} else if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
List<Bucket> reverse = new ArrayList<>(reducedBuckets);
Collections.reverse(reverse);
reducedBuckets = reverse;
} else {
// sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
}
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(), return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(),
getMetaData()); getMetaData());
} }

View File

@ -119,19 +119,19 @@ import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -151,6 +151,7 @@ import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregati
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields; import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> { public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> {
public static final int DEFAULT_MAX_BUCKETS = 100000; public static final int DEFAULT_MAX_BUCKETS = 100000;
@ -267,7 +268,14 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T reduced = (T) inputs.get(0).reduce(internalAggregations, context); T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
assertMultiBucketConsumer(reduced, bucketConsumer); int initialBucketCount = 0;
for (InternalAggregation internalAggregation : internalAggregations) {
initialBucketCount += countInnerBucket(internalAggregation);
}
int reducedBucketCount = countInnerBucket(reduced);
//check that non final reduction never adds buckets
assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount));
assertMultiBucketConsumer(reducedBucketCount, bucketConsumer);
toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
toReduce.add(reduced); toReduce.add(reduced);
} }
@ -332,14 +340,14 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
public final void testFromXContent() throws IOException { public final void testFromXContent() throws IOException {
final T aggregation = createTestInstance(); final T aggregation = createTestInstance();
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false); final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); assertFromXContent(aggregation, parsedAggregation);
} }
public final void testFromXContentWithRandomFields() throws IOException { public final void testFromXContentWithRandomFields() throws IOException {
final T aggregation = createTestInstance(); final T aggregation = createTestInstance();
final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true); final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); assertFromXContent(aggregation, parsedAggregation);
} }
protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;
@ -423,6 +431,10 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
} }
public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) { public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) {
assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg))); assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer);
}
private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) {
assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount));
} }
} }