Aggregations: Fix reducing of range aggregations.

Under some rare circumstances:
 - local transport,
 - the range aggregation has both a parent and a child aggregation,
 - the range aggregation got no documents on one shard or more and several
   documents on one shard or more.
the range aggregation could return incorrect counts and sub aggregations.

The root cause is that since the reduce happens in-place and since the range
aggregation uses the same instance for all sub-aggregation in case of an
empty bucket, sometimes non-empty buckets would have been reduced into this
shared instance.

In order to avoid similar bugs in the future, aggregations have been updated
to return a new instance when reducing instead of doing it in-place.

Close #6435
This commit is contained in:
Adrien Grand 2014-06-12 14:54:08 +02:00
parent 52be3748ff
commit fbd7c9aa5d
38 changed files with 385 additions and 656 deletions

View File

@ -61,12 +61,6 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
this.aggregations = aggregations;
}
/** Resets the internal addAggregation */
void reset(List<InternalAggregation> aggregations) {
this.aggregations = aggregations;
this.aggregationsAsMap = null;
}
/**
* Iterates over the {@link Aggregation}s.
*/
@ -145,20 +139,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
}
InternalAggregations result = aggregationsList.get(0);
result.reset(reducedAggregations);
return result;
}
/**
* Reduces this aggregations, effectively propagates the reduce to all the sub aggregations
* @param cacheRecycler
*/
public void reduce(BigArrays bigArrays) {
for (int i = 0; i < aggregations.size(); i++) {
InternalAggregation aggregation = aggregations.get(i);
aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays)));
}
return new InternalAggregations(reducedAggregations);
}
/** The fields required to write this addAggregation to xcontent */

View File

@ -33,8 +33,8 @@ import java.util.List;
*/
public abstract class InternalSingleBucketAggregation extends InternalAggregation implements SingleBucketAggregation {
protected long docCount;
protected InternalAggregations aggregations;
private long docCount;
private InternalAggregations aggregations;
protected InternalSingleBucketAggregation() {} // for serialization
@ -61,26 +61,23 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
return aggregations;
}
/**
* Create a <b>new</b> empty sub aggregation. This must be a new instance on each call.
*/
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0));
reduced.aggregations.reduce(reduceContext.bigArrays());
return reduced;
}
InternalSingleBucketAggregation reduced = null;
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalSingleBucketAggregation) aggregation;
} else {
this.docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
}
assert aggregation.getName().equals(getName());
docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations);
}
reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
return reduced;
final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
return newAggregation(getName(), docCount, aggs);
}
@Override

View File

@ -56,4 +56,8 @@ public class InternalFilter extends InternalSingleBucketAggregation implements F
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalFilter(name, docCount, subAggregations);
}
}

View File

@ -106,24 +106,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
}
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) {
// we still need to reduce the sub aggs
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(bigArrays);
return bucket;
}
Bucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
long docCount = 0;
for (Bucket bucket : buckets) {
if (reduced == null) {
reduced = bucket;
} else {
reduced.docCount += bucket.docCount;
}
docCount += bucket.docCount;
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced;
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
return new Bucket(geohashAsLong, docCount, aggs);
}
@Override
@ -181,19 +171,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
@Override
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
grid.reduceAndTrimBuckets(reduceContext.bigArrays());
return grid;
}
InternalGeoHashGrid reduced = null;
LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
if (reduced == null) {
reduced = grid;
}
if (buckets == null) {
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
}
@ -207,12 +188,6 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
}
}
if (reduced == null) {
// there are only unmapped terms, so we just return the first one (no need to reduce)
return (InternalGeoHashGrid) aggregations.get(0);
}
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
@ -224,28 +199,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = ordered.pop();
}
reduced.buckets = Arrays.asList(list);
return reduced;
}
protected void reduceAndTrimBuckets(BigArrays bigArrays) {
if (requiredSize > buckets.size()) { // nothing to trim
for (Bucket bucket : buckets) {
bucket.aggregations.reduce(bigArrays);
}
return;
}
List<Bucket> trimmedBuckets = new ArrayList<>(requiredSize);
for (Bucket bucket : buckets) {
if (trimmedBuckets.size() >= requiredSize) {
break;
}
bucket.aggregations.reduce(bigArrays);
trimmedBuckets.add(bucket);
}
buckets = trimmedBuckets;
return new InternalGeoHashGrid(getName(), requiredSize, Arrays.asList(list));
}
@Override

View File

@ -56,4 +56,9 @@ public class InternalGlobal extends InternalSingleBucketAggregation implements G
public Type type() {
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalGlobal(name, docCount, subAggregations);
}
}

View File

