Simplify SiblingPipelineAggregator (#53144) (#53341)

This removes the `instanceof`s from `SiblingPipelineAggregator` by
adding a `rewriteBuckets` method to `InternalAggregation` that can be
called to, well, rewrite the buckets. The default implementation of
`rewriteBuckets` throws the same exception that was thrown when you
attempted to run a `SiblingPipelineAggregator` on an aggregation without
buckets. It is overridden by `InternalSingleBucketAggregation` and
`InternalMultiBucketAggregation` to correctly rewrite their buckets.
This commit is contained in:
Nik Everett 2020-03-10 11:39:10 -04:00 committed by GitHub
parent 89c0e1f566
commit 5ce6de2c1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 40 deletions

View File

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntConsumer;
/**
@ -127,6 +128,29 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
return name;
}
/**
* Rewrite the sub-aggregations in the buckets in this aggregation.
* Returns a copy of this {@linkplain InternalAggregation} with the
* rewritten buckets, or, if there aren't any modifications to
* the buckets then this method will return this aggregation. Either
* way, it doesn't modify this aggregation.
* <p>
* Implementers of this should call the {@code rewriter} once per bucket
* with its {@linkplain InternalAggregations}. The {@code rewriter}
* should return {@code null} if it doen't have any rewriting to do or
* it should return a new {@linkplain InternalAggregations} to make
* changs.
* <p>
* The default implementation throws an exception because most
* aggregations don't <strong>have</strong> buckets in them. It
* should be overridden by aggregations that contain buckets. Implementers
* should respect the description above.
*/
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
throw new IllegalStateException(
"Aggregation [" + getName() + "] must be a bucket aggregation but was [" + getWriteableName() + "]");
}
/**
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
* be called after all aggregations have been fully reduced

View File

@ -92,6 +92,15 @@ public final class InternalAggregations extends Aggregations implements Writeabl
}
}
/**
* Make a mutable copy of the aggregation results.
* <p>
* IMPORTANT: The copy doesn't include any pipeline aggregations, if there are any.
*/
public List<InternalAggregation> copyResults() {
return new ArrayList<>(getInternalAggregations());
}
/**
* Returns the top-level pipeline aggregators.
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they

View File

@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation,
B extends InternalMultiBucketAggregation.InternalBucket>
@ -154,6 +155,23 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
return super.reducePipelines(create(materializedBuckets), reduceContext);
}
@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
boolean modified = false;
List<B> newBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
InternalAggregations rewritten = rewriter.apply((InternalAggregations) bucket.getAggregations());
if (rewritten == null) {
newBuckets.add(bucket);
continue;
}
modified = true;
B newBucket = createBucket(rewritten, bucket);
newBuckets.add(newBucket);
}
return modified ? create(newBuckets) : this;
}
private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
@ -192,6 +210,5 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
}
return aggregation.getProperty(path.subList(1, path.size()));
}
}
}

View File

@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
/**
* A base class for all the single bucket aggregations.
@ -169,6 +170,15 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
return aggregations.sortValue(head, tail);
}
@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
InternalAggregations rewritten = rewriter.apply(aggregations);
if (rewritten == null) {
return this;
}
return create(rewritten);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;

View File

@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.bucket.adjacency;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -84,7 +83,7 @@ public class InternalAdjacencyMatrix
}
@Override
public Aggregations getAggregations() {
public InternalAggregations getAggregations() {
return aggregations;
}

View File

@ -24,16 +24,10 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public abstract class SiblingPipelineAggregator extends PipelineAggregator {
protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
@ -47,39 +41,13 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
super(in);
}
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
if (aggregation instanceof InternalMultiBucketAggregation) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
List<Bucket> newBuckets = new ArrayList<>();
for (Bucket bucket1 : buckets) {
InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) bucket1;
InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext);
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(aggToAdd);
InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}
return multiBucketsAgg.create(newBuckets);
} else if (aggregation instanceof InternalSingleBucketAggregation) {
InternalSingleBucketAggregation singleBucketAgg = (InternalSingleBucketAggregation) aggregation;
InternalAggregation aggToAdd = doReduce(singleBucketAgg.getAggregations(), reduceContext);
List<InternalAggregation> aggs = StreamSupport.stream(singleBucketAgg.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(aggToAdd);
return singleBucketAgg.create(new InternalAggregations(aggs));
} else {
throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation ["
+ aggregation.getWriteableName() + "]");
}
return aggregation.copyWithRewritenBuckets(aggregations -> {
List<InternalAggregation> aggs = aggregations.copyResults();
aggs.add(doReduce(aggregations, reduceContext));
return new InternalAggregations(aggs);
});
}
public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);