Only execute one final reduction in InternalAutoDateHistogram (#45359)
Because auto-date-histo can perform multiple reductions while merging buckets, we need to ensure that the intermediate reductions are done with `finalReduce` set to false, to prevent pipeline aggs from generating their output prematurely. Once all the buckets have been merged and the output is stable, a mostly-noop reduction can be performed, which allows pipelines to generate their output.
parent 620cd4cb20
commit c0ea8a867e
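To sketch the idea outside Elasticsearch's own types (a toy illustration with invented names, not the real ReduceContext API; the actual change in InternalAutoDateHistogram.doReduce and performFinalReduce follows below): every intermediate merge pass runs with the final-reduce flag off, and exactly one last pass runs with it on, so a pipeline aggregation emits its output only once, after the buckets have stopped changing.

    // Toy Java sketch of "many non-final reductions, one final reduction".
    // All names here (Pipeline, reducePass, mergeIfTooMany) are invented for illustration.
    import java.util.List;

    class FinalReduceSketch {
        interface Pipeline { double output(List<Long> buckets); }

        static List<Long> reducePass(List<Long> buckets, Pipeline pipeline, boolean finalReduce) {
            List<Long> merged = mergeIfTooMany(buckets);   // stand-in for auto-date-histo bucket merging
            if (finalReduce) {
                // Only the single final pass lets the pipeline produce its result.
                System.out.println("pipeline output = " + pipeline.output(merged));
            }
            return merged;
        }

        static List<Long> mergeIfTooMany(List<Long> buckets) {
            return buckets;                                 // merging elided in this sketch
        }

        public static void main(String[] args) {
            Pipeline derivative = b -> b.get(b.size() - 1) - b.get(0);
            List<Long> buckets = List.of(1L, 4L, 9L);
            buckets = reducePass(buckets, derivative, false); // intermediate pass: no pipeline output
            buckets = reducePass(buckets, derivative, false); // still merging: no pipeline output
            reducePass(buckets, derivative, true);            // buckets stable: pipeline emits once
        }
    }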
InternalAutoDateHistogram.java

@@ -500,15 +500,24 @@ public final class InternalAutoDateHistogram extends
         BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
 
         if (reduceContext.isFinalReduce()) {
+            // Because auto-date-histo can perform multiple reductions while merging buckets, we need to pretend this is
+            // not the final reduction to prevent pipeline aggs from creating their result early. However we want
+            // to reuse the multiBucketConsumer so that max_buckets breaker is correctly accounted for
+            ReduceContext penultimateReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(),
+                reduceContext::consumeBucketsAndMaybeBreak, false);
+
             // adding empty buckets if needed
-            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
+            reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, penultimateReduceContext);
 
             // Adding empty buckets may have tipped us over the target so merge the buckets again if needed
             reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
-                reducedBucketsResult.roundingInfo, reduceContext);
+                reducedBucketsResult.roundingInfo, penultimateReduceContext);
 
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
-            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
+            reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, penultimateReduceContext);
+
+            // Perform the final reduction which will mostly be a no-op, except for pipeline aggs
+            reducedBucketsResult = performFinalReduce(reducedBucketsResult, penultimateReduceContext);
         }
 
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,

@@ -561,6 +570,28 @@ public final class InternalAutoDateHistogram extends
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);
     }
 
+    /**
+     * Execute a final reduction on `reducedBuckets`. This should be called after all the buckets have been
+     * merged into the appropriate roundings. After the buckets are stable, this method will perform one last
+     * reduction with finalReduce: true so that Pipeline aggs can generate their output.
+     */
+    private BucketReduceResult performFinalReduce(BucketReduceResult reducedBuckets, ReduceContext reduceContext) {
+        // We need to create another reduce context, this time setting finalReduce: true. Unlike the prior
+        // reduce context, we _do not_ want to reuse the multiBucketConsumer from the reduce context.
+        // We've already generated (and accounted for) all the buckets we will return, this method just triggers
+        // a final reduction on un-reduced items like pipelines. If we re-use the multiBucketConsumer we would
+        // over-count the buckets
+        ReduceContext finalReduceContext = new ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true);
+
+        List<Bucket> finalBuckets = new ArrayList<>();
+        for (int i = 0; i < reducedBuckets.buckets.size(); i++) {
+            finalBuckets.add(reducedBuckets.buckets.get(i).reduce(Collections.singletonList(reducedBuckets.buckets.get(i)),
+                reducedBuckets.roundingInfo.rounding, finalReduceContext));
+        }
+        assert reducedBuckets.buckets.size() == finalBuckets.size();
+        return new BucketReduceResult(finalBuckets, reducedBuckets.roundingInfo, reducedBuckets.roundingIdx, reducedBuckets.innerInterval);
+    }
+
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         builder.startArray(CommonFields.BUCKETS.getPreferredName());
AutoDateHistogramAggregatorTests.java

@@ -36,10 +36,15 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
+import org.elasticsearch.search.aggregations.metrics.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.InternalStats;
+import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 import org.hamcrest.Matchers;
 import org.junit.Assert;

@@ -58,9 +63,12 @@ import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
     private static final String DATE_FIELD = "date";
     private static final String INSTANT_FIELD = "instant";
+    private static final String NUMERIC_FIELD = "numeric";
 
     private static final List<ZonedDateTime> DATES_WITH_TIME = Arrays.asList(
         ZonedDateTime.of(2010, 3, 12, 1, 7, 45, 0, ZoneOffset.UTC),

@@ -718,6 +726,35 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testWithPipelineReductions() throws IOException {
+        testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME,
+            aggregation -> aggregation.setNumBuckets(1).field(DATE_FIELD)
+                .subAggregation(AggregationBuilders.histogram("histo").field(NUMERIC_FIELD).interval(1)
+                    .subAggregation(AggregationBuilders.max("max").field(NUMERIC_FIELD))
+                    .subAggregation(new DerivativePipelineAggregationBuilder("deriv", "max"))),
+            histogram -> {
+                assertTrue(AggregationInspectionHelper.hasValue(histogram));
+                final List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
+                assertEquals(1, buckets.size());
+
+                Histogram.Bucket bucket = buckets.get(0);
+                assertEquals("2010-01-01T00:00:00.000Z", bucket.getKeyAsString());
+                assertEquals(10, bucket.getDocCount());
+                assertThat(bucket.getAggregations().asList().size(), equalTo(1));
+                InternalHistogram histo = (InternalHistogram) bucket.getAggregations().asList().get(0);
+                assertThat(histo.getBuckets().size(), equalTo(10));
+                for (int i = 0; i < 10; i++) {
+                    assertThat(histo.getBuckets().get(i).key, equalTo((double)i));
+                    assertThat(((InternalMax)histo.getBuckets().get(i).aggregations.get("max")).getValue(), equalTo((double)i));
+                    if (i > 0) {
+                        assertThat(((InternalSimpleValue)histo.getBuckets().get(i).aggregations.get("deriv")).getValue(), equalTo(1.0));
+                    }
+                }
+
+
+            });
+    }
+
     private void testSearchCase(final Query query, final List<ZonedDateTime> dataset,
                                 final Consumer<AutoDateHistogramAggregationBuilder> configure,
                                 final Consumer<InternalAutoDateHistogram> verify) throws IOException {

@@ -757,6 +794,7 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
         try (Directory directory = newDirectory()) {
             try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                 final Document document = new Document();
+                int i = 0;
                 for (final ZonedDateTime date : dataset) {
                     if (frequently()) {
                         indexWriter.commit();

@@ -765,8 +803,10 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
                     final long instant = date.toInstant().toEpochMilli();
                     document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
                     document.add(new LongPoint(INSTANT_FIELD, instant));
+                    document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i));
                     indexWriter.addDocument(document);
                     document.clear();
+                    i += 1;
                 }
             }
 

@@ -783,11 +823,19 @@ public class AutoDateHistogramAggregatorTests extends AggregatorTestCase {
            fieldType.setHasDocValues(true);
            fieldType.setName(aggregationBuilder.field());
 
+            MappedFieldType instantFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+            instantFieldType.setName(INSTANT_FIELD);
+            instantFieldType.setHasDocValues(true);
+
+            MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+            numericFieldType.setName(NUMERIC_FIELD);
+            numericFieldType.setHasDocValues(true);
+
            final InternalAutoDateHistogram histogram;
            if (reduced) {
-                histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType);
+                histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType, numericFieldType);
            } else {
-                histogram = search(indexSearcher, query, aggregationBuilder, fieldType);
+                histogram = search(indexSearcher, query, aggregationBuilder, fieldType, instantFieldType);
            }
            verify.accept(histogram);
        }
InternalAutoDateHistogramTests.java

@@ -50,11 +50,15 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
 
     private DocValueFormat format;
     private RoundingInfo[] roundingInfos;
+    private int nbBuckets;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
+        // these need to be the same for each new instance created so that {@link #testReduceRandom()}
+        // has mergeable instances to work with
         format = randomNumericDocValueFormat();
+        nbBuckets = randomNumberOfBuckets();
     }
 
     @Override

@@ -64,7 +68,7 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
                                                     InternalAggregations aggregations) {
 
         roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null, null);
-        int nbBuckets = randomNumberOfBuckets();
         int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
         List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
 