@ -57,6 +57,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
super(key, docCount, formatter, aggregations);
}
@Override
protected InternalHistogram.Factory<Bucket> getFactory() {
return FACTORY;
}
@Override
public String getKey() {
return formatter != null ? formatter.format(key) : ValueFormatter.DateTime.DEFAULT.format(key);
@ -109,6 +114,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
return TYPE;
}
@Override
protected InternalHistogram.Factory<Bucket> getFactory() {
return FACTORY;
}
@Override
public Bucket getBucketByKey(String key) {
try {

View File

@ -67,10 +67,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
public static class Bucket implements Histogram.Bucket {
long key;
long docCount;
final long key;
final long docCount;
protected transient final @Nullable ValueFormatter formatter;
InternalAggregations aggregations;
final InternalAggregations aggregations;
public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
this.key = key;
@ -79,6 +79,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
this.aggregations = aggregations;
}
protected Factory<?> getFactory() {
return FACTORY;
}
@Override
public String getKey() {
return formatter != null ? formatter.format(key) : ValueFormatter.RAW.format(key);
@ -105,24 +109,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
}
<B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) {
// we only need to reduce the sub aggregations
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(bigArrays);
return (B) bucket;
}
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
Bucket reduced = null;
long docCount = 0;
for (Bucket bucket : buckets) {
if (reduced == null) {
reduced = bucket;
} else {
reduced.docCount += bucket.docCount;
}
docCount += bucket.docCount;
aggregations.add((InternalAggregations) bucket.getAggregations());
}
reduced.aggregations = InternalAggregations.reduce(aggregations, bigArrays);
return (B) reduced;
InternalAggregations aggs = InternalAggregations.reduce(aggregations, bigArrays);
return (B) getFactory().createBucket(key, docCount, aggs, formatter);
}
void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException {
@ -256,98 +250,13 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return bucketsMap.get(key.longValue());
}
protected Factory<B> getFactory() {
return FACTORY;
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalHistogram<B> histo = (InternalHistogram<B>) aggregations.get(0);
if (minDocCount == 1) {
for (B bucket : histo.buckets) {
bucket.aggregations.reduce(reduceContext.bigArrays());
}
return histo;
}
CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
B lastBucket = null;
ListIterator<B> iter = list.listIterator();
// we need to fill the gaps with empty buckets
if (minDocCount == 0) {
ExtendedBounds bounds = emptyBucketInfo.bounds;
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
if (bounds != null) {
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
if (bounds.min != null && bounds.max != null) {
long key = bounds.min;
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
} else {
if (bounds.min != null) {
long key = bounds.min;
while (key < firstBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
}
}
// now adding the empty buckets within the actual data,
// e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6
while (iter.hasNext()) {
// look ahead on the next bucket without advancing the iter
// so we'll be able to insert elements at the right position
B nextBucket = list.get(iter.nextIndex());
nextBucket.aggregations.reduce(reduceContext.bigArrays());
if (lastBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
while (key != nextBucket.key) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
lastBucket = iter.next();
}
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
while (key <= max) {
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
} else {
while (iter.hasNext()) {
InternalHistogram.Bucket bucket = iter.next();
if (bucket.getDocCount() < minDocCount) {
iter.remove();
} else {
bucket.aggregations.reduce(reduceContext.bigArrays());
}
}
}
if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) {
CollectionUtil.introSort(histo.buckets, order.comparator());
}
return histo;
}
InternalHistogram reduced = (InternalHistogram) aggregations.get(0);
LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays());
for (InternalAggregation aggregation : aggregations) {
@ -437,9 +346,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
reduced.buckets = reducedBuckets;
return reduced;
return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed);
}
protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {

View File

@ -58,4 +58,8 @@ public class InternalMissing extends InternalSingleBucketAggregation implements
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalMissing(name, docCount, subAggregations);
}
}

View File

@ -57,4 +57,8 @@ public class InternalNested extends InternalSingleBucketAggregation implements N
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalNested(name, docCount, subAggregations);
}
}

View File

@ -57,4 +57,8 @@ public class InternalReverseNested extends InternalSingleBucketAggregation imple
return TYPE;
}
@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalReverseNested(name, docCount, subAggregations);
}
}

View File

