diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index b4dd748f828..77dafaf76ca 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -15,15 +15,15 @@ refresh: true body: - '{"index": {"_index": "test_index"}}' - - '{"f1": "local_cluster", "filter_field": 0}' + - '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "local_cluster", "filter_field": 1}' + - '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "local_cluster", "filter_field": 0}' + - '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "local_cluster", "filter_field": 1}' + - '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "local_cluster", "filter_field": 0}' + - '{"f1": "local_cluster", "animal": "pig", "filter_field": 0}' - do: search: @@ -115,6 +115,87 @@ - match: { aggregations.cluster.buckets.0.key: "local_cluster" } - match: { aggregations.cluster.buckets.0.doc_count: 5 } + # once more, this time with a top level pipeline agg + - do: + search: + rest_total_hits_as_int: true + index: test_index,my_remote_cluster:test_index + body: + seq_no_primary_term: true + aggs: + cluster: + terms: + field: f1.keyword + aggs: + s: + sum: + field: filter_field + average_sum: + avg_bucket: + buckets_path: cluster.s + + - match: { num_reduce_phases: 3 } + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 5 } + - match: { hits.total: 11 } + - length: { aggregations.cluster.buckets: 2 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + - match: { aggregations.cluster.buckets.0.s.value: 2 } + - match: { aggregations.cluster.buckets.1.key: "local_cluster" } + - match: { aggregations.cluster.buckets.1.s.value: 2 } + - match: { aggregations.average_sum.value: 2 } + + # and now a non-top-level pipeline agg! + - do: + search: + rest_total_hits_as_int: true + index: test_index,my_remote_cluster:test_index + body: + seq_no_primary_term: true + aggs: + cluster: + terms: + field: f1.keyword + aggs: + animal: + terms: + field: animal.keyword + aggs: + s: + sum: + field: filter_field + average_sum: + avg_bucket: + buckets_path: animal.s + + - match: { num_reduce_phases: 3 } + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 5 } + - match: { hits.total: 11 } + - length: { aggregations.cluster.buckets: 2 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + - match: { aggregations.cluster.buckets.0.animal.buckets.0.key: "chicken" } + - match: { aggregations.cluster.buckets.0.animal.buckets.0.doc_count: 4 } + - match: { aggregations.cluster.buckets.0.animal.buckets.0.s.value: 1 } + - match: { aggregations.cluster.buckets.0.animal.buckets.1.key: "pig" } + - match: { aggregations.cluster.buckets.0.animal.buckets.1.doc_count: 2 } + - match: { aggregations.cluster.buckets.0.animal.buckets.1.s.value: 1 } + - match: { aggregations.cluster.buckets.0.average_sum.value: 1 } + - match: { aggregations.cluster.buckets.1.key: "local_cluster" } + - match: { aggregations.cluster.buckets.1.animal.buckets.0.key: "dog" } + - match: { aggregations.cluster.buckets.1.animal.buckets.0.doc_count: 4 } + - match: { aggregations.cluster.buckets.1.animal.buckets.0.s.value: 2 } + - match: { aggregations.cluster.buckets.1.animal.buckets.1.key: "pig" } + - match: { aggregations.cluster.buckets.1.animal.buckets.1.doc_count: 1 } + - match: { aggregations.cluster.buckets.1.animal.buckets.1.s.value: 0 } + - match: { aggregations.cluster.buckets.1.average_sum.value: 1 } + --- "Add transient remote cluster based on the preset cluster": - do: diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml index 19b3771acf8..95109a0a593 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/remote_cluster/10_basic.yml @@ -91,17 +91,17 @@ refresh: true body: - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"f1": "remote_cluster", "animal": "pig", "filter_field": 0}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 1}' + - '{"f1": "remote_cluster", "animal": "pig", "filter_field": 1}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 1}' + - '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 1}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}' - '{"index": {"_index": "test_index"}}' - - '{"f1": "remote_cluster", "filter_field": 0}' + - '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}' - do: search: diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f7532a90cef..7323a54e726 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1181,7 +1181,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return new InternalAggregation.ReduceContextBuilder() { @Override public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService); + return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService, + () -> requestToPipelineTree(request)); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index c150f99cfe1..c0460106f0a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -133,17 +133,16 @@ public class AggregationPhase implements SearchPhase { } } List pipelineAggregators = context.aggregations().factories().createPipelineAggregators(); - List siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size()); for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - if (pipelineAggregator instanceof SiblingPipelineAggregator) { - siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator); - } else { + if (false == pipelineAggregator instanceof SiblingPipelineAggregator) { + // TODO move this to request validation after #53669 throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name() + "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are " + "allowed at the top level"); } } - context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators)); + context.queryResult().aggregations(new InternalAggregations(aggregations, + context.request().source().aggregations()::buildPipelineTree)); // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 0015756d4d9..cffc6109004 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,7 +38,9 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; import java.util.function.IntConsumer; +import java.util.function.Supplier; +import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; /** @@ -62,12 +65,19 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable private final ScriptService scriptService; private final IntConsumer multiBucketConsumer; private final PipelineTree pipelineTreeRoot; + /** + * Supplies the pipelines when the result of the reduce is serialized + * to node versions that need pipeline aggregators to be serialized + * to them. + */ + private final Supplier pipelineTreeForBwcSerialization; /** * Build a {@linkplain ReduceContext} to perform a partial reduction. */ - public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService) { - return new ReduceContext(bigArrays, scriptService, (s) -> {}, null); + public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService, + Supplier pipelineTreeForBwcSerialization) { + return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization); } /** @@ -77,15 +87,16 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable public static ReduceContext forFinalReduction(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer, PipelineTree pipelineTreeRoot) { return new ReduceContext(bigArrays, scriptService, multiBucketConsumer, - requireNonNull(pipelineTreeRoot, "prefer EMPTY to null")); + requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"), () -> pipelineTreeRoot); } private ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer, - PipelineTree pipelineTreeRoot) { + PipelineTree pipelineTreeRoot, Supplier pipelineTreeForBwcSerialization) { this.bigArrays = bigArrays; this.scriptService = scriptService; this.multiBucketConsumer = multiBucketConsumer; this.pipelineTreeRoot = pipelineTreeRoot; + this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization; } /** @@ -112,6 +123,15 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable return pipelineTreeRoot; } + /** + * Supplies the pipelines when the result of the reduce is serialized + * to node versions that need pipeline aggregators to be serialized + * to them. + */ + public Supplier pipelineTreeForBwcSerialization() { + return pipelineTreeForBwcSerialization; + } + /** * Adds {@code count} buckets to the global count for the request and fails if this number is greater than * the maximum number of buckets allowed in a response @@ -129,9 +149,9 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable private final List pipelineAggregators; /** - * Constructs an get with a given name. + * Constructs an aggregation result with a given name. * - * @param name The name of the get. + * @param name The name of the aggregation. */ protected InternalAggregation(String name, List pipelineAggregators, Map metaData) { this.name = name; @@ -145,14 +165,20 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable protected InternalAggregation(StreamInput in) throws IOException { name = in.readString(); metaData = in.readMap(); - pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class); + if (in.getVersion().before(Version.V_7_8_0)) { + pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class); + } else { + pipelineAggregators = emptyList(); + } } @Override public final void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeGenericValue(metaData); - out.writeNamedWriteableList(pipelineAggregators); + if (out.getVersion().before(Version.V_7_8_0)) { + out.writeNamedWriteableList(pipelineAggregators); + } doWriteTo(out); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index a7475c071c9..3b7ff111d3b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -35,9 +35,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; + /** * An internal implementation of {@link Aggregations}. */ @@ -55,40 +59,56 @@ public final class InternalAggregations extends Aggregations implements Writeabl } }; - private final List topLevelPipelineAggregators; + /** + * The way to build a tree of pipeline aggregators. Used only for + * serialization backwards compatibility. + */ + private final Supplier pipelineTreeForBwcSerialization; /** * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); - this.topLevelPipelineAggregators = Collections.emptyList(); + this.pipelineTreeForBwcSerialization = null; } /** - * Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s + * Constructs a node in the aggregation tree. + * @param pipelineTreeSource must be null inside the tree or after final reduction. Should reference the + * search request otherwise so we can properly serialize the response to + * versions of Elasticsearch that require the pipelines to be serialized. */ - public InternalAggregations(List aggregations, List topLevelPipelineAggregators) { + public InternalAggregations(List aggregations, Supplier pipelineTreeSource) { super(aggregations); - this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators); + this.pipelineTreeForBwcSerialization = pipelineTreeSource; } public InternalAggregations(StreamInput in) throws IOException { super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class))); - if (in.getVersion().onOrAfter(Version.V_6_7_0)) { - this.topLevelPipelineAggregators = in.readList( - stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); - } else { - this.topLevelPipelineAggregators = Collections.emptyList(); + if (in.getVersion().before(Version.V_7_8_0) && in.getVersion().onOrAfter(Version.V_6_7_0)) { + in.readNamedWriteableList(PipelineAggregator.class); } + /* + * Setting the pipeline tree source to null is here is correct but + * only because we don't immediately pass the InternalAggregations + * off to another node. Instead, we always reduce together with + * many aggregations and that always adds the tree read from the + * current request. + */ + pipelineTreeForBwcSerialization = null; } @Override @SuppressWarnings("unchecked") public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableList((List)aggregations); - if (out.getVersion().onOrAfter(Version.V_6_7_0)) { - out.writeNamedWriteableList(topLevelPipelineAggregators); + if (out.getVersion().before(Version.V_7_8_0) && out.getVersion().onOrAfter(Version.V_6_7_0)) { + if (pipelineTreeForBwcSerialization == null) { + out.writeNamedWriteableList(emptyList()); + } else { + out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators()); + } } } @@ -102,12 +122,17 @@ public final class InternalAggregations extends Aggregations implements Writeabl } /** - * Returns the top-level pipeline aggregators. - * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they - * become part of the list of {@link InternalAggregation}s. + * Get the top level pipeline aggregators. + * @deprecated these only exist for BWC serialization */ + @Deprecated public List getTopLevelPipelineAggregators() { - return topLevelPipelineAggregators; + if (pipelineTreeForBwcSerialization == null) { + return emptyList(); + } + return pipelineTreeForBwcSerialization.get().aggregators().stream() + .map(p -> (SiblingPipelineAggregator) p) + .collect(toList()); } @SuppressWarnings("unchecked") @@ -138,7 +163,8 @@ public final class InternalAggregations extends Aggregations implements Writeabl * aggregations (both embedded parent/sibling as well as top-level sibling pipelines) */ public static InternalAggregations topLevelReduce(List aggregationsList, ReduceContext context) { - InternalAggregations reduced = reduce(aggregationsList, context); + InternalAggregations reduced = reduce(aggregationsList, context, + reducedAggregations -> new InternalAggregations(reducedAggregations, context.pipelineTreeForBwcSerialization())); if (reduced == null) { return null; } @@ -164,12 +190,16 @@ public final class InternalAggregations extends Aggregations implements Writeabl * {@link InternalAggregations} object found in the list. * Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled * separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)} + * @param ctor used to build the {@link InternalAggregations}. The top level reduce specifies a constructor + * that adds pipeline aggregation information that is used to send pipeline aggregations to + * older versions of Elasticsearch that require the pipeline aggregations to be returned + * as part of the aggregation tree */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + public static InternalAggregations reduce(List aggregationsList, ReduceContext context, + Function, InternalAggregations> ctor) { if (aggregationsList.isEmpty()) { return null; } - List topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators(); // first we collect all aggregations of the same type and list them together Map> aggByName = new HashMap<>(); @@ -192,6 +222,14 @@ public final class InternalAggregations extends Aggregations implements Writeabl reducedAggregations.add(first.reduce(aggregations, context)); } - return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); + return ctor.apply(reducedAggregations); } + + /** + * Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree. + */ + public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { + return reduce(aggregationsList, context, InternalAggregations::new); + } + } diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 32be8aba982..8b7ebdb6907 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -20,12 +20,10 @@ package org.elasticsearch.search.query; import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; import java.io.IOException; -import java.util.List; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; @@ -40,7 +38,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.internal.SearchContextId; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; @@ -321,28 +318,13 @@ public final class QuerySearchResult extends SearchPhaseResult { } setTopDocs(readTopDocs(in)); if (in.getVersion().before(Version.V_7_7_0)) { - InternalAggregations readAggs = null; if (hasAggs = in.readBoolean()) { - readAggs = new InternalAggregations(in); + aggregations = DelayableWriteable.referencing(new InternalAggregations(in)); } if (in.getVersion().before(Version.V_7_2_0)) { - List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() - .map(a -> (SiblingPipelineAggregator) a).collect(toList()); - if (hasAggs && pipelineAggregators.isEmpty() == false) { - List internalAggs = readAggs.copyResults(); - /* - * Earlier versions serialize sibling pipeline aggs - * separately as they used to be set to QuerySearchResult - * directly, while later versions include them in - * InternalAggregations. Note that despite serializing - * sibling pipeline aggs as part of nternalAggregations is - * supported since 6.7.0, the shards set sibling pipeline - * aggs to InternalAggregations only from 7.1. - */ - readAggs = new InternalAggregations(internalAggs, pipelineAggregators); - } + // The list of PipelineAggregators is sent by old versions. We don't need it anyway. + in.readNamedWriteableList(PipelineAggregator.class); } - aggregations = DelayableWriteable.referencing(readAggs); } else { if (hasAggs = in.readBoolean()) { aggregations = DelayableWriteable.delayed(InternalAggregations::new, in); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 16fa7b97ddc..158ad0ff2f3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -97,7 +97,8 @@ public class SearchPhaseControllerTests extends ESTestCase { @Override public ReduceContext forPartialReduction() { reductions.add(false); - return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null); + return InternalAggregation.ReduceContext.forPartialReduction( + BigArrays.NON_RECYCLING_INSTANCE, null, () -> PipelineTree.EMPTY); } public ReduceContext forFinalReduction() { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 5a1d2fe351e..f8b2d9ccd0a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -30,14 +30,11 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.Strings; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContextId; @@ -45,6 +42,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.transport.Transport; import java.util.Collections; @@ -125,17 +123,7 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase { searchRequest.source().trackTotalHitsUpTo(2); } searchRequest.allowPartialSearchResults(false); - SearchPhaseController controller = new SearchPhaseController((b) -> new InternalAggregation.ReduceContextBuilder() { - @Override - public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null); - } - - public InternalAggregation.ReduceContext forFinalReduction() { - return InternalAggregation.ReduceContext.forFinalReduction( - BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineAggregator.PipelineTree.EMPTY); - }; - }); + SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 81d37b0d5a3..28204b754cd 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -19,6 +19,28 @@ package org.elasticsearch.action.search; +import static org.elasticsearch.test.InternalAggregationTestCase.emptyReduceContextBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.startsWith; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Function; + import org.apache.lucene.search.TotalHits; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; @@ -57,7 +79,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.internal.AliasFilter; @@ -80,29 +101,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.startsWith; - public class TransportSearchActionTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @@ -395,7 +393,7 @@ public class TransportSearchActionTests extends ESTestCase { LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -440,7 +438,7 @@ public class TransportSearchActionTests extends ESTestCase { LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -466,7 +464,7 @@ public class TransportSearchActionTests extends ESTestCase { LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -513,7 +511,7 @@ public class TransportSearchActionTests extends ESTestCase { LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -541,8 +539,8 @@ public class TransportSearchActionTests extends ESTestCase { AtomicReference response = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); - TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -582,7 +580,7 @@ public class TransportSearchActionTests extends ESTestCase { LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, - aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); if (localIndices == null) { assertNull(setOnce.get()); } else { @@ -763,7 +761,8 @@ public class TransportSearchActionTests extends ESTestCase { assertEquals(-1, source.size()); assertEquals(-1, source.from()); assertNull(source.trackTotalHitsUpTo()); - SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, aggReduceContextBuilder()); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger( + source, timeProvider, emptyReduceContextBuilder()); assertEquals(0, merger.from); assertEquals(10, merger.size); assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); @@ -772,7 +771,7 @@ public class TransportSearchActionTests extends ESTestCase { assertNull(source.trackTotalHitsUpTo()); } { - SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, aggReduceContextBuilder()); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, emptyReduceContextBuilder()); assertEquals(0, merger.from); assertEquals(10, merger.size); assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); @@ -785,7 +784,8 @@ public class TransportSearchActionTests extends ESTestCase { source.size(originalSize); int trackTotalHitsUpTo = randomIntBetween(0, Integer.MAX_VALUE); source.trackTotalHitsUpTo(trackTotalHitsUpTo); - SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, aggReduceContextBuilder()); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger( + source, timeProvider, emptyReduceContextBuilder()); assertEquals(0, source.from()); assertEquals(originalFrom + originalSize, source.size()); assertEquals(trackTotalHitsUpTo, (int)source.trackTotalHitsUpTo()); @@ -937,18 +937,4 @@ public class TransportSearchActionTests extends ESTestCase { indices, randomIntBetween(127, 10000))); } } - - private InternalAggregation.ReduceContextBuilder aggReduceContextBuilder() { - return new InternalAggregation.ReduceContextBuilder() { - @Override - public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction(null, null); - }; - - @Override - public InternalAggregation.ReduceContext forFinalReduction() { - return InternalAggregation.ReduceContext.forFinalReduction(null, null, b -> {}, new PipelineTree(emptyMap(), emptyList())); - } - }; - } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index 50b3fb7e058..67c57212c71 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; @@ -33,7 +32,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.elasticsearch.test.ESTestCase; @@ -45,9 +44,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; public class InternalAggregationsTests extends ESTestCase { @@ -64,13 +63,8 @@ public class InternalAggregationsTests extends ESTestCase { public void testNonFinalReduceTopLevelPipelineAggs() { InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); - List topLevelPipelineAggs = new ArrayList<>(); - MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); - topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); - List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms), - topLevelPipelineAggs)); - InternalAggregation.ReduceContext reduceContext = InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction(); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext); + List aggs = singletonList(new InternalAggregations(Collections.singletonList(terms))); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -79,16 +73,20 @@ public class InternalAggregationsTests extends ESTestCase { InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); - MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); - SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create(); - InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction( - BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, new PipelineTree(emptyMap(), singletonList(siblingPipelineAggregator))); - InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), emptyList()); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), + maxBucketReduceContext().forFinalReduction()); assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } + private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() { + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + PipelineAggregator.PipelineTree tree = + new PipelineAggregator.PipelineTree(emptyMap(), singletonList(maxBucketPipelineAggregationBuilder.create())); + return InternalAggregationTestCase.emptyReduceContextBuilder(tree); + } + public static InternalAggregations createTestInstance() throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { @@ -106,7 +104,11 @@ public class InternalAggregationsTests extends ESTestCase { InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); aggsList.add(simpleValueTests.createTestInstance()); } - List topLevelPipelineAggs = new ArrayList<>(); + return new InternalAggregations(aggsList); + } + + private static PipelineAggregator.PipelineTree randomPipelineTree() { + List topLevelPipelineAggs = new ArrayList<>(); if (randomBoolean()) { if (randomBoolean()) { topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); @@ -118,7 +120,7 @@ public class InternalAggregationsTests extends ESTestCase { topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); } } - return new InternalAggregations(aggsList, topLevelPipelineAggs); + return new PipelineAggregator.PipelineTree(emptyMap(), topLevelPipelineAggs); } public void testSerialization() throws Exception { @@ -126,6 +128,14 @@ public class InternalAggregationsTests extends ESTestCase { writeToAndReadFrom(aggregations, 0); } + public void testGetTopLevelPipelineAggregators() throws Exception { + InternalAggregations orig = createTestInstance(); + PipelineAggregator.PipelineTree tree = randomPipelineTree(); + InternalAggregations withPipelines = new InternalAggregations(orig.copyResults(), () -> tree); + assertThat(withPipelines.aggregations, equalTo(orig.aggregations)); + assertThat(withPipelines.getTopLevelPipelineAggregators(), equalTo(tree.aggregators())); + } + private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { Version version = VersionUtils.randomVersion(random()); try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -135,22 +145,6 @@ public class InternalAggregationsTests extends ESTestCase { in.setVersion(version); InternalAggregations deserialized = new InternalAggregations(in); assertEquals(aggregations.aggregations, deserialized.aggregations); - if (aggregations.getTopLevelPipelineAggregators() == null) { - assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); - } else { - if (version.before(Version.V_6_7_0)) { - assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); - } else { - assertEquals(aggregations.getTopLevelPipelineAggregators().size(), - deserialized.getTopLevelPipelineAggregators().size()); - for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { - SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i); - SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i); - assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths()); - assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name()); - } - } - } if (iteration < 2) { //serialize this enough times to make sure that we are able to write again what we read writeToAndReadFrom(deserialized, iteration + 1); diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index d5401242e9f..730e7ae30ac 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregationsTests; -import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.internal.SearchContextId; import org.elasticsearch.search.suggest.SuggestTests; import org.elasticsearch.test.ESTestCase; @@ -44,7 +43,6 @@ import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.Base64; -import java.util.List; import static java.util.Collections.emptyList; @@ -92,16 +90,6 @@ public class QuerySearchResultTests extends ESTestCase { Aggregations aggs = querySearchResult.consumeAggs().get(); Aggregations deserializedAggs = deserialized.consumeAggs().get(); assertEquals(aggs.asList(), deserializedAggs.asList()); - List pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators(); - List deserializedPipelineAggs = - ((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators(); - assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size()); - for (int i = 0; i < pipelineAggs.size(); i++) { - SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i); - SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i); - assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths()); - assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name()); - } } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); } @@ -128,8 +116,7 @@ public class QuerySearchResultTests extends ESTestCase { assertTrue(querySearchResult.hasAggs()); InternalAggregations aggs = querySearchResult.consumeAggs().get(); assertEquals(1, aggs.asList().size()); - //top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately - assertEquals(1, aggs.getTopLevelPipelineAggregators().size()); + // We deserialize and throw away top level pipeline aggs } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index cfb2dbcc690..c5177da5d9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -441,7 +441,7 @@ public abstract class AggregatorTestCase extends ESTestCase { int r = randomIntBetween(1, toReduceSize); List toReduce = aggs.subList(0, r); InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction( - root.context().bigArrays(), getMockScriptService()); + root.context().bigArrays(), getMockScriptService(), () -> PipelineAggregator.PipelineTree.EMPTY); A reduced = (A) aggs.get(0).reduce(toReduce, context); aggs = new ArrayList<>(aggs.subList(r, toReduceSize)); aggs.add(reduced); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index e9bc24ab4f9..cbfea091a33 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -41,9 +41,9 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.elasticsearch.search.aggregations.ParsedAggregation; -import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.adjacency.ParsedAdjacencyMatrix; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; @@ -53,9 +53,9 @@ import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregationBui import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter; import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilters; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid; import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoTileGrid; -import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; @@ -138,8 +138,8 @@ import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import java.io.IOException; import java.util.ArrayList; @@ -166,16 +166,23 @@ public abstract class InternalAggregationTestCase * Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid but empty. */ public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder() { + return emptyReduceContextBuilder(PipelineTree.EMPTY); + } + + /** + * Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid and nearly + * empty except that it contain {@link PipelineAggregator}s. + */ + public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder(PipelineTree pipelineTree) { return new InternalAggregation.ReduceContextBuilder() { @Override public InternalAggregation.ReduceContext forPartialReduction() { - return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null); + return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null, () -> pipelineTree); } @Override public ReduceContext forFinalReduction() { - return InternalAggregation.ReduceContext.forFinalReduction( - BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineTree.EMPTY); + return InternalAggregation.ReduceContext.forFinalReduction(BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, pipelineTree); } }; } @@ -291,7 +298,8 @@ public abstract class InternalAggregationTestCase Collections.shuffle(toReduce, random()); int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); - InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(bigArrays, mockScriptService); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction( + bigArrays, mockScriptService, () -> PipelineAggregator.PipelineTree.EMPTY); @SuppressWarnings("unchecked") T reduced = (T) inputs.get(0).reduce(internalAggregations, context); int initialBucketCount = 0; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java index 27a1e23a626..fabcff29cf0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java @@ -47,6 +47,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuil import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -105,7 +106,8 @@ public class TransportRollupSearchAction extends TransportAction { - InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction( + bigArrays, scriptService, () -> PipelineAggregator.PipelineTree.EMPTY); listener.onResponse(processResponses(rollupSearchContext, msearchResponse, context)); }, listener::onFailure)); }