From 41abccf6c5716b08a42a8b40961f57b679694f1a Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 4 Jul 2017 16:47:48 +0100 Subject: [PATCH] Adds rewrite phase to aggregations (#25495) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adds rewrite phase to aggregations This change adds aggregations to the rewrite performed by the `SearchSourceBuilder`. This means that `AggregationBuilder`s are able to implement a `rewrite()` method where they can return a new `AggregationBuilder` which is functionally the same but in a more primitive form. This is exactly analogous to the rewrite done by the `QueryBuilder`s. The first aggregation to implement the rewrite are the filter and filters aggregations so they can rewrite the filters they contain. Closes #17676 * Removes rewrite from PipelineAggregationBuilder Rewrite is based on shard level information. Since pipeline aggregation are run in the reduce phase it doesn’t make sense to rewrite them on the shards. In fact eventually we shouldn’t be transporting them to the shards at all and should be retaining them on the coordinating node for execution in the reduce phase * Addresses review comments * addresses more review comments * Fixed imports --- .../AbstractAggregationBuilder.java | 2 +- .../aggregations/AggregationBuilder.java | 39 +++++++++++++++ .../aggregations/AggregatorFactories.java | 28 +++++++++++ .../filter/FilterAggregationBuilder.java | 15 ++++-- .../filters/FiltersAggregationBuilder.java | 27 +++++++--- .../search/builder/SearchSourceBuilder.java | 16 ++++-- .../indices/IndicesRequestCacheIT.java | 27 +++++----- .../AggregatorFactoriesTests.java | 49 +++++++++++++++++++ .../test/search.aggregation/50_filter.yml | 9 ++-- 9 files changed, 179 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AbstractAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/AbstractAggregationBuilder.java index bbd9e3a20f7..5dcfe35edae 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AbstractAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AbstractAggregationBuilder.java @@ -118,7 +118,7 @@ public abstract class AbstractAggregationBuilder getMetaData() { - return Collections.unmodifiableMap(metaData); + return metaData == null ? Collections.emptyMap() : Collections.unmodifiableMap(metaData); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index 97c1a0165a7..f0bbe13c05f 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -96,6 +97,44 @@ public abstract class AggregationBuilder @Override public abstract AggregationBuilder subAggregations(AggregatorFactories.Builder subFactories); + public final AggregationBuilder rewrite(QueryRewriteContext context) throws IOException { + AggregationBuilder rewritten = doRewrite(context); + if (rewritten == this) { + return rewritten; + } + if (getMetaData() != null && rewritten.getMetaData() == null) { + rewritten.setMetaData(getMetaData()); + } + AggregatorFactories.Builder rewrittenSubAggs = factoriesBuilder.rewrite(context); + rewritten.subAggregations(rewrittenSubAggs); + return rewritten; + } + + /** + * Rewrites this aggregation builder into its primitive form. By default + * this method return the builder itself. If the builder did not change the + * identity reference must be returned otherwise the builder will be + * rewritten infinitely. + */ + protected AggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException { + return this; + } + + /** + * Rewrites the given aggregation into its primitive form. Aggregations that for instance fetch resources from remote hosts or + * can simplify / optimize itself should do their heavy lifting during {@link #rewrite(QueryRewriteContext)}. This method + * rewrites the aggregation until it doesn't change anymore. + * @throws IOException if an {@link IOException} occurs + */ + static AggregationBuilder rewriteAggregation(AggregationBuilder original, QueryRewriteContext context) throws IOException { + AggregationBuilder builder = original; + for (AggregationBuilder rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder; + rewrittenBuilder = builder.rewrite(context)) { + builder = rewrittenBuilder; + } + return builder; + } + /** Common xcontent fields shared among aggregator builders */ public static final class CommonFields extends ParseField.CommonFields { public static final ParseField VALUE_TYPE = new ParseField("value_type"); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index fc62d981c81..eea346b0c3b 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; @@ -457,5 +458,32 @@ public class AggregatorFactories { return false; return true; } + + /** + * Rewrites the underlying aggregation builders into their primitive + * form. If the builder did not change the identity reference must be + * returned otherwise the builder will be rewritten infinitely. + */ + public Builder rewrite(QueryRewriteContext context) throws IOException { + boolean changed = false; + Builder newBuilder = new Builder(); + + for (AggregationBuilder builder : aggregationBuilders) { + AggregationBuilder result = AggregationBuilder.rewriteAggregation(builder, context); + if (result != builder) { + changed = true; + } + newBuilder.addAggregator(result); + } + + if (changed) { + for (PipelineAggregationBuilder builder : pipelineAggregatorBuilders) { + newBuilder.addPipelineAggregator(builder); + } + return newBuilder; + } else { + return this; + } + } } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregationBuilder.java index 2306f4c9f48..a26dc67ce9d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregationBuilder.java @@ -24,7 +24,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.internal.SearchContext; @@ -68,12 +70,19 @@ public class FilterAggregationBuilder extends AbstractAggregationBuilder doBuild(SearchContext context, AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder) throws IOException { - // TODO this sucks we need a rewrite phase for aggregations too - final QueryBuilder rewrittenFilter = QueryBuilder.rewriteQuery(filter, context.getQueryShardContext()); - return new FilterAggregatorFactory(name, rewrittenFilter, context, parent, subFactoriesBuilder, metaData); + return new FilterAggregatorFactory(name, filter, context, parent, subFactoriesBuilder, metaData); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java index ad4428e11ad..a461a2b712d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filters/FiltersAggregationBuilder.java @@ -26,7 +26,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.bucket.filters.FiltersAggregator.KeyedFilter; @@ -167,15 +169,28 @@ public class FiltersAggregationBuilder extends AbstractAggregationBuilder rewrittenFilters = new ArrayList<>(filters.size()); + boolean changed = false; + for (KeyedFilter kf : filters) { + QueryBuilder result = QueryBuilder.rewriteQuery(kf.filter(), queryShardContext); + rewrittenFilters.add(new KeyedFilter(kf.key(), result)); + if (result != kf.filter()) { + changed = true; + } + } + if (changed) { + return new FiltersAggregationBuilder(getName(), rewrittenFilters); + } else { + return this; + } + } + @Override protected AggregatorFactory doBuild(SearchContext context, AggregatorFactory parent, Builder subFactoriesBuilder) throws IOException { - List rewrittenFilters = new ArrayList<>(filters.size()); - for(KeyedFilter kf : filters) { - rewrittenFilters.add(new KeyedFilter(kf.key(), QueryBuilder.rewriteQuery(kf.filter(), - context.getQueryShardContext()))); - } - return new FiltersAggregatorFactory(name, rewrittenFilters, keyed, otherBucket, otherBucketKey, context, parent, + return new FiltersAggregatorFactory(name, filters, keyed, otherBucket, otherBucketKey, context, parent, subFactoriesBuilder, metaData); } diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index 537635895ee..3f3be7b35d1 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -899,7 +899,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ * infinitely. */ public SearchSourceBuilder rewrite(QueryShardContext context) throws IOException { - assert (this.equals(shallowCopy(queryBuilder, postQueryBuilder, sliceBuilder))); + assert (this.equals(shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder))); QueryBuilder queryBuilder = null; if (this.queryBuilder != null) { queryBuilder = this.queryBuilder.rewrite(context); @@ -908,9 +908,14 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ if (this.postQueryBuilder != null) { postQueryBuilder = this.postQueryBuilder.rewrite(context); } - boolean rewritten = queryBuilder != this.queryBuilder || postQueryBuilder != this.postQueryBuilder; + AggregatorFactories.Builder aggregations = null; + if (this.aggregations != null) { + aggregations = this.aggregations.rewrite(context); + } + boolean rewritten = queryBuilder != this.queryBuilder || postQueryBuilder != this.postQueryBuilder + || aggregations != this.aggregations; if (rewritten) { - return shallowCopy(queryBuilder, postQueryBuilder, sliceBuilder); + return shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder); } return this; } @@ -919,14 +924,15 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ * Create a shallow copy of this builder with a new slice configuration. */ public SearchSourceBuilder copyWithNewSlice(SliceBuilder slice) { - return shallowCopy(queryBuilder, postQueryBuilder, slice); + return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice); } /** * Create a shallow copy of this source replaced {@link #queryBuilder}, {@link #postQueryBuilder}, and {@link #sliceBuilder}. Used by * {@link #rewrite(QueryShardContext)} and {@link #copyWithNewSlice(SliceBuilder)}. */ - private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder, SliceBuilder slice) { + private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder postQueryBuilder, + AggregatorFactories.Builder aggregations, SliceBuilder slice) { SearchSourceBuilder rewrittenBuilder = new SearchSourceBuilder(); rewrittenBuilder.aggregations = aggregations; rewrittenBuilder.explain = explain; diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java b/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java index 1e97d4dd57b..7f8fe5d40a9 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; @@ -37,8 +36,8 @@ import org.joda.time.format.DateTimeFormat; import java.util.List; import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange; +import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; @@ -411,18 +410,7 @@ public class IndicesRequestCacheIT extends ESIntegTestCase { assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), equalTo(0L)); - // If the request has an aggregation containing now we should not cache - final SearchResponse r4 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) - .addAggregation(filter("foo", QueryBuilders.rangeQuery("s").from("now-10y").to("now"))).get(); - assertSearchResponse(r4); - assertThat(r4.getHits().getTotalHits(), equalTo(7L)); - assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(), - equalTo(0L)); - assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), - equalTo(0L)); - - // If the request has an aggregation containng now we should not cache + // If the request has an non-filter aggregation containing now we should not cache final SearchResponse r5 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) .setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) .addAggregation(dateRange("foo").field("s").addRange("now-10y", "now")).get(); @@ -442,6 +430,17 @@ public class IndicesRequestCacheIT extends ESIntegTestCase { equalTo(0L)); assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), equalTo(2L)); + + // If the request has a filter aggregation containing now we should cache since it gets rewritten + final SearchResponse r4 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) + .setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .addAggregation(filter("foo", QueryBuilders.rangeQuery("s").from("now-10y").to("now"))).get(); + assertSearchResponse(r4); + assertThat(r4.getHits().getTotalHits(), equalTo(7L)); + assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(), + equalTo(0L)); + assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(), + equalTo(4L)); } public void testCacheWithFilteredAlias() { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java index 4d5af51ec1f..f1138019656 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java @@ -19,15 +19,25 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.query.WrapperQueryBuilder; +import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder; import org.elasticsearch.test.AbstractQueryTestCase; import org.elasticsearch.test.ESTestCase; @@ -39,6 +49,7 @@ import java.util.regex.Pattern; import static java.util.Collections.emptyList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class AggregatorFactoriesTests extends ESTestCase { private String[] currentTypes; @@ -236,6 +247,44 @@ public class AggregatorFactoriesTests extends ESTestCase { assertThat(e.toString(), containsString("Expected [START_OBJECT] under [field], but got a [VALUE_STRING] in [tag_count]")); } + public void testRewrite() throws Exception { + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference bytesReference; + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType)) { + builder.startObject(); + { + builder.startObject("terms"); + { + builder.array("title", "foo"); + } + builder.endObject(); + } + builder.endObject(); + bytesReference = builder.bytes(); + } + FilterAggregationBuilder filterAggBuilder = new FilterAggregationBuilder("titles", new WrapperQueryBuilder(bytesReference)); + BucketScriptPipelineAggregationBuilder pipelineAgg = new BucketScriptPipelineAggregationBuilder("const", new Script("1")); + AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(filterAggBuilder) + .addPipelineAggregator(pipelineAgg); + AggregatorFactories.Builder rewritten = builder + .rewrite(new QueryRewriteContext(null, null, null, xContentRegistry, null, null, () -> 0L)); + assertNotSame(builder, rewritten); + List aggregatorFactories = rewritten.getAggregatorFactories(); + assertEquals(1, aggregatorFactories.size()); + assertThat(aggregatorFactories.get(0), instanceOf(FilterAggregationBuilder.class)); + FilterAggregationBuilder rewrittenFilterAggBuilder = (FilterAggregationBuilder) aggregatorFactories.get(0); + assertNotSame(filterAggBuilder, rewrittenFilterAggBuilder); + assertNotEquals(filterAggBuilder, rewrittenFilterAggBuilder); + // Check the filter was rewritten from a wrapper query to a terms query + QueryBuilder rewrittenFilter = rewrittenFilterAggBuilder.getFilter(); + assertThat(rewrittenFilter, instanceOf(TermsQueryBuilder.class)); + + // Check that a further rewrite returns the same aggregation factories builder + AggregatorFactories.Builder secondRewritten = rewritten + .rewrite(new QueryRewriteContext(null, null, null, xContentRegistry, null, null, () -> 0L)); + assertSame(rewritten, secondRewritten); + } + @Override protected NamedXContentRegistry xContentRegistry() { return xContentRegistry; diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/50_filter.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/50_filter.yml index 2bef1b6aa23..a094628ae92 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/50_filter.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/50_filter.yml @@ -32,10 +32,11 @@ setup: indices.refresh: {} --- -"Filter aggs with terms lookup ensure not cached": +"Filter aggs with terms lookup and ensure it's cached": + # Because the filter agg rewrites the terms lookup in the rewrite phase the request can be cached - skip: - version: " - 5.0.0" - reason: This using filter aggs that needs rewriting, this was fixed in 5.0.1 + version: " - 5.99.99" + reason: This using filter aggs that are rewritten, this was added in 6.0.0 - do: search: @@ -53,7 +54,7 @@ setup: indices.stats: { index: test, metric: request_cache} - match: { _shards.total: 1 } - match: { _all.total.request_cache.hit_count: 0 } - - match: { _all.total.request_cache.miss_count: 0 } + - match: { _all.total.request_cache.miss_count: 1 } - is_true: indices.test ---