Save a little space in agg tree (backport of #53730) (#54213)

This drops the "top level" pipeline aggregators from the aggregation
result tree, which should save a little memory and a few serialization
bytes. Perhaps more importantly, it provides a mechanism by which we
can remove *all* pipelines from the aggregation result tree. This will
save quite a bit of space when pipelines are deep in the tree.

Sadly, doing this isn't simple because of backwards compatibility. Nodes
before 7.7.0 *need* those pipelines. We provide them by passing a
`Supplier<PipelineTree>` into the root of the aggregation tree that we
only call if we need to serialize to a version before 7.7.0.
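
To make the mechanism concrete, here is a minimal, self-contained sketch
of the lazy BWC gate. Every name below is a simplified stand-in, not a
real Elasticsearch type; the actual change is in the `InternalAggregation`
diff further down.

```java
import java.util.List;
import java.util.function.Supplier;

// Sketch only: "serialize the pipelines, but only for old wire versions".
class AggResultSketch {
    static final int V_7_7_0 = 70700; // stand-in for org.elasticsearch.Version

    private final String name;
    // Lazy on purpose: when the target is 7.7.0 or newer we never call
    // get(), so we never pay for materializing the pipeline tree.
    private final Supplier<List<String>> pipelinesForBwc;

    AggResultSketch(String name, Supplier<List<String>> pipelinesForBwc) {
        this.name = name;
        this.pipelinesForBwc = pipelinesForBwc;
    }

    void writeTo(int targetNodeVersion, List<Object> wire) {
        wire.add(name);
        if (targetNodeVersion < V_7_7_0) {
            // Pre-7.7.0 targets still expect the pipelines on the wire.
            wire.add(pipelinesForBwc.get());
        }
    }
}
```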

This solution works for cross cluster search because we always reduce
the aggregations in each remote cluster and then forward them back to
the coordinating node. It's quite possible that the coordinating node
needs the pipelines (say it is version 7.1.0) while the gateway node in
the remote cluster doesn't (version 7.7.0). In that case the data nodes
won't send the pipeline aggregations back to the gateway node.
Critically, the gateway node *will* send the pipeline aggregations back
to the coordinating node. This is all managed with that
`Supplier<PipelineTree>`, but *how* it is managed is a bit tricky.
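
For illustration, a toy model of that per-hop decision (the class, the
version constants, and the string standing in for the pipeline tree are
all illustrative, not the Elasticsearch API):

```java
import java.util.function.Supplier;

// Each hop checks the *target* node's version independently, which is
// why the supplier has to survive the remote cluster's partial reduce.
class CcsHopSketch {
    static final int V_7_7_0 = 70700;

    static boolean targetNeedsPipelines(int targetVersion) {
        return targetVersion < V_7_7_0;
    }

    public static void main(String[] args) {
        Supplier<String> pipelineTree = () -> "avg_bucket[buckets_path=cluster.s]";
        int gateway = 70700;     // the remote cluster's 7.7.0 gateway node
        int coordinator = 70100; // the 7.1.0 coordinating node

        // data node -> gateway node: pipelines stay off the wire
        System.out.println(targetNeedsPipelines(gateway));    // false
        // gateway node -> coordinating node: the supplier finally runs
        System.out.println(targetNeedsPipelines(coordinator)
                ? pipelineTree.get() : "none");
    }
}
```
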
Nik Everett 2020-03-25 15:51:16 -04:00 committed by GitHub
parent 66861a82a1
commit 8f40f1435a
15 changed files with 280 additions and 187 deletions

View File

@@ -15,15 +15,15 @@
refresh: true
body:
- '{"index": {"_index": "test_index"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "local_cluster", "filter_field": 1}'
- '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- '{"f1": "local_cluster", "animal": "dog", "filter_field": 0}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "local_cluster", "filter_field": 1}'
- '{"f1": "local_cluster", "animal": "dog", "filter_field": 1}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "local_cluster", "filter_field": 0}'
- '{"f1": "local_cluster", "animal": "pig", "filter_field": 0}'
- do:
search:
@@ -115,6 +115,87 @@
- match: { aggregations.cluster.buckets.0.key: "local_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 5 }
# once more, this time with a top level pipeline agg
- do:
search:
rest_total_hits_as_int: true
index: test_index,my_remote_cluster:test_index
body:
seq_no_primary_term: true
aggs:
cluster:
terms:
field: f1.keyword
aggs:
s:
sum:
field: filter_field
average_sum:
avg_bucket:
buckets_path: cluster.s
- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.0.s.value: 2 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.s.value: 2 }
- match: { aggregations.average_sum.value: 2 }
# and now a non-top-level pipeline agg!
- do:
search:
rest_total_hits_as_int: true
index: test_index,my_remote_cluster:test_index
body:
seq_no_primary_term: true
aggs:
cluster:
terms:
field: f1.keyword
aggs:
animal:
terms:
field: animal.keyword
aggs:
s:
sum:
field: filter_field
average_sum:
avg_bucket:
buckets_path: animal.s
- match: { num_reduce_phases: 3 }
- match: {_clusters.total: 2}
- match: {_clusters.successful: 2}
- match: {_clusters.skipped: 0}
- match: { _shards.total: 5 }
- match: { hits.total: 11 }
- length: { aggregations.cluster.buckets: 2 }
- match: { aggregations.cluster.buckets.0.key: "remote_cluster" }
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.key: "chicken" }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.doc_count: 4 }
- match: { aggregations.cluster.buckets.0.animal.buckets.0.s.value: 1 }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.key: "pig" }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.doc_count: 2 }
- match: { aggregations.cluster.buckets.0.animal.buckets.1.s.value: 1 }
- match: { aggregations.cluster.buckets.0.average_sum.value: 1 }
- match: { aggregations.cluster.buckets.1.key: "local_cluster" }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.key: "dog" }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.doc_count: 4 }
- match: { aggregations.cluster.buckets.1.animal.buckets.0.s.value: 2 }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.key: "pig" }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.doc_count: 1 }
- match: { aggregations.cluster.buckets.1.animal.buckets.1.s.value: 0 }
- match: { aggregations.cluster.buckets.1.average_sum.value: 1 }
---
"Add transient remote cluster based on the preset cluster":
- do:

View File

@@ -91,17 +91,17 @@
refresh: true
body:
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"f1": "remote_cluster", "animal": "pig", "filter_field": 0}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 1}'
- '{"f1": "remote_cluster", "animal": "pig", "filter_field": 1}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 1}'
- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 1}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
- '{"index": {"_index": "test_index"}}'
- '{"f1": "remote_cluster", "filter_field": 0}'
- '{"f1": "remote_cluster", "animal": "chicken", "filter_field": 0}'
- do:
search:

View File

@@ -1181,7 +1181,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService);
return InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService,
() -> requestToPipelineTree(request));
}
@Override

View File

@@ -133,17 +133,16 @@ public class AggregationPhase implements SearchPhase {
}
}
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
if (pipelineAggregator instanceof SiblingPipelineAggregator) {
siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
} else {
if (false == pipelineAggregator instanceof SiblingPipelineAggregator) {
// TODO move this to request validation after #53669
throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name()
+ "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are "
+ "allowed at the top level");
}
}
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
context.queryResult().aggregations(new InternalAggregations(aggregations,
context.request().source().aggregations()::buildPipelineTree));
// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);

View File

@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -37,7 +38,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
/**
@@ -62,12 +65,19 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
private final ScriptService scriptService;
private final IntConsumer multiBucketConsumer;
private final PipelineTree pipelineTreeRoot;
/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
private final Supplier<PipelineTree> pipelineTreeForBwcSerialization;
/**
* Build a {@linkplain ReduceContext} to perform a partial reduction.
*/
public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService) {
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null);
public static ReduceContext forPartialReduction(BigArrays bigArrays, ScriptService scriptService,
Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization);
}
/**
@@ -77,15 +87,16 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
public static ReduceContext forFinalReduction(BigArrays bigArrays, ScriptService scriptService,
IntConsumer multiBucketConsumer, PipelineTree pipelineTreeRoot) {
return new ReduceContext(bigArrays, scriptService, multiBucketConsumer,
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"));
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"), () -> pipelineTreeRoot);
}
private ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot) {
PipelineTree pipelineTreeRoot, Supplier<PipelineTree> pipelineTreeForBwcSerialization) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
}
/**
@@ -112,6 +123,15 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
return pipelineTreeRoot;
}
/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
public Supplier<PipelineTree> pipelineTreeForBwcSerialization() {
return pipelineTreeForBwcSerialization;
}
/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
@@ -129,9 +149,9 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
private final List<PipelineAggregator> pipelineAggregators;
/**
* Constructs an get with a given name.
* Constructs an aggregation result with a given name.
*
* @param name The name of the get.
* @param name The name of the aggregation.
*/
protected InternalAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
this.name = name;
@@ -145,14 +165,20 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
if (in.getVersion().before(Version.V_7_8_0)) {
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
} else {
pipelineAggregators = emptyList();
}
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
if (out.getVersion().before(Version.V_7_8_0)) {
out.writeNamedWriteableList(pipelineAggregators);
}
doWriteTo(out);
}

View File

@@ -35,9 +35,13 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
/**
* An internal implementation of {@link Aggregations}.
*/
@@ -55,40 +59,56 @@ public final class InternalAggregations extends Aggregations implements Writeabl
}
};
private final List<SiblingPipelineAggregator> topLevelPipelineAggregators;
/**
* The way to build a tree of pipeline aggregators. Used only for
* serialization backwards compatibility.
*/
private final Supplier<PipelineAggregator.PipelineTree> pipelineTreeForBwcSerialization;
/**
* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
this.topLevelPipelineAggregators = Collections.emptyList();
this.pipelineTreeForBwcSerialization = null;
}
/**
* Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s
* Constructs a node in the aggregation tree.
* @param pipelineTreeSource must be null inside the tree or after final reduction. Should reference the
* search request otherwise so we can properly serialize the response to
* versions of Elasticsearch that require the pipelines to be serialized.
*/
public InternalAggregations(List<InternalAggregation> aggregations, List<SiblingPipelineAggregator> topLevelPipelineAggregators) {
public InternalAggregations(List<InternalAggregation> aggregations, Supplier<PipelineAggregator.PipelineTree> pipelineTreeSource) {
super(aggregations);
this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators);
this.pipelineTreeForBwcSerialization = pipelineTreeSource;
}
public InternalAggregations(StreamInput in) throws IOException {
super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)));
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
this.topLevelPipelineAggregators = in.readList(
stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class));
} else {
this.topLevelPipelineAggregators = Collections.emptyList();
if (in.getVersion().before(Version.V_7_8_0) && in.getVersion().onOrAfter(Version.V_6_7_0)) {
in.readNamedWriteableList(PipelineAggregator.class);
}
/*
* Setting the pipeline tree source to null here is correct but
* only because we don't immediately pass the InternalAggregations
* off to another node. Instead, we always reduce together with
* many aggregations and that always adds the tree read from the
* current request.
*/
pipelineTreeForBwcSerialization = null;
}
@Override
@SuppressWarnings("unchecked")
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeNamedWriteableList(topLevelPipelineAggregators);
if (out.getVersion().before(Version.V_7_8_0) && out.getVersion().onOrAfter(Version.V_6_7_0)) {
if (pipelineTreeForBwcSerialization == null) {
out.writeNamedWriteableList(emptyList());
} else {
out.writeNamedWriteableList(pipelineTreeForBwcSerialization.get().aggregators());
}
}
}
@@ -102,12 +122,17 @@ public final class InternalAggregations extends Aggregations implements Writeabl
}
/**
* Returns the top-level pipeline aggregators.
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
* become part of the list of {@link InternalAggregation}s.
* Get the top level pipeline aggregators.
* @deprecated these only exist for BWC serialization
*/
@Deprecated
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
if (pipelineTreeForBwcSerialization == null) {
return emptyList();
}
return pipelineTreeForBwcSerialization.get().aggregators().stream()
.map(p -> (SiblingPipelineAggregator) p)
.collect(toList());
}
@SuppressWarnings("unchecked")
@@ -138,7 +163,8 @@ public final class InternalAggregations extends Aggregations implements Writeabl
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
*/
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
InternalAggregations reduced = reduce(aggregationsList, context);
InternalAggregations reduced = reduce(aggregationsList, context,
reducedAggregations -> new InternalAggregations(reducedAggregations, context.pipelineTreeForBwcSerialization()));
if (reduced == null) {
return null;
}
@@ -164,12 +190,16 @@ public final class InternalAggregations extends Aggregations implements Writeabl
* {@link InternalAggregations} object found in the list.
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
* @param ctor used to build the {@link InternalAggregations}. The top level reduce specifies a constructor
* that adds pipeline aggregation information that is used to send pipeline aggregations to
* older versions of Elasticsearch that require the pipeline aggregations to be returned
* as part of the aggregation tree
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context,
Function<List<InternalAggregation>, InternalAggregations> ctor) {
if (aggregationsList.isEmpty()) {
return null;
}
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
// first we collect all aggregations of the same type and list them together
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();
@@ -192,6 +222,14 @@ public final class InternalAggregations extends Aggregations implements Writeabl
reducedAggregations.add(first.reduce(aggregations, context));
}
return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
return ctor.apply(reducedAggregations);
}
/**
* Version of {@link #reduce(List, ReduceContext, Function)} for nodes inside the aggregation tree.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
return reduce(aggregationsList, context, InternalAggregations::new);
}
}

View File

@@ -20,12 +20,10 @@
package org.elasticsearch.search.query;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
@@ -40,7 +38,6 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
@@ -321,28 +318,13 @@ public final class QuerySearchResult extends SearchPhaseResult {
}
setTopDocs(readTopDocs(in));
if (in.getVersion().before(Version.V_7_7_0)) {
InternalAggregations readAggs = null;
if (hasAggs = in.readBoolean()) {
readAggs = new InternalAggregations(in);
aggregations = DelayableWriteable.referencing(new InternalAggregations(in));
}
if (in.getVersion().before(Version.V_7_2_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(toList());
if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = readAggs.copyResults();
/*
* Earlier versions serialize sibling pipeline aggs
* separately as they used to be set to QuerySearchResult
* directly, while later versions include them in
* InternalAggregations. Note that despite serializing
* sibling pipeline aggs as part of InternalAggregations is
* supported since 6.7.0, the shards set sibling pipeline
* aggs to InternalAggregations only from 7.1.
*/
readAggs = new InternalAggregations(internalAggs, pipelineAggregators);
// The list of PipelineAggregators is sent by old versions. We don't need it anyway.
in.readNamedWriteableList(PipelineAggregator.class);
}
}
aggregations = DelayableWriteable.referencing(readAggs);
} else {
if (hasAggs = in.readBoolean()) {
aggregations = DelayableWriteable.delayed(InternalAggregations::new, in);

View File

@@ -97,7 +97,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
@Override
public ReduceContext forPartialReduction() {
reductions.add(false);
return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null);
return InternalAggregation.ReduceContext.forPartialReduction(
BigArrays.NON_RECYCLING_INSTANCE, null, () -> PipelineTree.EMPTY);
}
public ReduceContext forFinalReduction() {

View File

@@ -30,14 +30,11 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContextId;
@@ -45,6 +42,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.transport.Transport;
import java.util.Collections;
@@ -125,17 +123,7 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
searchRequest.source().trackTotalHitsUpTo(2);
}
searchRequest.allowPartialSearchResults(false);
SearchPhaseController controller = new SearchPhaseController((b) -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null);
}
public InternalAggregation.ReduceContext forFinalReduction() {
return InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineAggregator.PipelineTree.EMPTY);
};
});
SearchPhaseController controller = new SearchPhaseController(r -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger,
searchTransportService, (clusterAlias, node) -> lookup.get(node),

View File

@@ -19,6 +19,28 @@
package org.elasticsearch.action.search;
import static org.elasticsearch.test.InternalAggregationTestCase.emptyReduceContextBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
@@ -57,7 +79,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.AliasFilter;
@@ -80,29 +101,6 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
public class TransportSearchActionTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@@ -395,7 +393,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -440,7 +438,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -466,7 +464,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -513,7 +511,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(r -> fail("no response expected"), failure::set), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -542,7 +540,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -582,7 +580,7 @@ public class TransportSearchActionTests extends ESTestCase {
LatchedActionListener<SearchResponse> listener = new LatchedActionListener<>(
ActionListener.wrap(response::set, e -> fail("no failures expected")), latch);
TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider,
aggReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
emptyReduceContextBuilder(), remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)));
if (localIndices == null) {
assertNull(setOnce.get());
} else {
@@ -763,7 +761,8 @@ public class TransportSearchActionTests extends ESTestCase {
assertEquals(-1, source.size());
assertEquals(-1, source.from());
assertNull(source.trackTotalHitsUpTo());
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, aggReduceContextBuilder());
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(
source, timeProvider, emptyReduceContextBuilder());
assertEquals(0, merger.from);
assertEquals(10, merger.size);
assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo);
@@ -772,7 +771,7 @@ public class TransportSearchActionTests extends ESTestCase {
assertNull(source.trackTotalHitsUpTo());
}
{
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, aggReduceContextBuilder());
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, emptyReduceContextBuilder());
assertEquals(0, merger.from);
assertEquals(10, merger.size);
assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo);
@@ -785,7 +784,8 @@ public class TransportSearchActionTests extends ESTestCase {
source.size(originalSize);
int trackTotalHitsUpTo = randomIntBetween(0, Integer.MAX_VALUE);
source.trackTotalHitsUpTo(trackTotalHitsUpTo);
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, aggReduceContextBuilder());
SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(
source, timeProvider, emptyReduceContextBuilder());
assertEquals(0, source.from());
assertEquals(originalFrom + originalSize, source.size());
assertEquals(trackTotalHitsUpTo, (int)source.trackTotalHitsUpTo());
@@ -937,18 +937,4 @@ public class TransportSearchActionTests extends ESTestCase {
indices, randomIntBetween(127, 10000)));
}
}
private InternalAggregation.ReduceContextBuilder aggReduceContextBuilder() {
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null);
};
@Override
public InternalAggregation.ReduceContext forFinalReduction() {
return InternalAggregation.ReduceContext.forFinalReduction(null, null, b -> {}, new PipelineTree(emptyMap(), emptyList()));
}
};
}
}

View File

@@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests;
@@ -33,7 +32,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests;
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests;
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
@@ -45,9 +44,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class InternalAggregationsTests extends ESTestCase {
@@ -64,13 +63,8 @@ public class InternalAggregationsTests extends ESTestCase {
public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
topLevelPipelineAggs));
InternalAggregation.ReduceContext reduceContext = InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction();
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, reduceContext);
List<InternalAggregations> aggs = singletonList(new InternalAggregations(Collections.singletonList(terms)));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(1, reducedAggs.aggregations.size());
}
@@ -79,16 +73,20 @@ public class InternalAggregationsTests extends ESTestCase {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create();
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, new PipelineTree(emptyMap(), singletonList(siblingPipelineAggregator)));
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), emptyList());
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs), reduceContext);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs),
maxBucketReduceContext().forFinalReduction());
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(2, reducedAggs.aggregations.size());
}
private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() {
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
PipelineAggregator.PipelineTree tree =
new PipelineAggregator.PipelineTree(emptyMap(), singletonList(maxBucketPipelineAggregationBuilder.create()));
return InternalAggregationTestCase.emptyReduceContextBuilder(tree);
}
public static InternalAggregations createTestInstance() throws Exception {
List<InternalAggregation> aggsList = new ArrayList<>();
if (randomBoolean()) {
@@ -106,7 +104,11 @@ public class InternalAggregationsTests extends ESTestCase {
InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests();
aggsList.add(simpleValueTests.createTestInstance());
}
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
return new InternalAggregations(aggsList);
}
private static PipelineAggregator.PipelineTree randomPipelineTree() {
List<PipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
if (randomBoolean()) {
if (randomBoolean()) {
topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create());
@@ -118,7 +120,7 @@ public class InternalAggregationsTests extends ESTestCase {
topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
}
}
return new InternalAggregations(aggsList, topLevelPipelineAggs);
return new PipelineAggregator.PipelineTree(emptyMap(), topLevelPipelineAggs);
}
public void testSerialization() throws Exception {
@@ -126,6 +128,14 @@ public class InternalAggregationsTests extends ESTestCase {
writeToAndReadFrom(aggregations, 0);
}
public void testGetTopLevelPipelineAggregators() throws Exception {
InternalAggregations orig = createTestInstance();
PipelineAggregator.PipelineTree tree = randomPipelineTree();
InternalAggregations withPipelines = new InternalAggregations(orig.copyResults(), () -> tree);
assertThat(withPipelines.aggregations, equalTo(orig.aggregations));
assertThat(withPipelines.getTopLevelPipelineAggregators(), equalTo(tree.aggregators()));
}
private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
Version version = VersionUtils.randomVersion(random());
try (BytesStreamOutput out = new BytesStreamOutput()) {
@@ -135,22 +145,6 @@ public class InternalAggregationsTests extends ESTestCase {
in.setVersion(version);
InternalAggregations deserialized = new InternalAggregations(in);
assertEquals(aggregations.aggregations, deserialized.aggregations);
if (aggregations.getTopLevelPipelineAggregators() == null) {
assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());
} else {
if (version.before(Version.V_6_7_0)) {
assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());
} else {
assertEquals(aggregations.getTopLevelPipelineAggregators().size(),
deserialized.getTopLevelPipelineAggregators().size());
for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) {
SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i);
SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i);
assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths());
assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name());
}
}
}
if (iteration < 2) {
//serialize this enough times to make sure that we are able to write again what we read
writeToAndReadFrom(deserialized, iteration + 1);

View File

@@ -36,7 +36,6 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
@@ -44,7 +43,6 @@ import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import static java.util.Collections.emptyList;
@@ -92,16 +90,6 @@ public class QuerySearchResultTests extends ESTestCase {
Aggregations aggs = querySearchResult.consumeAggs().get();
Aggregations deserializedAggs = deserialized.consumeAggs().get();
assertEquals(aggs.asList(), deserializedAggs.asList());
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
List<SiblingPipelineAggregator> deserializedPipelineAggs =
((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
for (int i = 0; i < pipelineAggs.size(); i++) {
SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
}
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
}
@@ -128,8 +116,7 @@ public class QuerySearchResultTests extends ESTestCase {
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = querySearchResult.consumeAggs().get();
assertEquals(1, aggs.asList().size());
//top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately
assertEquals(1, aggs.getTopLevelPipelineAggregators().size());
// We deserialize and throw away top level pipeline aggs
}
}

View File

@@ -441,7 +441,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> toReduce = aggs.subList(0, r);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(
root.context().bigArrays(), getMockScriptService());
root.context().bigArrays(), getMockScriptService(), () -> PipelineAggregator.PipelineTree.EMPTY);
A reduced = (A) aggs.get(0).reduce(toReduce, context);
aggs = new ArrayList<>(aggs.subList(r, toReduceSize));
aggs.add(reduced);

View File

@@ -41,9 +41,9 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.adjacency.ParsedAdjacencyMatrix;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
@@ -53,9 +53,9 @@ import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregationBui
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter;
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilters;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoTileGrid;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
@@ -138,8 +138,8 @@ import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import java.io.IOException;
import java.util.ArrayList;
@@ -166,16 +166,23 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
* Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid but empty.
*/
public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder() {
return emptyReduceContextBuilder(PipelineTree.EMPTY);
}
/**
* Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid and nearly
* empty <strong>except</strong> that it contains {@link PipelineAggregator}s.
*/
public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder(PipelineTree pipelineTree) {
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null);
return InternalAggregation.ReduceContext.forPartialReduction(BigArrays.NON_RECYCLING_INSTANCE, null, () -> pipelineTree);
}
@Override
public ReduceContext forFinalReduction() {
return InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineTree.EMPTY);
return InternalAggregation.ReduceContext.forFinalReduction(BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, pipelineTree);
}
};
}
@@ -291,7 +298,8 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
Collections.shuffle(toReduce, random());
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> internalAggregations = toReduce.subList(0, r);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(bigArrays, mockScriptService);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(
bigArrays, mockScriptService, () -> PipelineAggregator.PipelineTree.EMPTY);
@SuppressWarnings("unchecked")
T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
int initialBucketCount = 0;

View File

@@ -47,6 +47,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuil
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -105,7 +106,8 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
MultiSearchRequest msearch = createMSearchRequest(request, registry, rollupSearchContext);
client.multiSearch(msearch, ActionListener.wrap(msearchResponse -> {
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(bigArrays, scriptService);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(
bigArrays, scriptService, () -> PipelineAggregator.PipelineTree.EMPTY);
listener.onResponse(processResponses(rollupSearchContext, msearchResponse, context));
}, listener::onFailure));
}