@ -60,11 +60,12 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
public static class Bucket implements Range.Bucket {
private double from = Double.NEGATIVE_INFINITY;
private double to = Double.POSITIVE_INFINITY;
private long docCount;
InternalAggregations aggregations;
private String key;
private final ValueFormatter formatter;
private final double from;
private final double to;
private final long docCount;
final InternalAggregations aggregations;
private final String key;
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
this.key = key != null ? key : generateKey(from, to, formatter);
@ -72,6 +73,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
this.to = to;
this.docCount = docCount;
this.aggregations = aggregations;
this.formatter = formatter;
}
public String getKey() {
@ -103,25 +105,19 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
return aggregations;
}
protected Factory<? extends Bucket, ?> getFactory() {
return FACTORY;
}
Bucket reduce(List<Bucket> ranges, BigArrays bigArrays) {
if (ranges.size() == 1) {
// we stil need to call reduce on all the sub aggregations
Bucket bucket = ranges.get(0);
bucket.aggregations.reduce(bigArrays);
return bucket;
}
Bucket reduced = null;
long docCount = 0;
List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
for (Bucket range : ranges) {
if (reduced == null) {
reduced = range;
} else {
reduced.docCount += range.docCount;
}
docCount += range.docCount;
aggregationsList.add(range.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced;
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
return getFactory().createBucket(key, from, to, docCount, aggs, formatter);
}
void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException {
@ -164,8 +160,8 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
return TYPE.name();
}
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
return (R) new InternalRange<>(name, ranges, formatter, keyed, unmapped);
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
return (R) new InternalRange<>(name, ranges, formatter, keyed);
}
@ -178,16 +174,14 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
private Map<String, B> rangeMap;
private @Nullable ValueFormatter formatter;
private boolean keyed;
private boolean unmapped;
public InternalRange() {} // for serialization
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
super(name);
this.ranges = ranges;
this.formatter = formatter;
this.keyed = keyed;
this.unmapped = unmapped;
}
@Override
@ -211,52 +205,31 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
return rangeMap.get(key);
}
protected Factory<B, ?> getFactory() {
return FACTORY;
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalRange<B> reduced = (InternalRange<B>) aggregations.get(0);
for (B bucket : reduced.ranges) {
bucket.aggregations.reduce(reduceContext.bigArrays());
}
return reduced;
@SuppressWarnings("unchecked")
List<Bucket>[] rangeList = new List[ranges.size()];
for (int i = 0; i < rangeList.length; ++i) {
rangeList[i] = new ArrayList<Bucket>();
}
List<List<Bucket>> rangesList = null;
for (InternalAggregation aggregation : aggregations) {
InternalRange<Bucket> ranges = (InternalRange) aggregation;
if (ranges.unmapped) {
continue;
}
if (rangesList == null) {
rangesList = new ArrayList<>(ranges.ranges.size());
for (Bucket bucket : ranges.ranges) {
List<Bucket> sameRangeList = new ArrayList<>(aggregations.size());
sameRangeList.add(bucket);
rangesList.add(sameRangeList);
}
} else {
int i = 0;
for (Bucket range : ranges.ranges) {
rangesList.get(i++).add(range);
}
InternalRange<?> ranges = (InternalRange<?>) aggregation;
int i = 0;
for (Bucket range : ranges.ranges) {
rangeList[i++].add(range);
}
}
if (rangesList == null) {
// unmapped, we can just take the first one
return aggregations.get(0);
final List<B> ranges = new ArrayList<>();
for (int i = 0; i < this.ranges.size(); ++i) {
ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext.bigArrays()));
}
InternalRange reduced = (InternalRange) aggregations.get(0);
int i = 0;
for (List<Bucket> sameRangeList : rangesList) {
reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays()));
}
return reduced;
}
protected B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
return (B) new Bucket(key, from, to, docCount, aggregations, formatter);
return getFactory().create(name, ranges, formatter, keyed);
}
@Override
@ -268,7 +241,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
List<B> ranges = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
String key = in.readOptionalString();
ranges.add(createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
}
this.ranges = ranges;
this.rangeMap = null;

View File

@ -203,7 +203,7 @@ public class RangeAggregator extends BucketsAggregator {
buckets.add(bucket);
}
// value source can be null in the case of unmapped fields
return rangeFactory.create(name, buckets, formatter, keyed, false);
return rangeFactory.create(name, buckets, formatter, keyed);
}
@Override
@ -217,7 +217,7 @@ public class RangeAggregator extends BucketsAggregator {
buckets.add(bucket);
}
// value source can be null in the case of unmapped fields
return rangeFactory.create(name, buckets, formatter, keyed, false);
return rangeFactory.create(name, buckets, formatter, keyed);
}
private static final void sortRanges(final Range[] ranges) {
@ -274,7 +274,7 @@ public class RangeAggregator extends BucketsAggregator {
for (RangeAggregator.Range range : ranges) {
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter));
}
return factory.create(name, buckets, formatter, keyed, true);
return factory.create(name, buckets, formatter, keyed);
}
}

View File

