Aggregations: Fix reducing of range aggregations.
Under some rare circumstances: - local transport, - the range aggregation has both a parent and a child aggregation, - the range aggregation got no documents on one shard or more and several documents on one shard or more. the range aggregation could return incorrect counts and sub aggregations. The root cause is that since the reduce happens in-place and since the range aggregation uses the same instance for all sub-aggregation in case of an empty bucket, sometimes non-empty buckets would have been reduced into this shared instance. In order to avoid similar bugs in the future, aggregations have been updated to return a new instance when reducing instead of doing it in-place. Close #6435
This commit is contained in:
parent
52be3748ff
commit
fbd7c9aa5d
|
@ -61,12 +61,6 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
|
|||
this.aggregations = aggregations;
|
||||
}
|
||||
|
||||
/** Resets the internal addAggregation */
|
||||
void reset(List<InternalAggregation> aggregations) {
|
||||
this.aggregations = aggregations;
|
||||
this.aggregationsAsMap = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterates over the {@link Aggregation}s.
|
||||
*/
|
||||
|
@ -145,20 +139,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
|
|||
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
|
||||
reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
|
||||
}
|
||||
InternalAggregations result = aggregationsList.get(0);
|
||||
result.reset(reducedAggregations);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduces this aggregations, effectively propagates the reduce to all the sub aggregations
|
||||
* @param cacheRecycler
|
||||
*/
|
||||
public void reduce(BigArrays bigArrays) {
|
||||
for (int i = 0; i < aggregations.size(); i++) {
|
||||
InternalAggregation aggregation = aggregations.get(i);
|
||||
aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays)));
|
||||
}
|
||||
return new InternalAggregations(reducedAggregations);
|
||||
}
|
||||
|
||||
/** The fields required to write this addAggregation to xcontent */
|
||||
|
|
|
@ -33,8 +33,8 @@ import java.util.List;
|
|||
*/
|
||||
public abstract class InternalSingleBucketAggregation extends InternalAggregation implements SingleBucketAggregation {
|
||||
|
||||
protected long docCount;
|
||||
protected InternalAggregations aggregations;
|
||||
private long docCount;
|
||||
private InternalAggregations aggregations;
|
||||
|
||||
protected InternalSingleBucketAggregation() {} // for serialization
|
||||
|
||||
|
@ -61,26 +61,23 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
|
|||
return aggregations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a <b>new</b> empty sub aggregation. This must be a new instance on each call.
|
||||
*/
|
||||
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0));
|
||||
reduced.aggregations.reduce(reduceContext.bigArrays());
|
||||
return reduced;
|
||||
}
|
||||
InternalSingleBucketAggregation reduced = null;
|
||||
long docCount = 0L;
|
||||
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalSingleBucketAggregation) aggregation;
|
||||
} else {
|
||||
this.docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
|
||||
}
|
||||
assert aggregation.getName().equals(getName());
|
||||
docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
|
||||
subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
|
||||
return reduced;
|
||||
final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
|
||||
return newAggregation(getName(), docCount, aggs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -56,4 +56,8 @@ public class InternalFilter extends InternalSingleBucketAggregation implements F
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
|
||||
return new InternalFilter(name, docCount, subAggregations);
|
||||
}
|
||||
}
|
|
@ -106,24 +106,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||
}
|
||||
|
||||
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
||||
if (buckets.size() == 1) {
|
||||
// we still need to reduce the sub aggs
|
||||
Bucket bucket = buckets.get(0);
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
return bucket;
|
||||
}
|
||||
Bucket reduced = null;
|
||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||
long docCount = 0;
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = bucket;
|
||||
} else {
|
||||
reduced.docCount += bucket.docCount;
|
||||
}
|
||||
docCount += bucket.docCount;
|
||||
aggregationsList.add(bucket.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return reduced;
|
||||
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return new Bucket(geohashAsLong, docCount, aggs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -181,19 +171,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||
@Override
|
||||
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
|
||||
grid.reduceAndTrimBuckets(reduceContext.bigArrays());
|
||||
return grid;
|
||||
}
|
||||
InternalGeoHashGrid reduced = null;
|
||||
|
||||
LongObjectPagedHashMap<List<Bucket>> buckets = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
||||
if (reduced == null) {
|
||||
reduced = grid;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
|
||||
}
|
||||
|
@ -207,12 +188,6 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
||||
return (InternalGeoHashGrid) aggregations.get(0);
|
||||
}
|
||||
|
||||
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
|
||||
final int size = (int) Math.min(requiredSize, buckets.size());
|
||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
||||
for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
|
||||
|
@ -224,28 +199,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
return reduced;
|
||||
}
|
||||
|
||||
protected void reduceAndTrimBuckets(BigArrays bigArrays) {
|
||||
|
||||
if (requiredSize > buckets.size()) { // nothing to trim
|
||||
for (Bucket bucket : buckets) {
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
List<Bucket> trimmedBuckets = new ArrayList<>(requiredSize);
|
||||
for (Bucket bucket : buckets) {
|
||||
if (trimmedBuckets.size() >= requiredSize) {
|
||||
break;
|
||||
}
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
trimmedBuckets.add(bucket);
|
||||
}
|
||||
buckets = trimmedBuckets;
|
||||
return new InternalGeoHashGrid(getName(), requiredSize, Arrays.asList(list));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -56,4 +56,9 @@ public class InternalGlobal extends InternalSingleBucketAggregation implements G
|
|||
public Type type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
|
||||
return new InternalGlobal(name, docCount, subAggregations);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||
super(key, docCount, formatter, aggregations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return formatter != null ? formatter.format(key) : ValueFormatter.DateTime.DEFAULT.format(key);
|
||||
|
@ -109,6 +114,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Bucket getBucketByKey(String key) {
|
||||
try {
|
||||
|
|
|
@ -67,10 +67,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
|
||||
public static class Bucket implements Histogram.Bucket {
|
||||
|
||||
long key;
|
||||
long docCount;
|
||||
final long key;
|
||||
final long docCount;
|
||||
protected transient final @Nullable ValueFormatter formatter;
|
||||
InternalAggregations aggregations;
|
||||
final InternalAggregations aggregations;
|
||||
|
||||
public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
|
||||
this.key = key;
|
||||
|
@ -79,6 +79,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
this.aggregations = aggregations;
|
||||
}
|
||||
|
||||
protected Factory<?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKey() {
|
||||
return formatter != null ? formatter.format(key) : ValueFormatter.RAW.format(key);
|
||||
|
@ -105,24 +109,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
}
|
||||
|
||||
<B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
|
||||
if (buckets.size() == 1) {
|
||||
// we only need to reduce the sub aggregations
|
||||
Bucket bucket = buckets.get(0);
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
return (B) bucket;
|
||||
}
|
||||
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
|
||||
Bucket reduced = null;
|
||||
long docCount = 0;
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = bucket;
|
||||
} else {
|
||||
reduced.docCount += bucket.docCount;
|
||||
}
|
||||
docCount += bucket.docCount;
|
||||
aggregations.add((InternalAggregations) bucket.getAggregations());
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregations, bigArrays);
|
||||
return (B) reduced;
|
||||
InternalAggregations aggs = InternalAggregations.reduce(aggregations, bigArrays);
|
||||
return (B) getFactory().createBucket(key, docCount, aggs, formatter);
|
||||
}
|
||||
|
||||
void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException {
|
||||
|
@ -256,98 +250,13 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
return bucketsMap.get(key.longValue());
|
||||
}
|
||||
|
||||
protected Factory<B> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
|
||||
InternalHistogram<B> histo = (InternalHistogram<B>) aggregations.get(0);
|
||||
|
||||
if (minDocCount == 1) {
|
||||
for (B bucket : histo.buckets) {
|
||||
bucket.aggregations.reduce(reduceContext.bigArrays());
|
||||
}
|
||||
return histo;
|
||||
}
|
||||
|
||||
CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
|
||||
List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
|
||||
B lastBucket = null;
|
||||
ListIterator<B> iter = list.listIterator();
|
||||
|
||||
// we need to fill the gaps with empty buckets
|
||||
if (minDocCount == 0) {
|
||||
ExtendedBounds bounds = emptyBucketInfo.bounds;
|
||||
|
||||
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
|
||||
if (bounds != null) {
|
||||
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
|
||||
if (firstBucket == null) {
|
||||
if (bounds.min != null && bounds.max != null) {
|
||||
long key = bounds.min;
|
||||
long max = bounds.max;
|
||||
while (key <= max) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (bounds.min != null) {
|
||||
long key = bounds.min;
|
||||
while (key < firstBucket.key) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now adding the empty buckets within the actual data,
|
||||
// e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6
|
||||
while (iter.hasNext()) {
|
||||
// look ahead on the next bucket without advancing the iter
|
||||
// so we'll be able to insert elements at the right position
|
||||
B nextBucket = list.get(iter.nextIndex());
|
||||
nextBucket.aggregations.reduce(reduceContext.bigArrays());
|
||||
if (lastBucket != null) {
|
||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||
while (key != nextBucket.key) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
lastBucket = iter.next();
|
||||
}
|
||||
|
||||
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
|
||||
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
|
||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||
long max = bounds.max;
|
||||
while (key <= max) {
|
||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
while (iter.hasNext()) {
|
||||
InternalHistogram.Bucket bucket = iter.next();
|
||||
if (bucket.getDocCount() < minDocCount) {
|
||||
iter.remove();
|
||||
} else {
|
||||
bucket.aggregations.reduce(reduceContext.bigArrays());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) {
|
||||
CollectionUtil.introSort(histo.buckets, order.comparator());
|
||||
}
|
||||
|
||||
return histo;
|
||||
}
|
||||
|
||||
InternalHistogram reduced = (InternalHistogram) aggregations.get(0);
|
||||
|
||||
LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays());
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
|
@ -437,9 +346,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
CollectionUtil.introSort(reducedBuckets, order.comparator());
|
||||
}
|
||||
|
||||
|
||||
reduced.buckets = reducedBuckets;
|
||||
return reduced;
|
||||
return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed);
|
||||
}
|
||||
|
||||
protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
||||
|
|
|
@ -58,4 +58,8 @@ public class InternalMissing extends InternalSingleBucketAggregation implements
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
|
||||
return new InternalMissing(name, docCount, subAggregations);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,4 +57,8 @@ public class InternalNested extends InternalSingleBucketAggregation implements N
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
|
||||
return new InternalNested(name, docCount, subAggregations);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,4 +57,8 @@ public class InternalReverseNested extends InternalSingleBucketAggregation imple
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
|
||||
return new InternalReverseNested(name, docCount, subAggregations);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,11 +60,12 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
|
||||
public static class Bucket implements Range.Bucket {
|
||||
|
||||
private double from = Double.NEGATIVE_INFINITY;
|
||||
private double to = Double.POSITIVE_INFINITY;
|
||||
private long docCount;
|
||||
InternalAggregations aggregations;
|
||||
private String key;
|
||||
private final ValueFormatter formatter;
|
||||
private final double from;
|
||||
private final double to;
|
||||
private final long docCount;
|
||||
final InternalAggregations aggregations;
|
||||
private final String key;
|
||||
|
||||
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
||||
this.key = key != null ? key : generateKey(from, to, formatter);
|
||||
|
@ -72,6 +73,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
this.to = to;
|
||||
this.docCount = docCount;
|
||||
this.aggregations = aggregations;
|
||||
this.formatter = formatter;
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
|
@ -103,25 +105,19 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
return aggregations;
|
||||
}
|
||||
|
||||
protected Factory<? extends Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
Bucket reduce(List<Bucket> ranges, BigArrays bigArrays) {
|
||||
if (ranges.size() == 1) {
|
||||
// we stil need to call reduce on all the sub aggregations
|
||||
Bucket bucket = ranges.get(0);
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
return bucket;
|
||||
}
|
||||
Bucket reduced = null;
|
||||
long docCount = 0;
|
||||
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
|
||||
for (Bucket range : ranges) {
|
||||
if (reduced == null) {
|
||||
reduced = range;
|
||||
} else {
|
||||
reduced.docCount += range.docCount;
|
||||
}
|
||||
docCount += range.docCount;
|
||||
aggregationsList.add(range.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return reduced;
|
||||
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return getFactory().createBucket(key, from, to, docCount, aggs, formatter);
|
||||
}
|
||||
|
||||
void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException {
|
||||
|
@ -164,8 +160,8 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
return TYPE.name();
|
||||
}
|
||||
|
||||
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
return (R) new InternalRange<>(name, ranges, formatter, keyed, unmapped);
|
||||
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
return (R) new InternalRange<>(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
|
||||
|
@ -178,16 +174,14 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
private Map<String, B> rangeMap;
|
||||
private @Nullable ValueFormatter formatter;
|
||||
private boolean keyed;
|
||||
private boolean unmapped;
|
||||
|
||||
public InternalRange() {} // for serialization
|
||||
|
||||
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
super(name);
|
||||
this.ranges = ranges;
|
||||
this.formatter = formatter;
|
||||
this.keyed = keyed;
|
||||
this.unmapped = unmapped;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -211,52 +205,31 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
return rangeMap.get(key);
|
||||
}
|
||||
|
||||
protected Factory<B, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalRange<B> reduced = (InternalRange<B>) aggregations.get(0);
|
||||
for (B bucket : reduced.ranges) {
|
||||
bucket.aggregations.reduce(reduceContext.bigArrays());
|
||||
}
|
||||
return reduced;
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Bucket>[] rangeList = new List[ranges.size()];
|
||||
for (int i = 0; i < rangeList.length; ++i) {
|
||||
rangeList[i] = new ArrayList<Bucket>();
|
||||
}
|
||||
List<List<Bucket>> rangesList = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalRange<Bucket> ranges = (InternalRange) aggregation;
|
||||
if (ranges.unmapped) {
|
||||
continue;
|
||||
}
|
||||
if (rangesList == null) {
|
||||
rangesList = new ArrayList<>(ranges.ranges.size());
|
||||
for (Bucket bucket : ranges.ranges) {
|
||||
List<Bucket> sameRangeList = new ArrayList<>(aggregations.size());
|
||||
sameRangeList.add(bucket);
|
||||
rangesList.add(sameRangeList);
|
||||
}
|
||||
} else {
|
||||
int i = 0;
|
||||
for (Bucket range : ranges.ranges) {
|
||||
rangesList.get(i++).add(range);
|
||||
}
|
||||
InternalRange<?> ranges = (InternalRange<?>) aggregation;
|
||||
int i = 0;
|
||||
for (Bucket range : ranges.ranges) {
|
||||
rangeList[i++].add(range);
|
||||
}
|
||||
}
|
||||
|
||||
if (rangesList == null) {
|
||||
// unmapped, we can just take the first one
|
||||
return aggregations.get(0);
|
||||
final List<B> ranges = new ArrayList<>();
|
||||
for (int i = 0; i < this.ranges.size(); ++i) {
|
||||
ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext.bigArrays()));
|
||||
}
|
||||
|
||||
InternalRange reduced = (InternalRange) aggregations.get(0);
|
||||
int i = 0;
|
||||
for (List<Bucket> sameRangeList : rangesList) {
|
||||
reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays()));
|
||||
}
|
||||
return reduced;
|
||||
}
|
||||
|
||||
protected B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
|
||||
return (B) new Bucket(key, from, to, docCount, aggregations, formatter);
|
||||
return getFactory().create(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -268,7 +241,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||
List<B> ranges = Lists.newArrayListWithCapacity(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
String key = in.readOptionalString();
|
||||
ranges.add(createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
|
||||
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
|
||||
}
|
||||
this.ranges = ranges;
|
||||
this.rangeMap = null;
|
||||
|
|
|
@ -203,7 +203,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||
buckets.add(bucket);
|
||||
}
|
||||
// value source can be null in the case of unmapped fields
|
||||
return rangeFactory.create(name, buckets, formatter, keyed, false);
|
||||
return rangeFactory.create(name, buckets, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -217,7 +217,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||
buckets.add(bucket);
|
||||
}
|
||||
// value source can be null in the case of unmapped fields
|
||||
return rangeFactory.create(name, buckets, formatter, keyed, false);
|
||||
return rangeFactory.create(name, buckets, formatter, keyed);
|
||||
}
|
||||
|
||||
private static final void sortRanges(final Range[] ranges) {
|
||||
|
@ -274,7 +274,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||
for (RangeAggregator.Range range : ranges) {
|
||||
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter));
|
||||
}
|
||||
return factory.create(name, buckets, formatter, keyed, true);
|
||||
return factory.create(name, buckets, formatter, keyed);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -72,6 +72,11 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||
public DateTime getToAsDate() {
|
||||
return Double.isInfinite(getTo().doubleValue()) ? null : new DateTime(getTo().longValue(), DateTimeZone.UTC);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
}
|
||||
|
||||
private static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
|
||||
|
@ -82,8 +87,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
return new InternalDateRange(name, ranges, formatter, keyed, unmapped);
|
||||
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed) {
|
||||
return new InternalDateRange(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,8 +99,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||
|
||||
InternalDateRange() {} // for serialization
|
||||
|
||||
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
super(name, ranges, formatter, keyed, unmapped);
|
||||
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
super(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -104,7 +109,7 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
|
||||
return new Bucket(key, from, to, docCount, aggregations, formatter);
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,10 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||
super(key, from, to, docCount, aggregations, formatter);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
}
|
||||
|
||||
private static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
|
||||
|
@ -71,8 +75,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
return new InternalGeoDistance(name, ranges, formatter, keyed, unmapped);
|
||||
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
return new InternalGeoDistance(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,8 +87,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||
|
||||
InternalGeoDistance() {} // for serialization
|
||||
|
||||
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
super(name, ranges, formatter, keyed, unmapped);
|
||||
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
super(name, ranges, formatter, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,7 +97,7 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
|
||||
return new Bucket(key, from, to, docCount, aggregations, formatter);
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
}
|
|
@ -74,6 +74,11 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||
double to = getTo().doubleValue();
|
||||
return Double.isInfinite(to) ? null : MAX_IP == to ? null : ValueFormatter.IPv4.format(to);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
}
|
||||
|
||||
private static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> {
|
||||
|
@ -84,8 +89,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
||||
return new InternalIPv4Range(name, ranges, keyed, unmapped);
|
||||
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||
return new InternalIPv4Range(name, ranges, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,8 +101,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||
|
||||
public InternalIPv4Range() {} // for serialization
|
||||
|
||||
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed, boolean unmapped) {
|
||||
super(name, ranges, ValueFormatter.IPv4, keyed, unmapped);
|
||||
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed) {
|
||||
super(name, ranges, ValueFormatter.IPv4, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -106,8 +111,7 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter ) {
|
||||
return new Bucket(key, from, to, docCount, aggregations);
|
||||
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||
return FACTORY;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.significant;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
|
@ -143,25 +142,20 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||
}
|
||||
|
||||
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
||||
if (buckets.size() == 1) {
|
||||
return buckets.get(0);
|
||||
}
|
||||
Bucket reduced = null;
|
||||
long subsetDf = 0;
|
||||
long supersetDf = 0;
|
||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = bucket;
|
||||
} else {
|
||||
reduced.subsetDf += bucket.subsetDf;
|
||||
reduced.supersetDf += bucket.supersetDf;
|
||||
reduced.updateScore();
|
||||
}
|
||||
subsetDf += bucket.subsetDf;
|
||||
supersetDf += bucket.supersetDf;
|
||||
aggregationsList.add(bucket.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return reduced;
|
||||
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs);
|
||||
}
|
||||
|
||||
abstract Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations);
|
||||
|
||||
@Override
|
||||
public double getSignificanceScore() {
|
||||
return score;
|
||||
|
@ -201,14 +195,8 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalSignificantTerms reduce(ReduceContext reduceContext) {
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalSignificantTerms terms = (InternalSignificantTerms) aggregations.get(0);
|
||||
terms.trimExcessEntries();
|
||||
return terms;
|
||||
}
|
||||
InternalSignificantTerms reduced = null;
|
||||
|
||||
long globalSubsetSize = 0;
|
||||
long globalSupersetSize = 0;
|
||||
|
@ -219,18 +207,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||
globalSubsetSize += terms.subsetSize;
|
||||
globalSupersetSize += terms.supersetSize;
|
||||
}
|
||||
Map<String, List<InternalSignificantTerms.Bucket>> buckets = null;
|
||||
Map<String, List<InternalSignificantTerms.Bucket>> buckets = new HashMap<>();
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalSignificantTerms terms = (InternalSignificantTerms) aggregation;
|
||||
if (terms instanceof UnmappedSignificantTerms) {
|
||||
continue;
|
||||
}
|
||||
if (reduced == null) {
|
||||
reduced = terms;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = new HashMap<>(terms.buckets.size());
|
||||
}
|
||||
for (Bucket bucket : terms.buckets) {
|
||||
List<Bucket> existingBuckets = buckets.get(bucket.getKey());
|
||||
if (existingBuckets == null) {
|
||||
|
@ -239,19 +218,10 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||
}
|
||||
// Adjust the buckets with the global stats representing the
|
||||
// total size of the pots from which the stats are drawn
|
||||
bucket.subsetSize = globalSubsetSize;
|
||||
bucket.supersetSize = globalSupersetSize;
|
||||
bucket.updateScore();
|
||||
existingBuckets.add(bucket);
|
||||
existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, bucket.aggregations));
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one
|
||||
// (no need to reduce)
|
||||
return (UnmappedSignificantTerms) aggregations.get(0);
|
||||
}
|
||||
|
||||
final int size = Math.min(requiredSize, buckets.size());
|
||||
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
|
||||
for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) {
|
||||
|
@ -265,23 +235,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = (Bucket) ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
reduced.subsetSize = globalSubsetSize;
|
||||
reduced.supersetSize = globalSupersetSize;
|
||||
return reduced;
|
||||
return newAggregation(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
|
||||
}
|
||||
|
||||
final void trimExcessEntries() {
|
||||
final List<Bucket> newBuckets = Lists.newArrayList();
|
||||
for (Bucket b : buckets) {
|
||||
if (newBuckets.size() >= requiredSize) {
|
||||
break;
|
||||
}
|
||||
if (b.subsetDf >= minDocCount) {
|
||||
newBuckets.add(b);
|
||||
}
|
||||
}
|
||||
buckets = newBuckets;
|
||||
}
|
||||
abstract InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets);
|
||||
|
||||
}
|
||||
|
|
|
@ -83,6 +83,11 @@ public class SignificantLongTerms extends InternalSignificantTerms {
|
|||
return Long.toString(term);
|
||||
}
|
||||
|
||||
@Override
|
||||
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
|
||||
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private ValueFormatter formatter;
|
||||
|
@ -101,6 +106,12 @@ public class SignificantLongTerms extends InternalSignificantTerms {
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
|
||||
List<InternalSignificantTerms.Bucket> buckets) {
|
||||
return new SignificantLongTerms(subsetSize, supersetSize, getName(), formatter, requiredSize, supersetSize, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
|
|
|
@ -85,6 +85,10 @@ public class SignificantStringTerms extends InternalSignificantTerms {
|
|||
return termBytes.utf8ToString();
|
||||
}
|
||||
|
||||
@Override
|
||||
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
|
||||
return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
|
||||
}
|
||||
}
|
||||
|
||||
SignificantStringTerms() {} // for serialization
|
||||
|
@ -99,6 +103,12 @@ public class SignificantStringTerms extends InternalSignificantTerms {
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
|
||||
List<InternalSignificantTerms.Bucket> buckets) {
|
||||
return new SignificantStringTerms(subsetSize, supersetSize, getName(), requiredSize, supersetSize, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
|
|
|
@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -64,6 +66,21 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
if (!(aggregation instanceof UnmappedSignificantTerms)) {
|
||||
return aggregation.reduce(reduceContext);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets) {
|
||||
throw new UnsupportedOperationException("How did you get there?");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
|
|
|
@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.DoubleObjectPagedHashMap;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -87,6 +83,15 @@ public class DoubleTerms extends InternalTerms {
|
|||
return Double.compare(term, other.getKeyAsNumber().doubleValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getKeyAsObject() {
|
||||
return getKeyAsNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
Bucket newBucket(long docCount, InternalAggregations aggs) {
|
||||
return new Bucket(term, docCount, aggs);
|
||||
}
|
||||
}
|
||||
|
||||
private @Nullable ValueFormatter formatter;
|
||||
|
@ -104,59 +109,8 @@ public class DoubleTerms extends InternalTerms {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalTerms reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalTerms terms = (InternalTerms) aggregations.get(0);
|
||||
terms.trimExcessEntries(reduceContext.bigArrays());
|
||||
return terms;
|
||||
}
|
||||
InternalTerms reduced = null;
|
||||
|
||||
DoubleObjectPagedHashMap<List<Bucket>> buckets = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalTerms terms = (InternalTerms) aggregation;
|
||||
if (terms instanceof UnmappedTerms) {
|
||||
continue;
|
||||
}
|
||||
if (reduced == null) {
|
||||
reduced = terms;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = new DoubleObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
|
||||
}
|
||||
for (Terms.Bucket bucket : terms.buckets) {
|
||||
List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
|
||||
if (existingBuckets == null) {
|
||||
existingBuckets = new ArrayList<>(aggregations.size());
|
||||
buckets.put(((Bucket) bucket).term, existingBuckets);
|
||||
}
|
||||
existingBuckets.add((Bucket) bucket);
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
||||
return (UnmappedTerms) aggregations.get(0);
|
||||
}
|
||||
|
||||
// TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ?
|
||||
final int size = (int) Math.min(requiredSize, buckets.size());
|
||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
|
||||
for (DoubleObjectPagedHashMap.Cursor<List<DoubleTerms.Bucket>> cursor : buckets) {
|
||||
List<DoubleTerms.Bucket> sameTermBuckets = cursor.value;
|
||||
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
|
||||
if (b.getDocCount() >= minDocCount) {
|
||||
ordered.insertWithOverflow(b);
|
||||
}
|
||||
}
|
||||
buckets.close();
|
||||
InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = (Bucket) ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
return reduced;
|
||||
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
|
||||
return new DoubleTerms(name, order, formatter, requiredSize, minDocCount, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.bucket.terms;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
|
@ -58,24 +58,19 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
|
|||
return aggregations;
|
||||
}
|
||||
|
||||
abstract Object getKeyAsObject();
|
||||
|
||||
abstract Bucket newBucket(long docCount, InternalAggregations aggs);
|
||||
|
||||
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
||||
if (buckets.size() == 1) {
|
||||
Bucket bucket = buckets.get(0);
|
||||
bucket.aggregations.reduce(bigArrays);
|
||||
return bucket;
|
||||
}
|
||||
Bucket reduced = null;
|
||||
long docCount = 0;
|
||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||
for (Bucket bucket : buckets) {
|
||||
if (reduced == null) {
|
||||
reduced = bucket;
|
||||
} else {
|
||||
reduced.docCount += bucket.docCount;
|
||||
}
|
||||
docCount += bucket.docCount;
|
||||
aggregationsList.add(bucket.aggregations);
|
||||
}
|
||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return reduced;
|
||||
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||
return newBucket(docCount, aggs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -113,47 +108,21 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalTerms reduce(ReduceContext reduceContext) {
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalTerms terms = (InternalTerms) aggregations.get(0);
|
||||
terms.trimExcessEntries(reduceContext.bigArrays());
|
||||
return terms;
|
||||
}
|
||||
|
||||
InternalTerms reduced = null;
|
||||
|
||||
Map<Text, List<InternalTerms.Bucket>> buckets = null;
|
||||
Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalTerms terms = (InternalTerms) aggregation;
|
||||
if (terms instanceof UnmappedTerms) {
|
||||
continue;
|
||||
}
|
||||
if (reduced == null) {
|
||||
reduced = terms;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = new HashMap<>(terms.buckets.size());
|
||||
}
|
||||
for (Bucket bucket : terms.buckets) {
|
||||
List<Bucket> existingBuckets = buckets.get(bucket.getKeyAsText());
|
||||
if (existingBuckets == null) {
|
||||
existingBuckets = new ArrayList<>(aggregations.size());
|
||||
buckets.put(bucket.getKeyAsText(), existingBuckets);
|
||||
}
|
||||
existingBuckets.add(bucket);
|
||||
buckets.put(bucket.getKeyAsObject(), bucket);
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
||||
return (UnmappedTerms) aggregations.get(0);
|
||||
}
|
||||
|
||||
final int size = Math.min(requiredSize, buckets.size());
|
||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
|
||||
for (Map.Entry<Text, List<Bucket>> entry : buckets.entrySet()) {
|
||||
List<Bucket> sameTermBuckets = entry.getValue();
|
||||
for (Collection<Bucket> l : buckets.asMap().values()) {
|
||||
List<Bucket> sameTermBuckets = (List<Bucket>) l; // cast is ok according to javadocs
|
||||
final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
|
||||
if (b.docCount >= minDocCount) {
|
||||
ordered.insertWithOverflow(b);
|
||||
|
@ -163,22 +132,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
|
|||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = (Bucket) ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
return reduced;
|
||||
return newAggregation(name, Arrays.asList(list));
|
||||
}
|
||||
|
||||
final void trimExcessEntries(BigArrays bigArrays) {
|
||||
final List<Bucket> newBuckets = Lists.newArrayList();
|
||||
for (Bucket b : buckets) {
|
||||
if (newBuckets.size() >= requiredSize) {
|
||||
break;
|
||||
}
|
||||
if (b.docCount >= minDocCount) {
|
||||
newBuckets.add(b);
|
||||
b.aggregations.reduce(bigArrays);
|
||||
}
|
||||
}
|
||||
buckets = newBuckets;
|
||||
}
|
||||
protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets);
|
||||
|
||||
}
|
||||
|
|
|
@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.text.StringText;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -87,6 +83,16 @@ public class LongTerms extends InternalTerms {
|
|||
int compareTerm(Terms.Bucket other) {
|
||||
return Long.compare(term, other.getKeyAsNumber().longValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getKeyAsObject() {
|
||||
return getKeyAsNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
Bucket newBucket(long docCount, InternalAggregations aggs) {
|
||||
return new Bucket(term, docCount, aggs);
|
||||
}
|
||||
}
|
||||
|
||||
private @Nullable ValueFormatter formatter;
|
||||
|
@ -104,59 +110,8 @@ public class LongTerms extends InternalTerms {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InternalTerms reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
InternalTerms terms = (InternalTerms) aggregations.get(0);
|
||||
terms.trimExcessEntries(reduceContext.bigArrays());
|
||||
return terms;
|
||||
}
|
||||
InternalTerms reduced = null;
|
||||
|
||||
LongObjectPagedHashMap<List<Bucket>> buckets = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalTerms terms = (InternalTerms) aggregation;
|
||||
if (terms instanceof UnmappedTerms) {
|
||||
continue;
|
||||
}
|
||||
if (reduced == null) {
|
||||
reduced = terms;
|
||||
}
|
||||
if (buckets == null) {
|
||||
buckets = new LongObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
|
||||
}
|
||||
for (Terms.Bucket bucket : terms.buckets) {
|
||||
List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
|
||||
if (existingBuckets == null) {
|
||||
existingBuckets = new ArrayList<>(aggregations.size());
|
||||
buckets.put(((Bucket) bucket).term, existingBuckets);
|
||||
}
|
||||
existingBuckets.add((Bucket) bucket);
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) {
|
||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
||||
return (UnmappedTerms) aggregations.get(0);
|
||||
}
|
||||
|
||||
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
|
||||
final int size = (int) Math.min(requiredSize, buckets.size());
|
||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
|
||||
for (LongObjectPagedHashMap.Cursor<List<LongTerms.Bucket>> cursor : buckets) {
|
||||
List<LongTerms.Bucket> sameTermBuckets = cursor.value;
|
||||
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
|
||||
if (b.getDocCount() >= minDocCount) {
|
||||
ordered.insertWithOverflow(b);
|
||||
}
|
||||
}
|
||||
buckets.close();
|
||||
InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
|
||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||
list[i] = (Bucket) ordered.pop();
|
||||
}
|
||||
reduced.buckets = Arrays.asList(list);
|
||||
return reduced;
|
||||
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
|
||||
return new LongTerms(name, order, formatter, requiredSize, minDocCount, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -84,6 +84,16 @@ public class StringTerms extends InternalTerms {
|
|||
int compareTerm(Terms.Bucket other) {
|
||||
return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
Object getKeyAsObject() {
|
||||
return getKeyAsText();
|
||||
}
|
||||
|
||||
@Override
|
||||
Bucket newBucket(long docCount, InternalAggregations aggs) {
|
||||
return new Bucket(termBytes, docCount, aggs);
|
||||
}
|
||||
}
|
||||
|
||||
StringTerms() {} // for serialization
|
||||
|
@ -97,6 +107,11 @@ public class StringTerms extends InternalTerms {
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
|
||||
return new StringTerms(name, order, requiredSize, minDocCount, buckets);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
|
|
|
@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -80,6 +82,21 @@ public class UnmappedTerms extends InternalTerms {
|
|||
out.writeVLong(minDocCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
for (InternalAggregation agg : reduceContext.aggregations()) {
|
||||
if (!(agg instanceof UnmappedTerms)) {
|
||||
return agg.reduce(reduceContext);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalTerms newAggregation(String name, List<Bucket> buckets) {
|
||||
throw new UnsupportedOperationException("How did you get there?");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(name);
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
|
|||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -76,20 +75,13 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
|
|||
|
||||
@Override
|
||||
public InternalAvg reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return (InternalAvg) aggregations.get(0);
|
||||
long count = 0;
|
||||
double sum = 0;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
count += ((InternalAvg) aggregation).count;
|
||||
sum += ((InternalAvg) aggregation).sum;
|
||||
}
|
||||
InternalAvg reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalAvg) aggregation;
|
||||
} else {
|
||||
reduced.count += ((InternalAvg) aggregation).count;
|
||||
reduced.sum += ((InternalAvg) aggregation).sum;
|
||||
}
|
||||
}
|
||||
return reduced;
|
||||
return new InternalAvg(getName(), sum, count);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -184,6 +184,10 @@ public final class HyperLogLogPlusPlus implements Releasable {
|
|||
alphaMM = alpha * m * m;
|
||||
}
|
||||
|
||||
public int precision() {
|
||||
return p;
|
||||
}
|
||||
|
||||
public long maxBucket() {
|
||||
return runLens.size() >>> p;
|
||||
}
|
||||
|
|
|
@ -104,18 +104,17 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
|
|||
final InternalCardinality cardinality = (InternalCardinality) aggregation;
|
||||
if (cardinality.counts != null) {
|
||||
if (reduced == null) {
|
||||
reduced = cardinality;
|
||||
} else {
|
||||
reduced.merge(cardinality);
|
||||
reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), BigArrays.NON_RECYCLING_INSTANCE, 1));
|
||||
}
|
||||
reduced.merge(cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
if (reduced == null) { // all empty
|
||||
reduced = (InternalCardinality) aggregations.get(0);
|
||||
return aggregations.get(0);
|
||||
} else {
|
||||
return reduced;
|
||||
}
|
||||
|
||||
return reduced;
|
||||
}
|
||||
|
||||
public void merge(InternalCardinality other) {
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
|
|||
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class InternalGeoBounds extends InternalMetricsAggregation implements GeoBounds {
|
||||
|
||||
|
@ -72,36 +71,36 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
InternalGeoBounds reduced = null;
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
|
||||
|
||||
if (reduced == null) {
|
||||
reduced = bounds;
|
||||
continue;
|
||||
}
|
||||
double top = Double.NEGATIVE_INFINITY;
|
||||
double bottom = Double.POSITIVE_INFINITY;
|
||||
double posLeft = Double.POSITIVE_INFINITY;
|
||||
double posRight = Double.NEGATIVE_INFINITY;
|
||||
double negLeft = Double.POSITIVE_INFINITY;
|
||||
double negRight = Double.NEGATIVE_INFINITY;
|
||||
|
||||
if (bounds.top > reduced.top) {
|
||||
reduced.top = bounds.top;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
|
||||
|
||||
if (bounds.top > top) {
|
||||
top = bounds.top;
|
||||
}
|
||||
if (bounds.bottom < reduced.bottom) {
|
||||
reduced.bottom = bounds.bottom;
|
||||
if (bounds.bottom < bottom) {
|
||||
bottom = bounds.bottom;
|
||||
}
|
||||
if (bounds.posLeft < reduced.posLeft) {
|
||||
reduced.posLeft = bounds.posLeft;
|
||||
if (bounds.posLeft < posLeft) {
|
||||
posLeft = bounds.posLeft;
|
||||
}
|
||||
if (bounds.posRight > reduced.posRight) {
|
||||
reduced.posRight = bounds.posRight;
|
||||
if (bounds.posRight > posRight) {
|
||||
posRight = bounds.posRight;
|
||||
}
|
||||
if (bounds.negLeft < reduced.negLeft) {
|
||||
reduced.negLeft = bounds.negLeft;
|
||||
if (bounds.negLeft < negLeft) {
|
||||
negLeft = bounds.negLeft;
|
||||
}
|
||||
if (bounds.negRight > reduced.negRight) {
|
||||
reduced.negRight = bounds.negRight;
|
||||
if (bounds.negRight > negRight) {
|
||||
negRight = bounds.negRight;
|
||||
}
|
||||
}
|
||||
return reduced;
|
||||
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
|
|||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -74,22 +73,11 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
|
|||
|
||||
@Override
|
||||
public InternalMax reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return (InternalMax) aggregations.get(0);
|
||||
double max = Double.NEGATIVE_INFINITY;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
max = Math.max(max, ((InternalMax) aggregation).max);
|
||||
}
|
||||
InternalMax reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalMax) aggregation;
|
||||
} else {
|
||||
reduced.max = Math.max(reduced.max, ((InternalMax) aggregation).max);
|
||||
}
|
||||
}
|
||||
if (reduced != null) {
|
||||
return reduced;
|
||||
}
|
||||
return (InternalMax) aggregations.get(0);
|
||||
return new InternalMax(name, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
|
|||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -75,22 +74,11 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
|
|||
|
||||
@Override
|
||||
public InternalMin reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return (InternalMin) aggregations.get(0);
|
||||
double min = Double.POSITIVE_INFINITY;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
min = Math.min(min, ((InternalMin) aggregation).min);
|
||||
}
|
||||
InternalMin reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalMin) aggregation;
|
||||
} else {
|
||||
reduced.min = Math.min(reduced.min, ((InternalMin) aggregation).min);
|
||||
}
|
||||
}
|
||||
if (reduced != null) {
|
||||
return reduced;
|
||||
}
|
||||
return (InternalMin) aggregations.get(0);
|
||||
return new InternalMin(getName(), min);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -89,17 +88,15 @@ public class InternalPercentiles extends InternalNumericMetricsAggregation.Multi
|
|||
|
||||
@Override
|
||||
public InternalPercentiles reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
InternalPercentiles merged = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
TDigestState merged = null;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
final InternalPercentiles percentiles = (InternalPercentiles) aggregation;
|
||||
if (merged == null) {
|
||||
merged = percentiles;
|
||||
} else {
|
||||
merged.state.add(percentiles.state);
|
||||
merged = new TDigestState(percentiles.state.compression());
|
||||
}
|
||||
merged.add(percentiles.state);
|
||||
}
|
||||
return merged;
|
||||
return new InternalPercentiles(getName(), percents, merged, keyed);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,10 @@ public class TDigestState extends AVLTreeDigest {
|
|||
this.compression = compression;
|
||||
}
|
||||
|
||||
public double compression() {
|
||||
return compression;
|
||||
}
|
||||
|
||||
public static void write(TDigestState state, StreamOutput out) throws IOException {
|
||||
out.writeDouble(state.compression);
|
||||
out.writeVInt(state.centroidCount());
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
|
|||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -120,33 +119,18 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
|
|||
|
||||
@Override
|
||||
public InternalStats reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return (InternalStats) aggregations.get(0);
|
||||
long count = 0;
|
||||
double min = Double.POSITIVE_INFINITY;
|
||||
double max = Double.NEGATIVE_INFINITY;
|
||||
double sum = 0;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
InternalStats stats = (InternalStats) aggregation;
|
||||
count += stats.getCount();
|
||||
min = Math.min(min, stats.getMin());
|
||||
max = Math.max(max, stats.getMax());
|
||||
sum += stats.getSum();
|
||||
}
|
||||
InternalStats reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
if (((InternalStats) aggregation).count != 0) {
|
||||
reduced = (InternalStats) aggregation;
|
||||
}
|
||||
} else {
|
||||
if (((InternalStats) aggregation).count != 0) {
|
||||
reduced.count += ((InternalStats) aggregation).count;
|
||||
reduced.min = Math.min(reduced.min, ((InternalStats) aggregation).min);
|
||||
reduced.max = Math.max(reduced.max, ((InternalStats) aggregation).max);
|
||||
reduced.sum += ((InternalStats) aggregation).sum;
|
||||
mergeOtherStats(reduced, aggregation);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (reduced != null) {
|
||||
return reduced;
|
||||
}
|
||||
return (InternalStats) aggregations.get(0);
|
||||
}
|
||||
|
||||
protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
|
||||
return new InternalStats(name, count, sum, min, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -101,8 +101,14 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
|
||||
((InternalExtendedStats) to).sumOfSqrs += ((InternalExtendedStats) from).sumOfSqrs;
|
||||
public InternalExtendedStats reduce(ReduceContext reduceContext) {
|
||||
double sumOfSqrs = 0;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
InternalExtendedStats stats = (InternalExtendedStats) aggregation;
|
||||
sumOfSqrs += stats.getSumOfSquares();
|
||||
}
|
||||
final InternalStats stats = super.reduce(reduceContext);
|
||||
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
|
|||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -74,22 +73,11 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
|
|||
|
||||
@Override
|
||||
public InternalSum reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return (InternalSum) aggregations.get(0);
|
||||
double sum = 0;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
sum += ((InternalSum) aggregation).sum;
|
||||
}
|
||||
InternalSum reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalSum) aggregation;
|
||||
} else {
|
||||
reduced.sum += ((InternalSum) aggregation).sum;
|
||||
}
|
||||
}
|
||||
if (reduced != null) {
|
||||
return reduced;
|
||||
}
|
||||
return (InternalSum) aggregations.get(0);
|
||||
return new InternalSum(name, sum);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -94,10 +94,6 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
|
|||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1 && from == 0) {
|
||||
return aggregations.get(0);
|
||||
}
|
||||
|
||||
TopDocs[] shardDocs = new TopDocs[aggregations.size()];
|
||||
InternalSearchHits[] shardHits = new InternalSearchHits[aggregations.size()];
|
||||
for (int i = 0; i < shardDocs.length; i++) {
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
|
|||
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An internal implementation of {@link ValueCount}.
|
||||
|
@ -69,19 +68,11 @@ public class InternalValueCount extends InternalNumericMetricsAggregation implem
|
|||
|
||||
@Override
|
||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||
if (aggregations.size() == 1) {
|
||||
return aggregations.get(0);
|
||||
long valueCount = 0;
|
||||
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||
valueCount += ((InternalValueCount) aggregation).value;
|
||||
}
|
||||
InternalValueCount reduced = null;
|
||||
for (InternalAggregation aggregation : aggregations) {
|
||||
if (reduced == null) {
|
||||
reduced = (InternalValueCount) aggregation;
|
||||
} else {
|
||||
reduced.value += ((InternalValueCount) aggregation).value;
|
||||
}
|
||||
}
|
||||
return reduced;
|
||||
return new InternalValueCount(name, valueCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range;
|
|||
import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -310,4 +311,49 @@ public class RandomTests extends ElasticsearchIntegrationTest {
|
|||
assertEquals(numDocs, response.getHits().getTotalHits());
|
||||
}
|
||||
|
||||
// https://github.com/elasticsearch/elasticsearch/issues/6435
|
||||
public void testReduce() throws Exception {
|
||||
createIndex("idx");
|
||||
final int value = randomIntBetween(0, 10);
|
||||
indexRandom(true, client().prepareIndex("idx", "type").setSource("f", value));
|
||||
|
||||
SearchResponse response = client().prepareSearch("idx")
|
||||
.addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
|
||||
.subAggregation(range("range")
|
||||
.field("f")
|
||||
.addUnboundedTo(6)
|
||||
.addUnboundedFrom(6)
|
||||
.subAggregation(sum("sum").field("f"))))
|
||||
.execute().actionGet();
|
||||
|
||||
assertSearchResponse(response);System.out.println(response);
|
||||
|
||||
Filter filter = response.getAggregations().get("filter");
|
||||
assertNotNull(filter);
|
||||
assertEquals(1, filter.getDocCount());
|
||||
|
||||
Range range = filter.getAggregations().get("range");
|
||||
assertThat(range, notNullValue());
|
||||
assertThat(range.getName(), equalTo("range"));
|
||||
assertThat(range.getBuckets().size(), equalTo(2));
|
||||
|
||||
Range.Bucket bucket = range.getBucketByKey("*-6.0");
|
||||
assertThat(bucket, notNullValue());
|
||||
assertThat(bucket.getKey(), equalTo("*-6.0"));
|
||||
assertThat(bucket.getFrom().doubleValue(), equalTo(Double.NEGATIVE_INFINITY));
|
||||
assertThat(bucket.getTo().doubleValue(), equalTo(6.0));
|
||||
assertThat(bucket.getDocCount(), equalTo(value < 6 ? 1L : 0L));
|
||||
Sum sum = bucket.getAggregations().get("sum");
|
||||
assertEquals(value < 6 ? value : 0, sum.getValue(), 0d);
|
||||
|
||||
bucket = range.getBucketByKey("6.0-*");
|
||||
assertThat(bucket, notNullValue());
|
||||
assertThat(bucket.getKey(), equalTo("6.0-*"));
|
||||
assertThat(bucket.getFrom().doubleValue(), equalTo(6.0));
|
||||
assertThat(bucket.getTo().doubleValue(), equalTo(Double.POSITIVE_INFINITY));
|
||||
assertThat(bucket.getDocCount(), equalTo(value >= 6 ? 1L : 0L));
|
||||
sum = bucket.getAggregations().get("sum");
|
||||
assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue