Move getProperty method out of MultiBucketsAggregation.Bucket interface (#23988)
The getProperty method is an internal method needed to run pipeline aggregations and retrieve info by path from the aggs tree. It is not needed in the MultiBucketsAggregation.Bucket interface, which is returned to users running aggregations from the transport client. The method is moved to the InternalMultiBucketAggregation class as that's where it belongs.
This commit is contained in:
parent
af49c46b76
commit
2c545c064d
|
@ -62,6 +62,9 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
|
|||
*/
|
||||
public abstract B createBucket(InternalAggregations aggregations, B prototype);
|
||||
|
||||
@Override
|
||||
public abstract List<? extends InternalBucket> getBuckets();
|
||||
|
||||
@Override
|
||||
public Object getProperty(List<String> path) {
|
||||
if (path.isEmpty()) {
|
||||
|
@ -69,7 +72,7 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
|
|||
} else if (path.get(0).equals("_bucket_count")) {
|
||||
return getBuckets().size();
|
||||
} else {
|
||||
List<? extends Bucket> buckets = getBuckets();
|
||||
List<? extends InternalBucket> buckets = getBuckets();
|
||||
Object[] propertyArray = new Object[buckets.size()];
|
||||
for (int i = 0; i < buckets.size(); i++) {
|
||||
propertyArray[i] = buckets.get(i).getProperty(getName(), path);
|
||||
|
@ -79,7 +82,7 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
|
|||
}
|
||||
|
||||
public abstract static class InternalBucket implements Bucket {
|
||||
@Override
|
||||
|
||||
public Object getProperty(String containingAggName, List<String> path) {
|
||||
if (path.isEmpty()) {
|
||||
return this;
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.util.Comparators;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
|
@ -29,7 +27,6 @@ import org.elasticsearch.search.aggregations.Aggregations;
|
|||
import org.elasticsearch.search.aggregations.HasAggregations;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationPath;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -40,7 +37,7 @@ public interface MultiBucketsAggregation extends Aggregation {
|
|||
* A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified
|
||||
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
|
||||
*/
|
||||
public interface Bucket extends HasAggregations, ToXContent, Writeable {
|
||||
interface Bucket extends HasAggregations, ToXContent, Writeable {
|
||||
/**
|
||||
* @return The key associated with the bucket
|
||||
*/
|
||||
|
@ -62,8 +59,6 @@ public interface MultiBucketsAggregation extends Aggregation {
|
|||
@Override
|
||||
Aggregations getAggregations();
|
||||
|
||||
Object getProperty(String containingAggName, List<String> path);
|
||||
|
||||
class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> {
|
||||
|
||||
private final AggregationPath path;
|
||||
|
|
|
@ -38,6 +38,5 @@ public interface GeoHashGrid extends MultiBucketsAggregation {
|
|||
* @return The buckets of this aggregation (each bucket representing a geohash grid cell)
|
||||
*/
|
||||
@Override
|
||||
List<Bucket> getBuckets();
|
||||
|
||||
List<? extends Bucket> getBuckets();
|
||||
}
|
||||
|
|
|
@ -185,7 +185,7 @@ public class InternalGeoHashGrid extends InternalMultiBucketAggregation<Internal
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<GeoHashGrid.Bucket> getBuckets() {
|
||||
public List<InternalGeoHashGrid.Bucket> getBuckets() {
|
||||
return unmodifiableList(buckets);
|
||||
}
|
||||
|
||||
|
|
|
@ -48,8 +48,7 @@ public interface Histogram extends MultiBucketsAggregation {
|
|||
* @return The buckets of this histogram (each bucket representing an interval in the histogram)
|
||||
*/
|
||||
@Override
|
||||
List<Bucket> getBuckets();
|
||||
|
||||
List<? extends Bucket> getBuckets();
|
||||
|
||||
/**
|
||||
* A strategy defining the order in which the buckets in this histogram are ordered.
|
||||
|
|
|
@ -265,7 +265,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Histogram.Bucket> getBuckets() {
|
||||
public List<InternalDateHistogram.Bucket> getBuckets() {
|
||||
return Collections.unmodifiableList(buckets);
|
||||
}
|
||||
|
||||
|
|
|
@ -255,7 +255,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Histogram.Bucket> getBuckets() {
|
||||
public List<InternalHistogram.Bucket> getBuckets() {
|
||||
return Collections.unmodifiableList(buckets);
|
||||
}
|
||||
|
||||
|
|
|
@ -225,7 +225,7 @@ public final class InternalBinaryRange
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Range.Bucket> getBuckets() {
|
||||
public List<InternalBinaryRange.Bucket> getBuckets() {
|
||||
return unmodifiableList(buckets);
|
||||
}
|
||||
|
||||
|
|
|
@ -147,13 +147,13 @@ public class BucketHelpers {
|
|||
* <code>aggPath</code>
|
||||
*/
|
||||
public static Double resolveBucketValue(MultiBucketsAggregation agg,
|
||||
InternalMultiBucketAggregation.Bucket bucket, String aggPath, GapPolicy gapPolicy) {
|
||||
InternalMultiBucketAggregation.InternalBucket bucket, String aggPath, GapPolicy gapPolicy) {
|
||||
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
|
||||
return resolveBucketValue(agg, bucket, aggPathsList, gapPolicy);
|
||||
}
|
||||
|
||||
public static Double resolveBucketValue(MultiBucketsAggregation agg,
|
||||
InternalMultiBucketAggregation.Bucket bucket, List<String> aggPathAsList, GapPolicy gapPolicy) {
|
||||
InternalMultiBucketAggregation.InternalBucket bucket, List<String> aggPathAsList, GapPolicy gapPolicy) {
|
||||
try {
|
||||
Object propertyValue = bucket.getProperty(agg.getName(), aggPathAsList);
|
||||
if (propertyValue == null) {
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.Aggregations;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
@ -82,9 +81,8 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
|
|||
if (aggregation.getName().equals(bucketsPath.get(0))) {
|
||||
bucketsPath = bucketsPath.subList(1, bucketsPath.size());
|
||||
InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) aggregation;
|
||||
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
|
||||
for (int i = 0; i < buckets.size(); i++) {
|
||||
Bucket bucket = buckets.get(i);
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = multiBucketsAgg.getBuckets();
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
|
||||
if (bucketValue != null && !Double.isNaN(bucketValue)) {
|
||||
collectBucketValue(bucket.getKeyAsString(), bucketValue);
|
||||
|
|
|
@ -31,14 +31,12 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -89,12 +87,13 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg = (InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends Bucket> buckets = originalAgg.getBuckets();
|
||||
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
|
||||
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
|
||||
|
||||
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS);
|
||||
List newBuckets = new ArrayList<>();
|
||||
for (Bucket bucket : buckets) {
|
||||
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Map<String, Object> vars = new HashMap<>();
|
||||
if (script.getParams() != null) {
|
||||
vars.putAll(script.getParams());
|
||||
|
@ -122,13 +121,12 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
|
|||
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name()
|
||||
+ "] must return a Number");
|
||||
}
|
||||
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map((p) -> {
|
||||
return (InternalAggregation) p;
|
||||
}).collect(Collectors.toList());
|
||||
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
|
||||
(p) -> (InternalAggregation) p).collect(Collectors.toList());
|
||||
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter,
|
||||
new ArrayList<>(), metaData()));
|
||||
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
|
||||
(InternalMultiBucketAggregation.InternalBucket) bucket);
|
||||
bucket);
|
||||
newBuckets.add(newBucket);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,13 +29,11 @@ import org.elasticsearch.script.ScriptContext;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -84,11 +82,11 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
|
|||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
|
||||
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends Bucket> buckets = originalAgg.getBuckets();
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
|
||||
|
||||
CompiledScript compiledScript = reduceContext.scriptService().compile(script, ScriptContext.Standard.AGGS);
|
||||
List newBuckets = new ArrayList<>();
|
||||
for (Bucket bucket : buckets) {
|
||||
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>();
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Map<String, Object> vars = new HashMap<>();
|
||||
if (script.getParams() != null) {
|
||||
vars.putAll(script.getParams());
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.search.DocValueFormat;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
|
@ -70,13 +70,14 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
MultiBucketsAggregation histo = (MultiBucketsAggregation) aggregation;
|
||||
List<? extends Bucket> buckets = histo.getBuckets();
|
||||
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
|
||||
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
|
||||
InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
|
||||
HistogramFactory factory = (HistogramFactory) histo;
|
||||
|
||||
List<Bucket> newBuckets = new ArrayList<>();
|
||||
double sum = 0;
|
||||
for (Bucket bucket : buckets) {
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.INSERT_ZEROS);
|
||||
sum += thisBucketValue;
|
||||
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map((p) -> {
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.search.DocValueFormat;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
|
@ -77,14 +77,16 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
MultiBucketsAggregation histo = (MultiBucketsAggregation) aggregation;
|
||||
List<? extends Bucket> buckets = histo.getBuckets();
|
||||
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
|
||||
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
|
||||
InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
|
||||
HistogramFactory factory = (HistogramFactory) histo;
|
||||
|
||||
List<Bucket> newBuckets = new ArrayList<>();
|
||||
Number lastBucketKey = null;
|
||||
Double lastBucketValue = null;
|
||||
for (Bucket bucket : buckets) {
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Number thisBucketKey = factory.getKey(bucket);
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
||||
if (lastBucketValue != null && thisBucketValue != null) {
|
||||
|
@ -107,5 +109,4 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
|
|||
}
|
||||
return factory.createAggregation(newBuckets);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.search.DocValueFormat;
|
|||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
|
||||
|
@ -93,8 +94,10 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
MultiBucketsAggregation histo = (MultiBucketsAggregation) aggregation;
|
||||
List<? extends Bucket> buckets = histo.getBuckets();
|
||||
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
|
||||
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
|
||||
InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
|
||||
HistogramFactory factory = (HistogramFactory) histo;
|
||||
|
||||
List<Bucket> newBuckets = new ArrayList<>();
|
||||
|
@ -110,7 +113,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
|
|||
model = minimize(buckets, histo, model);
|
||||
}
|
||||
|
||||
for (Bucket bucket : buckets) {
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
||||
|
||||
// Default is to reuse existing bucket. Simplifies the rest of the logic,
|
||||
|
@ -180,13 +183,14 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
|
|||
return factory.createAggregation(newBuckets);
|
||||
}
|
||||
|
||||
private MovAvgModel minimize(List<? extends Bucket> buckets, MultiBucketsAggregation histo, MovAvgModel model) {
|
||||
private MovAvgModel minimize(List<? extends InternalMultiBucketAggregation.InternalBucket> buckets,
|
||||
MultiBucketsAggregation histo, MovAvgModel model) {
|
||||
|
||||
int counter = 0;
|
||||
EvictingQueue<Double> values = new EvictingQueue<>(this.window);
|
||||
|
||||
double[] test = new double[window];
|
||||
ListIterator<? extends Bucket> iter = buckets.listIterator(buckets.size());
|
||||
ListIterator<? extends InternalMultiBucketAggregation.InternalBucket> iter = buckets.listIterator(buckets.size());
|
||||
|
||||
// We have to walk the iterator backwards because we don't know if/how many buckets are empty.
|
||||
while (iter.hasPrevious() && counter < window) {
|
||||
|
|
|
@ -26,10 +26,10 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.search.DocValueFormat;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
|
@ -80,15 +80,17 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
|
||||
MultiBucketsAggregation histo = (MultiBucketsAggregation) aggregation;
|
||||
List<? extends Bucket> buckets = histo.getBuckets();
|
||||
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
|
||||
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
|
||||
InternalMultiBucketAggregation.InternalBucket>) aggregation;
|
||||
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
|
||||
HistogramFactory factory = (HistogramFactory) histo;
|
||||
|
||||
List<Bucket> newBuckets = new ArrayList<>();
|
||||
EvictingQueue<Double> lagWindow = new EvictingQueue<>(lag);
|
||||
int counter = 0;
|
||||
|
||||
for (Bucket bucket : buckets) {
|
||||
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
|
||||
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
|
||||
Bucket newBucket = bucket;
|
||||
|
||||
|
@ -111,17 +113,14 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
|
|||
if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) {
|
||||
double diff = thisBucketValue - lagValue;
|
||||
|
||||
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map((p) -> {
|
||||
return (InternalAggregation) p;
|
||||
}).collect(Collectors.toList());
|
||||
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData()));
|
||||
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
|
||||
(p) -> (InternalAggregation) p).collect(Collectors.toList());
|
||||
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<>(), metaData()));
|
||||
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
|
||||
}
|
||||
|
||||
|
||||
newBuckets.add(newBucket);
|
||||
lagWindow.add(thisBucketValue);
|
||||
|
||||
}
|
||||
return factory.createAggregation(newBuckets);
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ public class DateHistogramOffsetIT extends ESIntegTestCase {
|
|||
assertThat(response.getHits().getTotalHits(), equalTo(5L));
|
||||
|
||||
Histogram histo = response.getAggregations().get("date_histo");
|
||||
List<Histogram.Bucket> buckets = histo.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
|
||||
assertThat(buckets.size(), equalTo(2));
|
||||
|
||||
checkBucketFor(buckets.get(0), new DateTime(2014, 3, 10, 2, 0, DateTimeZone.UTC), 2L);
|
||||
|
|
|
@ -157,7 +157,7 @@ public class GeoHashGridIT extends ESIntegTestCase {
|
|||
assertSearchResponse(response);
|
||||
|
||||
GeoHashGrid geoGrid = response.getAggregations().get("geohashgrid");
|
||||
List<Bucket> buckets = geoGrid.getBuckets();
|
||||
List<? extends Bucket> buckets = geoGrid.getBuckets();
|
||||
Object[] propertiesKeys = (Object[]) ((InternalAggregation)geoGrid).getProperty("_key");
|
||||
Object[] propertiesDocCounts = (Object[]) ((InternalAggregation)geoGrid).getProperty("_count");
|
||||
for (int i = 0; i < buckets.size(); i++) {
|
||||
|
|
|
@ -990,7 +990,7 @@ public class HistogramIT extends ESIntegTestCase {
|
|||
assertSearchResponse(r);
|
||||
|
||||
Histogram histogram = r.getAggregations().get("histo");
|
||||
List<Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(2, buckets.size());
|
||||
assertEquals(-0.65, (double) buckets.get(0).getKey(), 0.01d);
|
||||
assertEquals(1, buckets.get(0).getDocCount());
|
||||
|
|
|
@ -105,7 +105,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
testBothCases(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset,
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.YEAR).field(DATE_FIELD),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(3, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -128,7 +128,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"),
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.MONTH).field(DATE_FIELD),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(3, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -159,7 +159,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
),
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.DAY).field(DATE_FIELD).minDocCount(1L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(4, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -197,7 +197,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
),
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.HOUR).field(DATE_FIELD).minDocCount(1L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(6, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -238,7 +238,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
),
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.MINUTE).field(DATE_FIELD).minDocCount(1L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(3, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -268,7 +268,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
),
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.SECOND).field(DATE_FIELD).minDocCount(1L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(3, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -300,7 +300,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
testSearchAndReduceCase(query, timestamps,
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(DATE_FIELD).minDocCount(0L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(4, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
@ -325,7 +325,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
|
|||
testSearchAndReduceCase(query, timestamps,
|
||||
aggregation -> aggregation.dateHistogramInterval(DateHistogramInterval.seconds(5)).field(DATE_FIELD).minDocCount(3L),
|
||||
histogram -> {
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
assertEquals(1, buckets.size());
|
||||
|
||||
Histogram.Bucket bucket = buckets.get(0);
|
||||
|
|
|
@ -157,9 +157,8 @@ public class GeoCentroidIT extends AbstractGeoTestCase {
|
|||
GeoHashGrid grid = response.getAggregations().get("geoGrid");
|
||||
assertThat(grid, notNullValue());
|
||||
assertThat(grid.getName(), equalTo("geoGrid"));
|
||||
List<GeoHashGrid.Bucket> buckets = grid.getBuckets();
|
||||
for (int i=0; i < buckets.size(); ++i) {
|
||||
GeoHashGrid.Bucket cell = buckets.get(i);
|
||||
List<? extends GeoHashGrid.Bucket> buckets = grid.getBuckets();
|
||||
for (GeoHashGrid.Bucket cell : buckets) {
|
||||
String geohash = cell.getKeyAsString();
|
||||
GeoPoint expectedCentroid = expectedCentroidsForGeoHash.get(geohash);
|
||||
GeoCentroid centroidAgg = cell.getAggregations().get(aggName);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||
|
@ -381,7 +382,8 @@ public class DateDerivativeIT extends ESIntegTestCase {
|
|||
deriv = bucket.getAggregations().get("deriv");
|
||||
assertThat(deriv, notNullValue());
|
||||
assertThat(deriv.value(), equalTo(4.0));
|
||||
assertThat((double) bucket.getProperty("histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()), equalTo(4.0));
|
||||
assertThat(((InternalMultiBucketAggregation.InternalBucket)bucket).getProperty(
|
||||
"histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()), equalTo(4.0));
|
||||
assertThat((DateTime) propertiesKeys[1], equalTo(key));
|
||||
assertThat((long) propertiesDocCounts[1], equalTo(2L));
|
||||
assertThat((double) propertiesCounts[1], equalTo(5.0));
|
||||
|
@ -398,7 +400,8 @@ public class DateDerivativeIT extends ESIntegTestCase {
|
|||
deriv = bucket.getAggregations().get("deriv");
|
||||
assertThat(deriv, notNullValue());
|
||||
assertThat(deriv.value(), equalTo(10.0));
|
||||
assertThat((double) bucket.getProperty("histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()), equalTo(10.0));
|
||||
assertThat(((InternalMultiBucketAggregation.InternalBucket)bucket).getProperty(
|
||||
"histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()), equalTo(10.0));
|
||||
assertThat((DateTime) propertiesKeys[2], equalTo(key));
|
||||
assertThat((long) propertiesDocCounts[2], equalTo(3L));
|
||||
assertThat((double) propertiesCounts[2], equalTo(15.0));
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
|
||||
|
@ -279,7 +280,8 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
assertThat(sumDeriv, notNullValue());
|
||||
long sumDerivValue = expectedSum - expectedSumPreviousBucket;
|
||||
assertThat(sumDeriv.value(), equalTo((double) sumDerivValue));
|
||||
assertThat((double) bucket.getProperty("histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()),
|
||||
assertThat(((InternalMultiBucketAggregation.InternalBucket)bucket).getProperty("histo",
|
||||
AggregationPath.parse("deriv.value").getPathElementsAsStringList()),
|
||||
equalTo((double) sumDerivValue));
|
||||
} else {
|
||||
assertThat(sumDeriv, nullValue());
|
||||
|
@ -324,7 +326,8 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
assertThat(sumDeriv, notNullValue());
|
||||
long sumDerivValue = expectedSum - expectedSumPreviousBucket;
|
||||
assertThat(sumDeriv.value(), equalTo((double) sumDerivValue));
|
||||
assertThat((double) bucket.getProperty("histo", AggregationPath.parse("deriv.value").getPathElementsAsStringList()),
|
||||
assertThat(((InternalMultiBucketAggregation.InternalBucket)bucket).getProperty("histo",
|
||||
AggregationPath.parse("deriv.value").getPathElementsAsStringList()),
|
||||
equalTo((double) sumDerivValue));
|
||||
} else {
|
||||
assertThat(sumDeriv, nullValue());
|
||||
|
@ -451,7 +454,7 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
Histogram deriv = searchResponse.getAggregations().get("histo");
|
||||
assertThat(deriv, Matchers.notNullValue());
|
||||
assertThat(deriv.getName(), equalTo("histo"));
|
||||
List<Bucket> buckets = deriv.getBuckets();
|
||||
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||
assertThat(buckets.size(), equalTo(valueCounts_empty.length));
|
||||
|
||||
for (int i = 0; i < valueCounts_empty.length; i++) {
|
||||
|
@ -480,7 +483,7 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
Histogram deriv = searchResponse.getAggregations().get("histo");
|
||||
assertThat(deriv, Matchers.notNullValue());
|
||||
assertThat(deriv.getName(), equalTo("histo"));
|
||||
List<Bucket> buckets = deriv.getBuckets();
|
||||
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||
assertThat(buckets.size(), equalTo(valueCounts_empty.length));
|
||||
|
||||
double lastSumValue = Double.NaN;
|
||||
|
@ -522,7 +525,7 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
Histogram deriv = searchResponse.getAggregations().get("histo");
|
||||
assertThat(deriv, Matchers.notNullValue());
|
||||
assertThat(deriv.getName(), equalTo("histo"));
|
||||
List<Bucket> buckets = deriv.getBuckets();
|
||||
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||
assertThat(buckets.size(), equalTo(valueCounts_empty.length));
|
||||
|
||||
double lastSumValue = Double.NaN;
|
||||
|
@ -561,7 +564,7 @@ public class DerivativeIT extends ESIntegTestCase {
|
|||
Histogram deriv = searchResponse.getAggregations().get("histo");
|
||||
assertThat(deriv, Matchers.notNullValue());
|
||||
assertThat(deriv.getName(), equalTo("histo"));
|
||||
List<Bucket> buckets = deriv.getBuckets();
|
||||
List<? extends Bucket> buckets = deriv.getBuckets();
|
||||
assertThat(buckets.size(), equalTo(numBuckets_empty_rnd));
|
||||
|
||||
double lastSumValue = Double.NaN;
|
||||
|
|
|
@ -616,7 +616,7 @@ public class MoreExpressionTests extends ESIntegTestCase {
|
|||
Histogram histogram = response.getAggregations().get("histogram");
|
||||
assertThat(histogram, notNullValue());
|
||||
assertThat(histogram.getName(), equalTo("histogram"));
|
||||
List<Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
|
||||
|
||||
for (int bucketCount = 0; bucketCount < buckets.size(); ++bucketCount) {
|
||||
Histogram.Bucket bucket = buckets.get(bucketCount);
|
||||
|
|
Loading…
Reference in New Issue