@ -72,6 +72,11 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
public DateTime getToAsDate() {
return Double.isInfinite(getTo().doubleValue()) ? null : new DateTime(getTo().longValue(), DateTimeZone.UTC);
}
@Override
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}
private static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
@ -82,8 +87,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
}
@Override
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed, boolean unmapped) {
return new InternalDateRange(name, ranges, formatter, keyed, unmapped);
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed) {
return new InternalDateRange(name, ranges, formatter, keyed);
}
@Override
@ -94,8 +99,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
InternalDateRange() {} // for serialization
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
super(name, ranges, formatter, keyed, unmapped);
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
super(name, ranges, formatter, keyed);
}
@Override
@ -104,7 +109,7 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
}
@Override
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, formatter);
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}

View File

@ -61,6 +61,10 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
super(key, from, to, docCount, aggregations, formatter);
}
@Override
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}
private static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
@ -71,8 +75,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
@Override
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
return new InternalGeoDistance(name, ranges, formatter, keyed, unmapped);
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
return new InternalGeoDistance(name, ranges, formatter, keyed);
}
@Override
@ -83,8 +87,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
InternalGeoDistance() {} // for serialization
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
super(name, ranges, formatter, keyed, unmapped);
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
super(name, ranges, formatter, keyed);
}
@Override
@ -93,7 +97,7 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
}
@Override
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
return new Bucket(key, from, to, docCount, aggregations, formatter);
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}

View File

@ -74,6 +74,11 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
double to = getTo().doubleValue();
return Double.isInfinite(to) ? null : MAX_IP == to ? null : ValueFormatter.IPv4.format(to);
}
@Override
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}
private static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> {
@ -84,8 +89,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
}
@Override
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
return new InternalIPv4Range(name, ranges, keyed, unmapped);
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
return new InternalIPv4Range(name, ranges, keyed);
}
@Override
@ -96,8 +101,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
public InternalIPv4Range() {} // for serialization
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed, boolean unmapped) {
super(name, ranges, ValueFormatter.IPv4, keyed, unmapped);
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed) {
super(name, ranges, ValueFormatter.IPv4, keyed);
}
@Override
@ -106,8 +111,7 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
}
@Override
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter ) {
return new Bucket(key, from, to, docCount, aggregations);
protected InternalRange.Factory<Bucket, ?> getFactory() {
return FACTORY;
}
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.BigArrays;
@ -143,25 +142,20 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
}
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) {
return buckets.get(0);
}
Bucket reduced = null;
long subsetDf = 0;
long supersetDf = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (Bucket bucket : buckets) {
if (reduced == null) {
reduced = bucket;
} else {
reduced.subsetDf += bucket.subsetDf;
reduced.supersetDf += bucket.supersetDf;
reduced.updateScore();
}
subsetDf += bucket.subsetDf;
supersetDf += bucket.supersetDf;
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced;
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs);
}
abstract Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations);
@Override
public double getSignificanceScore() {
return score;
@ -201,14 +195,8 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
}
@Override
public InternalSignificantTerms reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalSignificantTerms terms = (InternalSignificantTerms) aggregations.get(0);
terms.trimExcessEntries();
return terms;
}
InternalSignificantTerms reduced = null;
long globalSubsetSize = 0;
long globalSupersetSize = 0;
@ -219,18 +207,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
globalSubsetSize += terms.subsetSize;
globalSupersetSize += terms.supersetSize;
}
Map<String, List<InternalSignificantTerms.Bucket>> buckets = null;
Map<String, List<InternalSignificantTerms.Bucket>> buckets = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
InternalSignificantTerms terms = (InternalSignificantTerms) aggregation;
if (terms instanceof UnmappedSignificantTerms) {
continue;
}
if (reduced == null) {
reduced = terms;
}
if (buckets == null) {
buckets = new HashMap<>(terms.buckets.size());
}
for (Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.get(bucket.getKey());
if (existingBuckets == null) {
@ -239,19 +218,10 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
}
// Adjust the buckets with the global stats representing the
// total size of the pots from which the stats are drawn
bucket.subsetSize = globalSubsetSize;
bucket.supersetSize = globalSupersetSize;
bucket.updateScore();
existingBuckets.add(bucket);
existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, bucket.aggregations));
}
}
if (reduced == null) {
// there are only unmapped terms, so we just return the first one
// (no need to reduce)
return (UnmappedSignificantTerms) aggregations.get(0);
}
final int size = Math.min(requiredSize, buckets.size());
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) {
@ -265,23 +235,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (Bucket) ordered.pop();
}
reduced.buckets = Arrays.asList(list);
reduced.subsetSize = globalSubsetSize;
reduced.supersetSize = globalSupersetSize;
return reduced;
return newAggregation(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
}
final void trimExcessEntries() {
final List<Bucket> newBuckets = Lists.newArrayList();
for (Bucket b : buckets) {
if (newBuckets.size() >= requiredSize) {
break;
}
if (b.subsetDf >= minDocCount) {
newBuckets.add(b);
}
}
buckets = newBuckets;
}
abstract InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets);
}

View File

