From eff5f4d234fa6033528e21c79a4fc00add56a7ea Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 6 Jul 2020 15:13:45 -0400 Subject: [PATCH] Add pipeline aggregations to the rewrite phase (backport #58878) (#59081) This allows pipeline aggregations to participate in the up-front rewrite phase for searches, in particular, it allows them to load data that they need asynchronously. Relates to #58193 Co-authored-by: Elastic Machine --- .../index/query/Rewriteable.java | 2 +- .../aggregations/AggregationBuilder.java | 19 +-- .../aggregations/AggregatorFactories.java | 21 ++-- .../PipelineAggregationBuilder.java | 22 +++- .../AggregatorFactoriesTests.java | 115 +++++++++++++++++- 5 files changed, 148 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/query/Rewriteable.java b/server/src/main/java/org/elasticsearch/index/query/Rewriteable.java index ba8d6b84d53..4599764233b 100644 --- a/server/src/main/java/org/elasticsearch/index/query/Rewriteable.java +++ b/server/src/main/java/org/elasticsearch/index/query/Rewriteable.java @@ -59,7 +59,7 @@ public interface Rewriteable { * @param original the original rewriteable to rewrite * @param context the rewrite context to use * @param assertNoAsyncTasks if true the rewrite will fail if there are any pending async tasks on the context after the - * rewrite. See {@link QueryRewriteContext#executeAsyncActions(ActionListener)} for detals + * rewrite. See {@link QueryRewriteContext#executeAsyncActions(ActionListener)} for details * @throws IOException if an {@link IOException} occurs */ static > T rewrite(T original, QueryRewriteContext context, boolean assertNoAsyncTasks) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index d8762eee8ce..50f681bd79e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; @@ -37,7 +38,7 @@ import java.util.Map; * A factory that knows how to create an {@link Aggregator} of a specific type. */ public abstract class AggregationBuilder - implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder { + implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder, Rewriteable { protected final String name; protected AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder(); @@ -109,6 +110,7 @@ public abstract class AggregationBuilder */ protected abstract AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metadata); + @Override public final AggregationBuilder rewrite(QueryRewriteContext context) throws IOException { AggregationBuilder rewritten = doRewrite(context); AggregatorFactories.Builder rewrittenSubAggs = factoriesBuilder.rewrite(context); @@ -131,21 +133,6 @@ public abstract class AggregationBuilder return this; } - /** - * Rewrites the given aggregation into its primitive form. Aggregations that for instance fetch resources from remote hosts or - * can simplify / optimize itself should do their heavy lifting during {@link #rewrite(QueryRewriteContext)}. This method - * rewrites the aggregation until it doesn't change anymore. - * @throws IOException if an {@link IOException} occurs - */ - static AggregationBuilder rewriteAggregation(AggregationBuilder original, QueryRewriteContext context) throws IOException { - AggregationBuilder builder = original; - for (AggregationBuilder rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder; - rewrittenBuilder = builder.rewrite(context)) { - builder = rewrittenBuilder; - } - return builder; - } - /** * Build a tree of {@link PipelineAggregator}s to modify the tree of * aggregation results after the final reduction. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index c55db182a47..e207bc20e5c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentLocation; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -500,21 +501,17 @@ public class AggregatorFactories { Builder newBuilder = new Builder(); for (AggregationBuilder builder : aggregationBuilders) { - AggregationBuilder result = AggregationBuilder.rewriteAggregation(builder, context); - if (result != builder) { - changed = true; - } + AggregationBuilder result = Rewriteable.rewrite(builder, context); newBuilder.addAggregator(result); + changed |= result != builder; + } + for (PipelineAggregationBuilder builder : pipelineAggregatorBuilders) { + PipelineAggregationBuilder result = Rewriteable.rewrite(builder, context); + newBuilder.addPipelineAggregator(result); + changed |= result != builder; } - if (changed) { - for (PipelineAggregationBuilder builder : pipelineAggregatorBuilders) { - newBuilder.addPipelineAggregator(builder); - } - return newBuilder; - } else { - return this; - } + return changed ? newBuilder : this; } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java index 0f977b72e97..8dc516f5b9b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java @@ -23,12 +23,15 @@ import org.elasticsearch.action.ValidateActions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -37,7 +40,12 @@ import java.util.Objects; * A factory that knows how to create an {@link PipelineAggregator} of a * specific type. */ -public abstract class PipelineAggregationBuilder implements NamedWriteable, BaseAggregationBuilder, ToXContentFragment { +public abstract class PipelineAggregationBuilder + implements + NamedWriteable, + BaseAggregationBuilder, + ToXContentFragment, + Rewriteable { protected final String name; protected final String[] bucketsPaths; @@ -245,4 +253,16 @@ public abstract class PipelineAggregationBuilder implements NamedWriteable, Base public String toString() { return Strings.toString(this, true, true); } + + /** + * {@inheritDoc} + *

+ * The default implementation return the same instance. It should be + * overridden by aggregations that must load data before they can be run, + * particularly if that load must by asynchronous. + */ + @Override + public PipelineAggregationBuilder rewrite(QueryRewriteContext context) throws IOException { + return this; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java index 75154934661..632d472dbc5 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorFactoriesTests.java @@ -18,8 +18,12 @@ */ package org.elasticsearch.search.aggregations; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -28,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.TermsQueryBuilder; @@ -35,15 +40,20 @@ import org.elasticsearch.index.query.WrapperQueryBuilder; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.test.AbstractQueryTestCase; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Map; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -218,7 +228,7 @@ public class AggregatorFactoriesTests extends ESTestCase { assertThat(e.toString(), containsString("Unknown aggregation type [term] did you mean [terms]?")); } - public void testRewrite() throws Exception { + public void testRewriteAggregation() throws Exception { XContentType xContentType = randomFrom(XContentType.values()); BytesReference bytesReference; try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType)) { @@ -256,6 +266,57 @@ public class AggregatorFactoriesTests extends ESTestCase { assertSame(rewritten, secondRewritten); } + public void testRewritePipelineAggregationUnderAggregation() throws Exception { + FilterAggregationBuilder filterAggBuilder = new FilterAggregationBuilder("titles", new MatchAllQueryBuilder()) + .subAggregation(new RewrittenPipelineAggregationBuilder()); + AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(filterAggBuilder); + QueryRewriteContext context = new QueryRewriteContext(xContentRegistry, null, null, () -> 0L); + AggregatorFactories.Builder rewritten = builder.rewrite(context); + CountDownLatch latch = new CountDownLatch(1); + context.executeAsyncActions(new ActionListener() { + @Override + public void onResponse(Object response) { + assertNotSame(builder, rewritten); + Collection aggregatorFactories = rewritten.getAggregatorFactories(); + assertEquals(1, aggregatorFactories.size()); + FilterAggregationBuilder rewrittenFilterAggBuilder = (FilterAggregationBuilder) aggregatorFactories.iterator().next(); + PipelineAggregationBuilder rewrittenPipeline = rewrittenFilterAggBuilder.getPipelineAggregations().iterator().next(); + assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten")); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + latch.await(); + } + + public void testRewriteAggregationAtTopLevel() throws Exception { + FilterAggregationBuilder filterAggBuilder = new FilterAggregationBuilder("titles", new MatchAllQueryBuilder()); + AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(filterAggBuilder) + .addPipelineAggregator(new RewrittenPipelineAggregationBuilder()); + QueryRewriteContext context = new QueryRewriteContext(xContentRegistry, null, null, () -> 0L); + AggregatorFactories.Builder rewritten = builder.rewrite(context); + CountDownLatch latch = new CountDownLatch(1); + context.executeAsyncActions(new ActionListener() { + @Override + public void onResponse(Object response) { + assertNotSame(builder, rewritten); + PipelineAggregationBuilder rewrittenPipeline = rewritten.getPipelineAggregatorFactories().iterator().next(); + assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten")); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + latch.await(); + } + public void testBuildPipelineTreeResolvesPipelineOrder() { AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); builder.addPipelineAggregator(PipelineAggregatorBuilders.avgBucket("bar", "foo")); @@ -270,4 +331,56 @@ public class AggregatorFactoriesTests extends ESTestCase { protected NamedXContentRegistry xContentRegistry() { return xContentRegistry; } + + private class RewrittenPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { + private final Supplier setOnRewrite; + + RewrittenPipelineAggregationBuilder() { + super("test", "rewritten", Strings.EMPTY_ARRAY); + setOnRewrite = null; + } + + RewrittenPipelineAggregationBuilder(Supplier setOnRewrite) { + super("test", "rewritten", Strings.EMPTY_ARRAY); + this.setOnRewrite = setOnRewrite; + } + + @Override + public PipelineAggregationBuilder rewrite(QueryRewriteContext context) throws IOException { + if (setOnRewrite != null) { + return this; + } + SetOnce loaded = new SetOnce<>(); + context.registerAsyncAction((client, listener) -> { + loaded.set("rewritten"); + listener.onResponse(null); + }); + return new RewrittenPipelineAggregationBuilder(loaded::get); + } + + @Override + public String getWriteableName() { + return "rewritten"; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected PipelineAggregator createInternal(Map metadata) { + throw new UnsupportedOperationException(); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + protected void validate(ValidationContext context) { + throw new UnsupportedOperationException(); + } + } }