Only call reduce on a single InternalAggregation when needed (#62525) (#62594)

Adds a new abstract method in InternalAggregation that flags the framework if it needs to reduce on a single InternalAggregation.
This commit is contained in:
Ignacio Vera 2020-09-18 08:43:58 +02:00 committed by GitHub
parent 5b7246157f
commit 6a3d731be1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 80 additions and 3 deletions

View File

@ -254,6 +254,11 @@ public class InternalMatrixStats extends InternalAggregation implements MatrixSt
return new InternalMatrixStats(name, runningStats.docCount, runningStats, null, getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), stats, results);

View File

@ -255,9 +255,17 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
* try reusing an existing instance (typically the first in the given list) to save on redundant object
* construction.
*
* @see #mustReduceOnSingleInternalAgg()
*/
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
/**
* Signal the framework if the {@linkplain InternalAggregation#reduce(List, ReduceContext)} phase needs to be called
* when there is only one {@linkplain InternalAggregation}.
*/
protected abstract boolean mustReduceOnSingleInternalAgg();
/**
* Return true if this aggregation is mapped, and can lead a reduction. If this agg returns
* false, it should return itself if asked to lead a reduction

View File

@ -255,7 +255,12 @@ public final class InternalAggregations extends Aggregations implements Writeabl
// If all aggs are unmapped, the agg that leads the reduction will just return itself
aggregations.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
if (first.mustReduceOnSingleInternalAgg() || aggregations.size() > 1) {
reducedAggregations.add(first.reduce(aggregations, context));
} else {
// no need for reduce phase
reducedAggregations.add(first);
}
}
return ctor.apply(reducedAggregations);

View File

@ -177,6 +177,11 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
return modified ? create(newBuckets) : this;
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public void forEachBucket(Consumer<InternalAggregations> consumer) {
for (B bucket : getBuckets()) {

View File

@ -176,6 +176,11 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
return aggregations.sortValue(head, tail);
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
InternalAggregations rewritten = rewriter.apply(aggregations);

View File

@ -116,6 +116,11 @@ public class InternalGeoBounds extends InternalAggregation implements GeoBounds
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude, getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {

View File

@ -133,6 +133,11 @@ public class InternalGeoCentroid extends InternalAggregation implements GeoCentr
return new InternalGeoCentroid(name, result, totalCount, getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {

View File

@ -128,6 +128,11 @@ public abstract class InternalNumericMetricsAggregation extends InternalAggregat
throw new IllegalArgumentException("Metrics aggregations cannot have sub-aggregations (at [>" + head + "]");
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format);

View File

@ -74,7 +74,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
/*
* I *believe* that this situation can only happen in cross
* cluster search right now. Thus the message. But computers
* are hard.
* are hard.
*/
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
}
@ -134,6 +134,11 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {

View File

@ -162,6 +162,11 @@ public class InternalTopHits extends InternalAggregation implements TopHits {
new SearchHits(hits, reducedTopDocs.totalHits, maxScore), getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {

View File

@ -1007,6 +1007,11 @@ public class SearchPhaseControllerTests extends ESTestCase {
return new InternalThrowing(name, false, metadata);
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public Object getProperty(List<String> path) {
return null;

View File

@ -993,6 +993,11 @@ public abstract class AggregatorTestCase extends ESTestCase {
return new InternalAggCardinality(name, cardinality, metadata);
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return builder.array("cardinality", cardinality);

View File

@ -214,6 +214,11 @@ public class InternalStringStats extends InternalAggregation {
showDistribution, format, getMetadata());
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return false;
}
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {

View File

@ -18,7 +18,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
public class InternalInferenceAggregation extends InternalAggregation {
public class InternalInferenceAggregation extends InternalAggregation {
private final InferenceResults inferenceResult;
@ -47,6 +47,10 @@ public class InternalInferenceAggregation extends InternalAggregation {
throw new UnsupportedOperationException("Reducing an inference aggregation is not supported");
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public Object getProperty(List<String> path) {

View File

@ -40,6 +40,11 @@ public class TestSingleValueAggregation extends InternalAggregation {
throw new UnsupportedOperationException();
}
@Override
protected boolean mustReduceOnSingleInternalAgg() {
return true;
}
@Override
public Object getProperty(List<String> path) {
if (this.path.equals(path)) {