@ -83,6 +83,11 @@ public class SignificantLongTerms extends InternalSignificantTerms {
return Long.toString(term);
}
@Override
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations);
}
}
private ValueFormatter formatter;
@ -101,6 +106,12 @@ public class SignificantLongTerms extends InternalSignificantTerms {
return TYPE;
}
@Override
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
List<InternalSignificantTerms.Bucket> buckets) {
return new SignificantLongTerms(subsetSize, supersetSize, getName(), formatter, requiredSize, supersetSize, buckets);
}
@Override
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();

View File

@ -85,6 +85,10 @@ public class SignificantStringTerms extends InternalSignificantTerms {
return termBytes.utf8ToString();
}
@Override
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
}
}
SignificantStringTerms() {} // for serialization
@ -99,6 +103,12 @@ public class SignificantStringTerms extends InternalSignificantTerms {
return TYPE;
}
@Override
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
List<InternalSignificantTerms.Bucket> buckets) {
return new SignificantStringTerms(subsetSize, supersetSize, getName(), requiredSize, supersetSize, buckets);
}
@Override
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();

View File

@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -64,6 +66,21 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
return TYPE;
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation aggregation : reduceContext.aggregations()) {
if (!(aggregation instanceof UnmappedSignificantTerms)) {
return aggregation.reduce(reduceContext);
}
}
return this;
}
@Override
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets) {
throw new UnsupportedOperationException("How did you get there?");
}
@Override
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();

View File

@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.DoubleObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -87,6 +83,15 @@ public class DoubleTerms extends InternalTerms {
return Double.compare(term, other.getKeyAsNumber().doubleValue());
}
@Override
Object getKeyAsObject() {
return getKeyAsNumber();
}
@Override
Bucket newBucket(long docCount, InternalAggregations aggs) {
return new Bucket(term, docCount, aggs);
}
}
private @Nullable ValueFormatter formatter;
@ -104,59 +109,8 @@ public class DoubleTerms extends InternalTerms {
}
@Override
public InternalTerms reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.bigArrays());
return terms;
}
InternalTerms reduced = null;
DoubleObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation;
if (terms instanceof UnmappedTerms) {
continue;
}
if (reduced == null) {
reduced = terms;
}
if (buckets == null) {
buckets = new DoubleObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
}
for (Terms.Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(((Bucket) bucket).term, existingBuckets);
}
existingBuckets.add((Bucket) bucket);
}
}
if (reduced == null) {
// there are only unmapped terms, so we just return the first one (no need to reduce)
return (UnmappedTerms) aggregations.get(0);
}
// TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ?
final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
for (DoubleObjectPagedHashMap.Cursor<List<DoubleTerms.Bucket>> cursor : buckets) {
List<DoubleTerms.Bucket> sameTermBuckets = cursor.value;
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (b.getDocCount() >= minDocCount) {
ordered.insertWithOverflow(b);
}
}
buckets.close();
InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (Bucket) ordered.pop();
}
reduced.buckets = Arrays.asList(list);
return reduced;
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
return new DoubleTerms(name, order, formatter, requiredSize, minDocCount, buckets);
}
@Override

View File

@ -18,10 +18,10 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms;
import com.google.common.collect.Lists;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.Aggregations;
@ -58,24 +58,19 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
return aggregations;
}
abstract Object getKeyAsObject();
abstract Bucket newBucket(long docCount, InternalAggregations aggs);
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
if (buckets.size() == 1) {
Bucket bucket = buckets.get(0);
bucket.aggregations.reduce(bigArrays);
return bucket;
}
Bucket reduced = null;
long docCount = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (Bucket bucket : buckets) {
if (reduced == null) {
reduced = bucket;
} else {
reduced.docCount += bucket.docCount;
}
docCount += bucket.docCount;
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
return reduced;
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
return newBucket(docCount, aggs);
}
}
@ -113,47 +108,21 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
}
@Override
public InternalTerms reduce(ReduceContext reduceContext) {
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.bigArrays());
return terms;
}
InternalTerms reduced = null;
Map<Text, List<InternalTerms.Bucket>> buckets = null;
Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation;
if (terms instanceof UnmappedTerms) {
continue;
}
if (reduced == null) {
reduced = terms;
}
if (buckets == null) {
buckets = new HashMap<>(terms.buckets.size());
}
for (Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.get(bucket.getKeyAsText());
if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(bucket.getKeyAsText(), existingBuckets);
}
existingBuckets.add(bucket);
buckets.put(bucket.getKeyAsObject(), bucket);
}
}
if (reduced == null) {
// there are only unmapped terms, so we just return the first one (no need to reduce)
return (UnmappedTerms) aggregations.get(0);
}
final int size = Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
for (Map.Entry<Text, List<Bucket>> entry : buckets.entrySet()) {
List<Bucket> sameTermBuckets = entry.getValue();
for (Collection<Bucket> l : buckets.asMap().values()) {
List<Bucket> sameTermBuckets = (List<Bucket>) l; // cast is ok according to javadocs
final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (b.docCount >= minDocCount) {
ordered.insertWithOverflow(b);
@ -163,22 +132,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (Bucket) ordered.pop();
}
reduced.buckets = Arrays.asList(list);
return reduced;
return newAggregation(name, Arrays.asList(list));
}
final void trimExcessEntries(BigArrays bigArrays) {
final List<Bucket> newBuckets = Lists.newArrayList();
for (Bucket b : buckets) {
if (newBuckets.size() >= requiredSize) {
break;
}
if (b.docCount >= minDocCount) {
newBuckets.add(b);
b.aggregations.reduce(bigArrays);
}
}
buckets = newBuckets;
}
protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets);
}

