Add pipeline aggregations to the rewrite phase (backport #58878) (#59081)

This allows pipeline aggregations to participate in the up-front rewrite
phase for searches. In particular, it allows them to asynchronously load
the data that they need.

Relates to #58193

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Nik Everett 2020-07-06 15:13:45 -04:00 committed by GitHub
parent e827d2ed92
commit eff5f4d234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 148 additions and 31 deletions

View File

@ -59,7 +59,7 @@ public interface Rewriteable<T> {
* @param original the original rewriteable to rewrite
* @param context the rewrite context to use
* @param assertNoAsyncTasks if <code>true</code> the rewrite will fail if there are any pending async tasks on the context after the
* rewrite. See {@link QueryRewriteContext#executeAsyncActions(ActionListener)} for detals
* rewrite. See {@link QueryRewriteContext#executeAsyncActions(ActionListener)} for details
* @throws IOException if an {@link IOException} occurs
*/
static <T extends Rewriteable<T>> T rewrite(T original, QueryRewriteContext context, boolean assertNoAsyncTasks) throws IOException {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
@ -37,7 +38,7 @@ import java.util.Map;
* A factory that knows how to create an {@link Aggregator} of a specific type.
*/
public abstract class AggregationBuilder
implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder {
implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder, Rewriteable<AggregationBuilder> {
protected final String name;
protected AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder();
@ -109,6 +110,7 @@ public abstract class AggregationBuilder
*/
protected abstract AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata);
@Override
public final AggregationBuilder rewrite(QueryRewriteContext context) throws IOException {
AggregationBuilder rewritten = doRewrite(context);
AggregatorFactories.Builder rewrittenSubAggs = factoriesBuilder.rewrite(context);
@ -131,21 +133,6 @@ public abstract class AggregationBuilder
return this;
}
/**
 * Repeatedly rewrites an aggregation until a rewrite pass hands back the very same instance it was invoked on.
 * Expensive preparation, such as fetching resources from remote hosts or simplifying / optimizing the
 * aggregation, belongs in {@link #rewrite(QueryRewriteContext)}; this helper only drives that method until
 * the result stops changing.
 *
 * @param original the aggregation to start rewriting from
 * @param context the rewrite context handed to every rewrite pass
 * @return the first builder whose rewrite returned itself
 * @throws IOException if an {@link IOException} occurs
 */
static AggregationBuilder rewriteAggregation(AggregationBuilder original, QueryRewriteContext context) throws IOException {
    AggregationBuilder current = original;
    while (true) {
        AggregationBuilder next = current.rewrite(context);
        if (next == current) {
            // Fixed point reached: rewriting no longer produces a different instance.
            return current;
        }
        current = next;
    }
}
/**
* Build a tree of {@link PipelineAggregator}s to modify the tree of
* aggregation results after the final reduction.

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -500,21 +501,17 @@ public class AggregatorFactories {
Builder newBuilder = new Builder();
for (AggregationBuilder builder : aggregationBuilders) {
AggregationBuilder result = AggregationBuilder.rewriteAggregation(builder, context);
if (result != builder) {
changed = true;
}
AggregationBuilder result = Rewriteable.rewrite(builder, context);
newBuilder.addAggregator(result);
changed |= result != builder;
}
for (PipelineAggregationBuilder builder : pipelineAggregatorBuilders) {
PipelineAggregationBuilder result = Rewriteable.rewrite(builder, context);
newBuilder.addPipelineAggregator(result);
changed |= result != builder;
}
if (changed) {
for (PipelineAggregationBuilder builder : pipelineAggregatorBuilders) {
newBuilder.addPipelineAggregator(builder);
}
return newBuilder;
} else {
return this;
}
return changed ? newBuilder : this;
}
/**

View File

@ -23,12 +23,15 @@ import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@ -37,7 +40,12 @@ import java.util.Objects;
* A factory that knows how to create an {@link PipelineAggregator} of a
* specific type.
*/
public abstract class PipelineAggregationBuilder implements NamedWriteable, BaseAggregationBuilder, ToXContentFragment {
public abstract class PipelineAggregationBuilder
implements
NamedWriteable,
BaseAggregationBuilder,
ToXContentFragment,
Rewriteable<PipelineAggregationBuilder> {
protected final String name;
protected final String[] bucketsPaths;
@ -245,4 +253,16 @@ public abstract class PipelineAggregationBuilder implements NamedWriteable, Base
public String toString() {
// Render this builder via the shared Strings helper.
// NOTE(review): the two `true` flags presumably request pretty-printed, human-readable output - confirm against Strings.toString.
return Strings.toString(this, true, true);
}
/**
* {@inheritDoc}
* <p>
* The default implementation returns the same instance. It should be
* overridden by aggregations that must load data before they can be run,
* particularly if that load must be asynchronous.
*/
@Override
public PipelineAggregationBuilder rewrite(QueryRewriteContext context) throws IOException {
return this;
}
}

View File

@ -18,8 +18,12 @@
*/
package org.elasticsearch.search.aggregations;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -28,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.TermsQueryBuilder;
@ -35,15 +40,20 @@ import org.elasticsearch.index.query.WrapperQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.AbstractQueryTestCase;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -218,7 +228,7 @@ public class AggregatorFactoriesTests extends ESTestCase {
assertThat(e.toString(), containsString("Unknown aggregation type [term] did you mean [terms]?"));
}
public void testRewrite() throws Exception {
public void testRewriteAggregation() throws Exception {
XContentType xContentType = randomFrom(XContentType.values());
BytesReference bytesReference;
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType)) {
@ -256,6 +266,57 @@ public class AggregatorFactoriesTests extends ESTestCase {
assertSame(rewritten, secondRewritten);
}
/**
 * Checks that a pipeline aggregation nested under a regular aggregation takes part in the rewrite phase:
 * rewriting the enclosing {@link AggregatorFactories.Builder} must yield a new builder whose nested pipeline
 * aggregation exposes the value set by the async action it registered on the {@link QueryRewriteContext}.
 */
public void testRewritePipelineAggregationUnderAggregation() throws Exception {
    FilterAggregationBuilder filterAggBuilder = new FilterAggregationBuilder("titles", new MatchAllQueryBuilder())
        .subAggregation(new RewrittenPipelineAggregationBuilder());
    AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(filterAggBuilder);
    QueryRewriteContext context = new QueryRewriteContext(xContentRegistry, null, null, () -> 0L);
    AggregatorFactories.Builder rewritten = builder.rewrite(context);
    CountDownLatch latch = new CountDownLatch(1);
    context.executeAsyncActions(new ActionListener<Object>() {
        @Override
        public void onResponse(Object response) {
            // The rewrite must have produced a new builder rather than returning the original one.
            assertNotSame(builder, rewritten);
            Collection<AggregationBuilder> aggregatorFactories = rewritten.getAggregatorFactories();
            assertEquals(1, aggregatorFactories.size());
            FilterAggregationBuilder rewrittenFilterAggBuilder = (FilterAggregationBuilder) aggregatorFactories.iterator().next();
            PipelineAggregationBuilder rewrittenPipeline = rewrittenFilterAggBuilder.getPipelineAggregations().iterator().next();
            // The value only becomes visible once the registered async action has run.
            assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten"));
            latch.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            throw new AssertionError(e);
        }
    });
    // Bound the wait so the test fails quickly, instead of hanging, if the listener is never notified.
    if (latch.await(10, java.util.concurrent.TimeUnit.SECONDS) == false) {
        throw new AssertionError("timed out waiting for the async rewrite actions to complete");
    }
}
/**
 * Checks that a pipeline aggregation registered directly on the top-level {@link AggregatorFactories.Builder}
 * (next to a regular aggregation) takes part in the rewrite phase: the rewritten builder must be a new
 * instance whose pipeline aggregation exposes the value set by the async action it registered on the
 * {@link QueryRewriteContext}.
 */
public void testRewriteAggregationAtTopLevel() throws Exception {
    FilterAggregationBuilder filterAggBuilder = new FilterAggregationBuilder("titles", new MatchAllQueryBuilder());
    AggregatorFactories.Builder builder = new AggregatorFactories.Builder().addAggregator(filterAggBuilder)
        .addPipelineAggregator(new RewrittenPipelineAggregationBuilder());
    QueryRewriteContext context = new QueryRewriteContext(xContentRegistry, null, null, () -> 0L);
    AggregatorFactories.Builder rewritten = builder.rewrite(context);
    CountDownLatch latch = new CountDownLatch(1);
    context.executeAsyncActions(new ActionListener<Object>() {
        @Override
        public void onResponse(Object response) {
            // The rewrite must have produced a new builder rather than returning the original one.
            assertNotSame(builder, rewritten);
            PipelineAggregationBuilder rewrittenPipeline = rewritten.getPipelineAggregatorFactories().iterator().next();
            // The value only becomes visible once the registered async action has run.
            assertThat(((RewrittenPipelineAggregationBuilder) rewrittenPipeline).setOnRewrite.get(), equalTo("rewritten"));
            latch.countDown();
        }

        @Override
        public void onFailure(Exception e) {
            throw new AssertionError(e);
        }
    });
    // Bound the wait so the test fails quickly, instead of hanging, if the listener is never notified.
    if (latch.await(10, java.util.concurrent.TimeUnit.SECONDS) == false) {
        throw new AssertionError("timed out waiting for the async rewrite actions to complete");
    }
}
public void testBuildPipelineTreeResolvesPipelineOrder() {
AggregatorFactories.Builder builder = new AggregatorFactories.Builder();
builder.addPipelineAggregator(PipelineAggregatorBuilders.avgBucket("bar", "foo"));
@ -270,4 +331,56 @@ public class AggregatorFactoriesTests extends ESTestCase {
protected NamedXContentRegistry xContentRegistry() {
// Returns the xContentRegistry field unchanged.
return xContentRegistry;
}
private class RewrittenPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<RewrittenPipelineAggregationBuilder> {
private final Supplier<String> setOnRewrite;
RewrittenPipelineAggregationBuilder() {
super("test", "rewritten", Strings.EMPTY_ARRAY);
setOnRewrite = null;
}
RewrittenPipelineAggregationBuilder(Supplier<String> setOnRewrite) {
super("test", "rewritten", Strings.EMPTY_ARRAY);
this.setOnRewrite = setOnRewrite;
}
@Override
public PipelineAggregationBuilder rewrite(QueryRewriteContext context) throws IOException {
if (setOnRewrite != null) {
return this;
}
SetOnce<String> loaded = new SetOnce<>();
context.registerAsyncAction((client, listener) -> {
loaded.set("rewritten");
listener.onResponse(null);
});
return new RewrittenPipelineAggregationBuilder(loaded::get);
}
@Override
public String getWriteableName() {
return "rewritten";
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
throw new UnsupportedOperationException();
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
throw new UnsupportedOperationException();
}
@Override
protected void validate(ValidationContext context) {
throw new UnsupportedOperationException();
}
}
}