diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 3be59c0c1f7..3b0a48eb760 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -500,15 +500,24 @@ public final class InternalAutoDateHistogram extends BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext); if (reduceContext.isFinalReduce()) { + // Because auto-date-histo can perform multiple reductions while merging buckets, we need to pretend this is + // not the final reduction to prevent pipeline aggs from creating their result early. However we want + // to reuse the multiBucketConsumer so that max_buckets breaker is correctly accounted for + ReduceContext penultimateReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), + reduceContext::consumeBucketsAndMaybeBreak, false); + // adding empty buckets if needed - reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext); + reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, penultimateReduceContext); // Adding empty buckets may have tipped us over the target so merge the buckets again if needed reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx, - reducedBucketsResult.roundingInfo, reduceContext); + reducedBucketsResult.roundingInfo, penultimateReduceContext); // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding - reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext); + reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, penultimateReduceContext); + + // Perform the final reduction which will mostly be a no-op, except for pipeline aggs + reducedBucketsResult = performFinalReduce(reducedBucketsResult, penultimateReduceContext); } BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx, @@ -561,6 +570,28 @@ public final class InternalAutoDateHistogram extends return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval); } + /** + * Execute a final reduction on `reducedBuckets`. This should be called after all the buckets have been + * merged into the appropriate roundings. After the buckets are stable, this method will perform one last + * reduction with finalReduce: true so that Pipeline aggs can generate their output. + */ + private BucketReduceResult performFinalReduce(BucketReduceResult reducedBuckets, ReduceContext reduceContext) { + // We need to create another reduce context, this time setting finalReduce: true. Unlike the prior + // reduce context, we _do not_ want to reuse the multiBucketConsumer from the reduce context. + // We've already generated (and accounted for) all the buckets we will return, this method just triggers + // a final reduction on un-reduced items like pipelines. If we re-use the multiBucketConsumer we would + // over-count the buckets + ReduceContext finalReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true); + + List finalBuckets = new ArrayList<>(); + for (int i = 0; i < reducedBuckets.buckets.size(); i++) { + finalBuckets.add(reducedBuckets.buckets.get(i).reduce(Collections.singletonList(reducedBuckets.buckets.get(i)), + reducedBuckets.roundingInfo.rounding, finalReduceContext)); + } + assert reducedBuckets.buckets.size() == finalBuckets.size(); + return new BucketReduceResult(finalBuckets, reducedBuckets.roundingInfo, reducedBuckets.roundingIdx, reducedBuckets.innerInterval); + } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.startArray(CommonFields.BUCKETS.getPreferredName()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 9293b33e22f..34ebb0d7ac4 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -36,10 +36,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.hamcrest.Matchers; import org.junit.Assert; @@ -58,9 +63,12 @@ import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.equalTo; + public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { private static final String DATE_FIELD = "date"; private static final String INSTANT_FIELD = "instant"; + private static final String NUMERIC_FIELD = "numeric"; private static final List DATES_WITH_TIME = Arrays.asList( ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC), @@ -718,6 +726,35 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { ); } + public void testWithPipelineReductions() throws IOException { + testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME, + aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD) + .subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1) + .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD)) + .subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + final List buckets = histogram.getBuckets(); + assertEquals(1, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(10, bucket.getDocCount()); + assertThat(bucket.getAggregations().asList().size(), equalTo(1)); + InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0); + assertThat(histo.getBuckets().size(), equalTo(10)); + for (int i = 0; i < 10; i++) { + assertThat(histo.getBuckets().get(i).key, equalTo((double)i)); + assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i)); + if (i > 0) { + assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0)); + } + } + + + }); + } + private void testSearchCase(final Query query, final List dataset, final Consumer configure, final Consumer verify) throws IOException { @@ -757,6 +794,7 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { final Document document = new Document(); + int i = 0; for (final ZonedDateTime date : dataset) { if (frequently()) { indexWriter.commit(); @@ -765,8 +803,10 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { final long instant = date.toInstant().toEpochMilli(); document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); + document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); indexWriter.addDocument(document); document.clear(); + i += 1; } } @@ -783,11 +823,19 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { fieldType.setHasDocValues(true); fieldType.setName(aggregationBuilder.field()); + MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + instantFieldType.setName(INSTANT_FIELD); + instantFieldType.setHasDocValues(true); + + MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + numericFieldType.setName(NUMERIC_FIELD); + numericFieldType.setHasDocValues(true); + final InternalAutoDateHistogram histogram; if (reduced) { - histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType); + histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType); } else { - histogram = search(indexSearcher, query, aggregationBuilder, fieldType); + histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType); } verify.accept(histogram); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index 57020901aae..805242dbe32 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -50,11 +50,15 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati private DocValueFormat format; private RoundingInfo[] roundingInfos; + private int nbBuckets; @Override public void setUp() throws Exception { super.setUp(); + // these need to be the same for each new instance created so that {@link #testReduceRandom()} + // has mergeable instances to work with format = randomNumericDocValueFormat(); + nbBuckets = randomNumberOfBuckets(); } @Override @@ -64,7 +68,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati InternalAggregations aggregations) { roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null, null); - int nbBuckets = randomNumberOfBuckets(); + int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1); List buckets = new ArrayList<>(nbBuckets);