Prefer mapped aggs to lead reductions (#33528)

Previously, an unmapped agg tried to delegate reduction to a sibling agg that
was mapped.  The delegated agg would run the reduction and also reduce any
pipeline aggs.  But because delegation happens before pipelines are run, the
unmapped agg _also_ tried to run its pipeline aggs.

This caused each pipeline to run twice, potentially doubling its output
buckets, which can produce invalid JSON (e.g. the same key appearing multiple
times) and break conversion to maps.

This change fixes the issue by sorting the list of aggregations ahead of time
so that mapped aggs appear first, meaning they preferentially lead the
reduction.  If all aggs are unmapped, the first unmapped agg simply creates a
new unmapped object and returns that as the reduction result.

As a result, unmapped aggs no longer defer to siblings, so there is no chance
of a second execution of pipelines (or of other side effects caused by
deferring execution).

Closes #33514
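
As a rough sketch of the approach, here is a stand-alone Java illustration using hypothetical Agg/MappedAgg/UnmappedAgg stand-ins (the real change is the isMapped() hook and INTERNAL_AGG_COMPARATOR shown in the diff below; the class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for InternalAggregation; illustrative only, not the real ES classes.
interface Agg {
    boolean isMapped();           // mirrors the isMapped() hook added in this commit
    Agg reduce(List<Agg> aggs);   // the leader merges the whole list (and would run pipeline aggs)
}

class MappedAgg implements Agg {
    @Override public boolean isMapped() { return true; }
    @Override public Agg reduce(List<Agg> aggs) { return this; } // real code merges buckets and runs pipelines once
}

class UnmappedAgg implements Agg {
    @Override public boolean isMapped() { return false; }
    @Override public Agg reduce(List<Agg> aggs) { return new UnmappedAgg(); } // no delegation to siblings anymore
}

public class MappedFirstReduction {
    // Mapped aggs sort before unmapped ones, so a mapped agg leads the reduction whenever one exists.
    static final Comparator<Agg> MAPPED_FIRST = (a, b) -> Boolean.compare(!a.isMapped(), !b.isMapped());

    static Agg reduce(List<Agg> aggs) {
        List<Agg> sorted = new ArrayList<>(aggs);
        sorted.sort(MAPPED_FIRST);
        return sorted.get(0).reduce(sorted); // pipelines run exactly once, in the leader
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of(new UnmappedAgg(), new MappedAgg())).getClass().getSimpleName());
        // prints "MappedAgg": the mapped agg leads the reduction even though it came second in the list
    }
}
```

If every agg in the list is unmapped, the leader at index 0 is itself unmapped and simply returns a fresh unmapped result, matching the description above.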
Zachary Tong, 2018-09-26 10:09:31 -04:00 (committed by GitHub)
commit 25d74bd0cb, parent 1871e7f7e9
7 changed files with 137 additions and 40 deletions


@@ -143,6 +143,14 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
     public abstract InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
 
+    /**
+     * Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
+     * false, it should return itself if asked to lead a reduction
+     */
+    public boolean isMapped() {
+        return true;
+    }
+
     /**
      * Get the value of specified path in the aggregation.
      *


@@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,15 @@ import static java.util.Collections.emptyMap;
 public final class InternalAggregations extends Aggregations implements Streamable {
 
     public static final InternalAggregations EMPTY = new InternalAggregations();
+    private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
+        if (agg1.isMapped() == agg2.isMapped()) {
+            return 0;
+        } else if (agg1.isMapped() && agg2.isMapped() == false) {
+            return -1;
+        } else {
+            return 1;
+        }
+    };
 
     private InternalAggregations() {
     }
@@ -73,6 +83,9 @@ public final class InternalAggregations extends Aggregations implements Streamable {
         List<InternalAggregation> reducedAggregations = new ArrayList<>();
         for (Map.Entry<String, List<InternalAggregation>> entry : aggByName.entrySet()) {
             List<InternalAggregation> aggregations = entry.getValue();
+            // Sort aggregations so that unmapped aggs come last in the list
+            // If all aggs are unmapped, the agg that leads the reduction will just return itself
+            aggregations.sort(INTERNAL_AGG_COMPARATOR);
             InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
             reducedAggregations.add(first.reduce(aggregations, context));
         }
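
One detail worth noting about the comparator above: two aggs with the same mapped-ness compare as 0, and the JDK's object sort (List.sort / Arrays.sort, TimSort) is stable, so the relative order inside the mapped group and inside the unmapped group is preserved; only the two groups are reordered. A small stand-alone check illustrating this, using a hypothetical Named record rather than the ES classes:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MappedFirstStability {
    // Hypothetical stand-in carrying just a name and a mapped flag; not an ES class.
    record Named(String name, boolean mapped) {}

    // Same shape as the INTERNAL_AGG_COMPARATOR above: ties compare as 0.
    static final Comparator<Named> MAPPED_FIRST = (a, b) -> {
        if (a.mapped() == b.mapped()) {
            return 0;
        } else if (a.mapped()) {
            return -1;
        } else {
            return 1;
        }
    };

    public static void main(String[] args) {
        List<Named> aggs = new ArrayList<>(List.of(
            new Named("unmapped-1", false),
            new Named("mapped-1", true),
            new Named("unmapped-2", false),
            new Named("mapped-2", true)));
        aggs.sort(MAPPED_FIRST);
        // Stable sort: prints mapped-1, mapped-2, unmapped-1, unmapped-2
        aggs.forEach(n -> System.out.println(n.name()));
    }
}
```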


@@ -50,12 +50,12 @@ public class UnmappedSampler extends InternalSampler {
 
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-        for (InternalAggregation agg : aggregations) {
-            if (!(agg instanceof UnmappedSampler)) {
-                return agg.reduce(aggregations, reduceContext);
-            }
-        }
-        return this;
+        return new UnmappedSampler(name, pipelineAggregators(), metaData);
+    }
+
+    @Override
+    public boolean isMapped() {
+        return false;
     }
 
     @Override


@@ -100,12 +100,12 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
 
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-        for (InternalAggregation aggregation : aggregations) {
-            if (!(aggregation instanceof UnmappedSignificantTerms)) {
-                return aggregation.reduce(aggregations, reduceContext);
-            }
-        }
-        return this;
+        return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData);
+    }
+
+    @Override
+    public boolean isMapped() {
+        return false;
     }
 
     @Override


@@ -22,10 +22,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.BucketOrder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -95,12 +95,12 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
 
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-        for (InternalAggregation agg : aggregations) {
-            if (!(agg instanceof UnmappedTerms)) {
-                return agg.reduce(aggregations, reduceContext);
-            }
-        }
-        return this;
+        return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData);
+    }
+
+    @Override
+    public boolean isMapped() {
+        return false;
     }
 
     @Override
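
For context on why the old delegation ran pipelines twice: reduction is split into a reduce() wrapper that first calls doReduce() and then applies the agg's own pipeline aggregators to the result. The sketch below is a simplified, hypothetical model of that split (the SketchAgg/MappedSketch/OldUnmappedSketch/NewUnmappedSketch names and the PIPELINE_RUNS counter are invented for illustration, not ES code). With the old code shown above, an unmapped agg's doReduce() called a mapped sibling's reduce(), whose wrapper ran the pipelines once, and then control returned to the unmapped agg's own wrapper, which ran them again.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of the reduce()/doReduce() split; not the actual ES classes.
abstract class SketchAgg {
    static final AtomicInteger PIPELINE_RUNS = new AtomicInteger();

    // The wrapper: merge the shard-level results, then run this agg's own pipeline aggs on the merged result.
    final SketchAgg reduce(List<SketchAgg> aggs) {
        SketchAgg result = doReduce(aggs);
        PIPELINE_RUNS.incrementAndGet(); // stands in for "apply my pipeline aggregators"
        return result;
    }

    abstract SketchAgg doReduce(List<SketchAgg> aggs);
}

class MappedSketch extends SketchAgg {
    @Override
    SketchAgg doReduce(List<SketchAgg> aggs) {
        return this;
    }
}

// Old behavior: delegate to the first mapped sibling's reduce(), whose wrapper runs pipelines once,
// and then this agg's own wrapper runs them again (two increments in total).
class OldUnmappedSketch extends SketchAgg {
    @Override
    SketchAgg doReduce(List<SketchAgg> aggs) {
        for (SketchAgg agg : aggs) {
            if (!(agg instanceof OldUnmappedSketch)) {
                return agg.reduce(aggs);
            }
        }
        return this;
    }
}

// New behavior: never delegate; just return a fresh unmapped result (one increment, from this wrapper).
class NewUnmappedSketch extends SketchAgg {
    @Override
    SketchAgg doReduce(List<SketchAgg> aggs) {
        return new NewUnmappedSketch();
    }
}

public class PipelineDoubleRunDemo {
    public static void main(String[] args) {
        new OldUnmappedSketch().reduce(List.of(new OldUnmappedSketch(), new MappedSketch()));
        System.out.println("pipeline runs with an unmapped leader (old): " + SketchAgg.PIPELINE_RUNS.getAndSet(0)); // 2

        new MappedSketch().reduce(List.of(new MappedSketch(), new NewUnmappedSketch()));
        System.out.println("pipeline runs with a mapped leader (new):    " + SketchAgg.PIPELINE_RUNS.getAndSet(0)); // 1
    }
}
```

After the fix, an unmapped doReduce() never calls a sibling's reduce(), so only the leader's wrapper ever applies pipeline aggregators.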


@@ -21,15 +21,26 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
@@ -475,4 +486,56 @@ public class MaxBucketIT extends ESIntegTestCase {
         assertThat(maxBucketValue.value(), equalTo(maxTermsValue));
         assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()])));
     }
+
+    /**
+     * https://github.com/elastic/elasticsearch/issues/33514
+     *
+     * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because
+     * the pipeline agg is run twice.  This produces invalid JSON and breaks conversion to maps.
+     * The bug was caused by an UnmappedTerms being chosen as the first reduction target.  UnmappedTerms
+     * delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs.  But then
+     * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values.
+     *
+     * Applies to any pipeline agg, not just max.
+     */
+    public void testFieldIsntWrittenOutTwice() throws Exception {
+        // you need to add an additional index with no fields in order to trigger this (or potentially a shard)
+        // so that there is an UnmappedTerms in the list to reduce.
+        createIndex("foo_1");
+
+        XContentBuilder builder = jsonBuilder().startObject().startObject("properties")
+            .startObject("@timestamp").field("type", "date").endObject()
+            .startObject("license").startObject("properties")
+            .startObject("count").field("type", "long").endObject()
+            .startObject("partnumber").field("type", "text").startObject("fields").startObject("keyword")
+            .field("type", "keyword").field("ignore_above", 256)
+            .endObject().endObject().endObject()
+            .endObject().endObject().endObject().endObject();
+
+        assertAcked(client().admin().indices().prepareCreate("foo_2")
+            .addMapping("doc", builder).get());
+
+        XContentBuilder docBuilder = jsonBuilder().startObject()
+            .startObject("license").field("partnumber", "foobar").field("count", 2).endObject()
+            .field("@timestamp", "2018-07-08T08:07:00.599Z")
+            .endObject();
+
+        client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+        client().admin().indices().prepareRefresh();
+
+        TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber")
+            .field("license.partnumber.keyword");
+        MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder =
+            PipelineAggregatorBuilders.maxBucket("peak", "licenses_per_day>total_licenses");
+        SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count");
+        DateHistogramAggregationBuilder licensePerDayBuilder =
+            AggregationBuilders.dateHistogram("licenses_per_day").field("@timestamp").dateHistogramInterval(DateHistogramInterval.DAY);
+        licensePerDayBuilder.subAggregation(sumAggBuilder);
+        groupByLicenseAgg.subAggregation(licensePerDayBuilder);
+        groupByLicenseAgg.subAggregation(peakPipelineAggBuilder);
+
+        SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get();
+
+        BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false);
+        XContentHelper.convertToMap(bytes, false, XContentType.JSON);
+    }
 }


@@ -86,36 +86,36 @@ import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedMin;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.aggregations.metrics.ParsedCardinality;
import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoBounds;
import org.elasticsearch.search.aggregations.metrics.ParsedGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.ParsedMin;
import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentileRanks;
import org.elasticsearch.search.aggregations.metrics.ParsedTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
@@ -134,6 +134,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.ParsedDerivativ
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -153,6 +154,16 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
     public static final int DEFAULT_MAX_BUCKETS = 100000;
 
     protected static final double TOLERANCE = 1e-10;
 
+    private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
+        if (agg1.isMapped() == agg2.isMapped()) {
+            return 0;
+        } else if (agg1.isMapped() && agg2.isMapped() == false) {
+            return -1;
+        } else {
+            return 1;
+        }
+    };
+
     private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
         new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables());
@@ -239,6 +250,8 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
             inputs.add(t);
             toReduce.add(t);
         }
+        // Sort aggs so that unmapped come last. This mimics the behavior of InternalAggregations.reduce()
+        inputs.sort(INTERNAL_AGG_COMPARATOR);
         ScriptService mockScriptService = mockScriptService();
         MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
         if (randomBoolean() && toReduce.size() > 1) {