View File

@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -87,6 +83,16 @@ public class LongTerms extends InternalTerms {
int compareTerm(Terms.Bucket other) {
return Long.compare(term, other.getKeyAsNumber().longValue());
}
@Override
Object getKeyAsObject() {
return getKeyAsNumber();
}
@Override
Bucket newBucket(long docCount, InternalAggregations aggs) {
return new Bucket(term, docCount, aggs);
}
}
private @Nullable ValueFormatter formatter;
@ -104,59 +110,8 @@ public class LongTerms extends InternalTerms {
}
@Override
public InternalTerms reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
InternalTerms terms = (InternalTerms) aggregations.get(0);
terms.trimExcessEntries(reduceContext.bigArrays());
return terms;
}
InternalTerms reduced = null;
LongObjectPagedHashMap<List<Bucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalTerms terms = (InternalTerms) aggregation;
if (terms instanceof UnmappedTerms) {
continue;
}
if (reduced == null) {
reduced = terms;
}
if (buckets == null) {
buckets = new LongObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
}
for (Terms.Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(((Bucket) bucket).term, existingBuckets);
}
existingBuckets.add((Bucket) bucket);
}
}
if (reduced == null) {
// there are only unmapped terms, so we just return the first one (no need to reduce)
return (UnmappedTerms) aggregations.get(0);
}
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
final int size = (int) Math.min(requiredSize, buckets.size());
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
for (LongObjectPagedHashMap.Cursor<List<LongTerms.Bucket>> cursor : buckets) {
List<LongTerms.Bucket> sameTermBuckets = cursor.value;
final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
if (b.getDocCount() >= minDocCount) {
ordered.insertWithOverflow(b);
}
}
buckets.close();
InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (Bucket) ordered.pop();
}
reduced.buckets = Arrays.asList(list);
return reduced;
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
return new LongTerms(name, order, formatter, requiredSize, minDocCount, buckets);
}
@Override

View File

@ -84,6 +84,16 @@ public class StringTerms extends InternalTerms {
int compareTerm(Terms.Bucket other) {
return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes);
}
@Override
Object getKeyAsObject() {
return getKeyAsText();
}
@Override
Bucket newBucket(long docCount, InternalAggregations aggs) {
return new Bucket(termBytes, docCount, aggs);
}
}
StringTerms() {} // for serialization
@ -97,6 +107,11 @@ public class StringTerms extends InternalTerms {
return TYPE;
}
@Override
protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
return new StringTerms(name, order, requiredSize, minDocCount, buckets);
}
@Override
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();

View File

@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -80,6 +82,21 @@ public class UnmappedTerms extends InternalTerms {
out.writeVLong(minDocCount);
}
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
for (InternalAggregation agg : reduceContext.aggregations()) {
if (!(agg instanceof UnmappedTerms)) {
return agg.reduce(reduceContext);
}
}
return this;
}
@Override
protected InternalTerms newAggregation(String name, List<Bucket> buckets) {
throw new UnsupportedOperationException("How did you get there?");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name);

View File

