diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index da328edd7aa..eafdbe10977 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -143,6 +143,14 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable public abstract InternalAggregation doReduce(List aggregations, ReduceContext reduceContext); + /** + * Return true if this aggregation is mapped, and can lead a reduction. If this agg returns + * false, it should return itself if asked to lead a reduction + */ + public boolean isMapped() { + return true; + } + /** * Get the value of specified path in the aggregation. * diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 57170e2f8ab..95140b50d2b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,6 +38,15 @@ import static java.util.Collections.emptyMap; public final class InternalAggregations extends Aggregations implements Streamable { public static final InternalAggregations EMPTY = new InternalAggregations(); + private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { + if (agg1.isMapped() == agg2.isMapped()) { + return 0; + } else if (agg1.isMapped() && agg2.isMapped() == false) { + return -1; + } else { + return 1; + } + 
}; private InternalAggregations() { } @@ -73,6 +83,9 @@ public final class InternalAggregations extends Aggregations implements Streamab List reducedAggregations = new ArrayList<>(); for (Map.Entry> entry : aggByName.entrySet()) { List aggregations = entry.getValue(); + // Sort aggregations so that unmapped aggs come last in the list + // If all aggs are unmapped, the agg that leads the reduction will just return itself + aggregations.sort(INTERNAL_AGG_COMPARATOR); InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand reducedAggregations.add(first.reduce(aggregations, context)); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java index 3459e110d7e..5f5f557ffd5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java @@ -50,12 +50,12 @@ public class UnmappedSampler extends InternalSampler { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - for (InternalAggregation agg : aggregations) { - if (!(agg instanceof UnmappedSampler)) { - return agg.reduce(aggregations, reduceContext); - } - } - return this; + return new UnmappedSampler(name, pipelineAggregators(), metaData); + } + + @Override + public boolean isMapped() { + return false; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java index 66fc171bbe3..f2c9f8b29ad 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/UnmappedSignificantTerms.java @@ -51,13 +51,13 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms { private Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, - DocValueFormat format) { + DocValueFormat format) { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); } } public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List pipelineAggregators, - Map metaData) { + Map metaData) { super(name, requiredSize, minDocCount, pipelineAggregators, metaData); } @@ -100,12 +100,12 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms aggregations, ReduceContext reduceContext) { - for (InternalAggregation aggregation : aggregations) { - if (!(aggregation instanceof UnmappedSignificantTerms)) { - return aggregation.reduce(aggregations, reduceContext); - } - } - return this; + return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData); + } + + @Override + public boolean isMapped() { + return false; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 595991dac06..17a3e603b6f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -22,10 +22,10 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; 
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.BucketOrder; import java.io.IOException; import java.util.Collections; @@ -95,12 +95,12 @@ public class UnmappedTerms extends InternalTerms aggregations, ReduceContext reduceContext) { - for (InternalAggregation agg : aggregations) { - if (!(agg instanceof UnmappedTerms)) { - return agg.reduce(aggregations, reduceContext); - } - } - return this; + return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData); + } + + @Override + public boolean isMapped() { + return false; } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java index c3075da8271..4841c5e596a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java @@ -21,15 +21,26 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.filter.Filter; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import 
org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.Sum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder; import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; @@ -475,4 +486,56 @@ public class MaxBucketIT extends ESIntegTestCase { assertThat(maxBucketValue.value(), equalTo(maxTermsValue)); assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()]))); } + + /** + * https://github.com/elastic/elasticsearch/issues/33514 + * + * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because + * the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps. + * The bug was caused by an UnmappedTerms being chosen as the first reduction target. UnmappedTerms + * delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then + * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values. + * + * Applies to any pipeline agg, not just max. + */ + public void testFieldIsntWrittenOutTwice() throws Exception { + // you need to add an additional index with no fields in order to trigger this (or potentially a shard) + // so that there is an UnmappedTerms in the list to reduce. 
+ createIndex("foo_1"); + + XContentBuilder builder = jsonBuilder().startObject().startObject("properties") + .startObject("@timestamp").field("type", "date").endObject() + .startObject("license").startObject("properties") + .startObject("count").field("type", "long").endObject() + .startObject("partnumber").field("type", "text").startObject("fields").startObject("keyword") + .field("type", "keyword").field("ignore_above", 256) + .endObject().endObject().endObject() + .endObject().endObject().endObject().endObject(); + assertAcked(client().admin().indices().prepareCreate("foo_2") + .addMapping("doc", builder).get()); + + XContentBuilder docBuilder = jsonBuilder().startObject() + .startObject("license").field("partnumber", "foobar").field("count", 2).endObject() + .field("@timestamp", "2018-07-08T08:07:00.599Z") + .endObject(); + + client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + client().admin().indices().prepareRefresh(); + + TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber") + .field("license.partnumber.keyword"); + MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder = + PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses"); + SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count"); + DateHistogramAggregationBuilder licensePerDayBuilder = + AggregationBuilders.dateHistogram("licenses_per_day").field("@timestamp").dateHistogramInterval(DateHistogramInterval.DAY); + licensePerDayBuilder.subAggregation(sumAggBuilder); + groupByLicenseAgg.subAggregation(licensePerDayBuilder); + groupByLicenseAgg.subAggregation(peakPipelineAggBuilder); + + SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get(); + BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false); + 
XContentHelper.convertToMap(bytes, false, XContentType.JSON); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 1149c7b0941..facbc6ec84b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -86,36 +86,36 @@ import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedAvg; import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; +import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds; import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid; -import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedMax; -import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedMin; import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles; -import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; -import 
org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ParsedAvg; +import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; +import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds; +import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid; +import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks; +import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles; +import org.elasticsearch.search.aggregations.metrics.ParsedMax; +import org.elasticsearch.search.aggregations.metrics.ParsedMin; +import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; +import org.elasticsearch.search.aggregations.metrics.ParsedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedSum; import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentileRanks; import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentiles; -import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric; -import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedStats; -import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats; -import org.elasticsearch.search.aggregations.metrics.ParsedSum; -import 
org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; -import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedValueCount; +import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; @@ -134,6 +134,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.ParsedDerivativ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -153,6 +154,16 @@ public abstract class InternalAggregationTestCase public static final int DEFAULT_MAX_BUCKETS = 100000; protected static final double TOLERANCE = 1e-10; + private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { + if (agg1.isMapped() == agg2.isMapped()) { + return 0; + } else if (agg1.isMapped() && agg2.isMapped() == false) { + return -1; + } else { + return 1; + } + }; + private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables()); @@ -239,6 +250,8 @@ public abstract class InternalAggregationTestCase inputs.add(t); toReduce.add(t); } + // Sort aggs so that unmapped come last. 
This mimics the behavior of InternalAggregations.reduce() + inputs.sort(INTERNAL_AGG_COMPARATOR); ScriptService mockScriptService = mockScriptService(); MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); if (randomBoolean() && toReduce.size() > 1) {