@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
/**
*
@ -76,20 +75,13 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
@Override
public InternalAvg reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (InternalAvg) aggregations.get(0);
long count = 0;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
count += ((InternalAvg) aggregation).count;
sum += ((InternalAvg) aggregation).sum;
}
InternalAvg reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalAvg) aggregation;
} else {
reduced.count += ((InternalAvg) aggregation).count;
reduced.sum += ((InternalAvg) aggregation).sum;
}
}
return reduced;
return new InternalAvg(getName(), sum, count);
}
@Override

View File

@ -184,6 +184,10 @@ public final class HyperLogLogPlusPlus implements Releasable {
alphaMM = alpha * m * m;
}
public int precision() {
return p;
}
public long maxBucket() {
return runLens.size() >>> p;
}

View File

@ -104,18 +104,17 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
final InternalCardinality cardinality = (InternalCardinality) aggregation;
if (cardinality.counts != null) {
if (reduced == null) {
reduced = cardinality;
} else {
reduced.merge(cardinality);
reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), BigArrays.NON_RECYCLING_INSTANCE, 1));
}
reduced.merge(cardinality);
}
}
if (reduced == null) { // all empty
reduced = (InternalCardinality) aggregations.get(0);
return aggregations.get(0);
} else {
return reduced;
}
return reduced;
}
public void merge(InternalCardinality other) {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import java.io.IOException;
import java.util.List;
public class InternalGeoBounds extends InternalMetricsAggregation implements GeoBounds {
@ -72,36 +71,36 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
InternalGeoBounds reduced = null;
List<InternalAggregation> aggregations = reduceContext.aggregations();
for (InternalAggregation aggregation : aggregations) {
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
if (reduced == null) {
reduced = bounds;
continue;
}
double top = Double.NEGATIVE_INFINITY;
double bottom = Double.POSITIVE_INFINITY;
double posLeft = Double.POSITIVE_INFINITY;
double posRight = Double.NEGATIVE_INFINITY;
double negLeft = Double.POSITIVE_INFINITY;
double negRight = Double.NEGATIVE_INFINITY;
if (bounds.top > reduced.top) {
reduced.top = bounds.top;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
if (bounds.top > top) {
top = bounds.top;
}
if (bounds.bottom < reduced.bottom) {
reduced.bottom = bounds.bottom;
if (bounds.bottom < bottom) {
bottom = bounds.bottom;
}
if (bounds.posLeft < reduced.posLeft) {
reduced.posLeft = bounds.posLeft;
if (bounds.posLeft < posLeft) {
posLeft = bounds.posLeft;
}
if (bounds.posRight > reduced.posRight) {
reduced.posRight = bounds.posRight;
if (bounds.posRight > posRight) {
posRight = bounds.posRight;
}
if (bounds.negLeft < reduced.negLeft) {
reduced.negLeft = bounds.negLeft;
if (bounds.negLeft < negLeft) {
negLeft = bounds.negLeft;
}
if (bounds.negRight > reduced.negRight) {
reduced.negRight = bounds.negRight;
if (bounds.negRight > negRight) {
negRight = bounds.negRight;
}
}
return reduced;
return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude);
}
@Override

View File

@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
/**
*
@ -74,22 +73,11 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
@Override
public InternalMax reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (InternalMax) aggregations.get(0);
double max = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
max = Math.max(max, ((InternalMax) aggregation).max);
}
InternalMax reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalMax) aggregation;
} else {
reduced.max = Math.max(reduced.max, ((InternalMax) aggregation).max);
}
}
if (reduced != null) {
return reduced;
}
return (InternalMax) aggregations.get(0);
return new InternalMax(name, max);
}
@Override

View File

@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
/**
*
@ -75,22 +74,11 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
@Override
public InternalMin reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (InternalMin) aggregations.get(0);
double min = Double.POSITIVE_INFINITY;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
min = Math.min(min, ((InternalMin) aggregation).min);
}
InternalMin reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalMin) aggregation;
} else {
reduced.min = Math.min(reduced.min, ((InternalMin) aggregation).min);
}
}
if (reduced != null) {
return reduced;
}
return (InternalMin) aggregations.get(0);
return new InternalMin(getName(), min);
}
@Override

View File

@ -32,7 +32,6 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStream
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
*
@ -89,17 +88,15 @@ public class InternalPercentiles extends InternalNumericMetricsAggregation.Multi
@Override
public InternalPercentiles reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
InternalPercentiles merged = null;
for (InternalAggregation aggregation : aggregations) {
TDigestState merged = null;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
final InternalPercentiles percentiles = (InternalPercentiles) aggregation;
if (merged == null) {
merged = percentiles;
} else {
merged.state.add(percentiles.state);
merged = new TDigestState(percentiles.state.compression());
}
merged.add(percentiles.state);
}
return merged;
return new InternalPercentiles(getName(), percents, merged, keyed);
}
@Override

View File

@ -36,6 +36,10 @@ public class TDigestState extends AVLTreeDigest {
this.compression = compression;
}
public double compression() {
return compression;
}
public static void write(TDigestState state, StreamOutput out) throws IOException {
out.writeDouble(state.compression);
out.writeVInt(state.centroidCount());

View File

@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
/**
*
@ -120,33 +119,18 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
@Override
public InternalStats reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (InternalStats) aggregations.get(0);
long count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
InternalStats stats = (InternalStats) aggregation;
count += stats.getCount();
min = Math.min(min, stats.getMin());
max = Math.max(max, stats.getMax());
sum += stats.getSum();
}
InternalStats reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
if (((InternalStats) aggregation).count != 0) {
reduced = (InternalStats) aggregation;
}
} else {
if (((InternalStats) aggregation).count != 0) {
reduced.count += ((InternalStats) aggregation).count;
reduced.min = Math.min(reduced.min, ((InternalStats) aggregation).min);
reduced.max = Math.max(reduced.max, ((InternalStats) aggregation).max);
reduced.sum += ((InternalStats) aggregation).sum;
mergeOtherStats(reduced, aggregation);
}
}
}
if (reduced != null) {
return reduced;
}
return (InternalStats) aggregations.get(0);
}
protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
return new InternalStats(name, count, sum, min, max);
}
@Override

View File

@ -101,8 +101,14 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
@Override
protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
((InternalExtendedStats) to).sumOfSqrs += ((InternalExtendedStats) from).sumOfSqrs;
public InternalExtendedStats reduce(ReduceContext reduceContext) {
double sumOfSqrs = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
InternalExtendedStats stats = (InternalExtendedStats) aggregation;
sumOfSqrs += stats.getSumOfSquares();
}
final InternalStats stats = super.reduce(reduceContext);
return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs);
}
@Override

View File

@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.List;
/**
*
@ -74,22 +73,11 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
@Override
public InternalSum reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return (InternalSum) aggregations.get(0);
double sum = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
sum += ((InternalSum) aggregation).sum;
}
InternalSum reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalSum) aggregation;
} else {
reduced.sum += ((InternalSum) aggregation).sum;
}
}
if (reduced != null) {
return reduced;
}
return (InternalSum) aggregations.get(0);
return new InternalSum(name, sum);
}
@Override

View File

@ -94,10 +94,6 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1 && from == 0) {
return aggregations.get(0);
}
TopDocs[] shardDocs = new TopDocs[aggregations.size()];
InternalSearchHits[] shardHits = new InternalSearchHits[aggregations.size()];
for (int i = 0; i < shardDocs.length; i++) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import java.io.IOException;
import java.util.List;
/**
* An internal implementation of {@link ValueCount}.
@ -69,19 +68,11 @@ public class InternalValueCount extends InternalNumericMetricsAggregation implem
@Override
public InternalAggregation reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations();
if (aggregations.size() == 1) {
return aggregations.get(0);
long valueCount = 0;
for (InternalAggregation aggregation : reduceContext.aggregations()) {
valueCount += ((InternalValueCount) aggregation).value;
}
InternalValueCount reduced = null;
for (InternalAggregation aggregation : aggregations) {
if (reduced == null) {
reduced = (InternalValueCount) aggregation;
} else {
reduced.value += ((InternalValueCount) aggregation).value;
}
}
return reduced;
return new InternalValueCount(name, valueCount);
}
@Override

View File

@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.util.List;
@ -310,4 +311,49 @@ public class RandomTests extends ElasticsearchIntegrationTest {
assertEquals(numDocs, response.getHits().getTotalHits());
}
// https://github.com/elasticsearch/elasticsearch/issues/6435
public void testReduce() throws Exception {
createIndex("idx");
final int value = randomIntBetween(0, 10);
indexRandom(true, client().prepareIndex("idx", "type").setSource("f", value));
SearchResponse response = client().prepareSearch("idx")
.addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
.subAggregation(range("range")
.field("f")
.addUnboundedTo(6)
.addUnboundedFrom(6)
.subAggregation(sum("sum").field("f"))))
.execute().actionGet();
assertSearchResponse(response);System.out.println(response);
Filter filter = response.getAggregations().get("filter");
assertNotNull(filter);
assertEquals(1, filter.getDocCount());
Range range = filter.getAggregations().get("range");
assertThat(range, notNullValue());
assertThat(range.getName(), equalTo("range"));
assertThat(range.getBuckets().size(), equalTo(2));
Range.Bucket bucket = range.getBucketByKey("*-6.0");
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("*-6.0"));
assertThat(bucket.getFrom().doubleValue(), equalTo(Double.NEGATIVE_INFINITY));
assertThat(bucket.getTo().doubleValue(), equalTo(6.0));
assertThat(bucket.getDocCount(), equalTo(value < 6 ? 1L : 0L));
Sum sum = bucket.getAggregations().get("sum");
assertEquals(value < 6 ? value : 0, sum.getValue(), 0d);
bucket = range.getBucketByKey("6.0-*");
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("6.0-*"));
assertThat(bucket.getFrom().doubleValue(), equalTo(6.0));
assertThat(bucket.getTo().doubleValue(), equalTo(Double.POSITIVE_INFINITY));
assertThat(bucket.getDocCount(), equalTo(value >= 6 ? 1L : 0L));
sum = bucket.getAggregations().get("sum");
assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d);
}
}