Mirror of https://github.com/honeymoose/OpenSearch.git (synced 2025-02-07 21:48:39 +00:00)
Aggregations: Fix reducing of range aggregations.
Under some rare circumstances:
- local transport,
- the range aggregation has both a parent and a child aggregation,
- the range aggregation got no documents on one or more shards and several documents on one or more shards,
the range aggregation could return incorrect counts and sub-aggregations.

The root cause is that the reduce happens in-place and the range aggregation uses the same instance for all sub-aggregations in case of an empty bucket, so non-empty buckets would sometimes be reduced into this shared instance.

In order to avoid similar bugs in the future, aggregations have been updated to return a new instance when reducing instead of doing it in-place.

Closes #6435
parent 52be3748ff
commit fbd7c9aa5d
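The commit message above describes the change as moving from in-place reduction to building a new instance on every reduce. The following minimal, self-contained sketch contrasts the two styles; the ImmutableBucket and MutableBucket classes are invented for illustration and are not the actual Elasticsearch InternalRange or InternalAggregations code.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a bucket that only tracks a document count.
final class ImmutableBucket {
    final long docCount;

    ImmutableBucket(long docCount) {
        this.docCount = docCount;
    }

    // Old, in-place style: the first bucket in the list is mutated and returned.
    // If that first bucket is a shared "empty bucket" instance, unrelated buckets
    // get merged into it and later reads see wrong counts.
    static MutableBucket reduceInPlace(List<MutableBucket> shardBuckets) {
        MutableBucket reduced = null;
        for (MutableBucket bucket : shardBuckets) {
            if (reduced == null) {
                reduced = bucket;                     // possibly a shared instance
            } else {
                reduced.docCount += bucket.docCount;  // mutates it in place
            }
        }
        return reduced;
    }

    // New style adopted by this commit: accumulate into locals and build a fresh
    // result, leaving every shard-level bucket untouched.
    static ImmutableBucket reduceToNewInstance(List<ImmutableBucket> shardBuckets) {
        long docCount = 0;
        for (ImmutableBucket bucket : shardBuckets) {
            docCount += bucket.docCount;
        }
        return new ImmutableBucket(docCount);
    }

    static final class MutableBucket {
        long docCount;

        MutableBucket(long docCount) {
            this.docCount = docCount;
        }
    }

    public static void main(String[] args) {
        ImmutableBucket merged = reduceToNewInstance(Arrays.asList(new ImmutableBucket(0), new ImmutableBucket(5)));
        System.out.println(merged.docCount); // 5; the shard-level inputs are left unchanged
    }
}

The diff below applies the same idea to each aggregation type: the reduce methods accumulate counts into locals and build the result through a newAggregation(...) or factory createBucket(...) call instead of mutating and returning the first instance in the list.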
@@ -61,12 +61,6 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
         this.aggregations = aggregations;
     }
 
-    /** Resets the internal addAggregation */
-    void reset(List<InternalAggregation> aggregations) {
-        this.aggregations = aggregations;
-        this.aggregationsAsMap = null;
-    }
-
     /**
      * Iterates over the {@link Aggregation}s.
      */
@@ -145,20 +139,7 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
             InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
             reducedAggregations.add(first.reduce(new InternalAggregation.ReduceContext(aggregations, bigArrays)));
         }
-        InternalAggregations result = aggregationsList.get(0);
-        result.reset(reducedAggregations);
-        return result;
-    }
-
-    /**
-     * Reduces this aggregations, effectively propagates the reduce to all the sub aggregations
-     * @param cacheRecycler
-     */
-    public void reduce(BigArrays bigArrays) {
-        for (int i = 0; i < aggregations.size(); i++) {
-            InternalAggregation aggregation = aggregations.get(i);
-            aggregations.set(i, aggregation.reduce(new InternalAggregation.ReduceContext(ImmutableList.of(aggregation), bigArrays)));
-        }
+        return new InternalAggregations(reducedAggregations);
     }
 
     /** The fields required to write this addAggregation to xcontent */
@@ -33,8 +33,8 @@ import java.util.List;
  */
 public abstract class InternalSingleBucketAggregation extends InternalAggregation implements SingleBucketAggregation {
 
-    protected long docCount;
-    protected InternalAggregations aggregations;
+    private long docCount;
+    private InternalAggregations aggregations;
 
     protected InternalSingleBucketAggregation() {} // for serialization
 
@@ -61,26 +61,23 @@ public abstract class InternalSingleBucketAggregatio
         return aggregations;
     }
 
+    /**
+     * Create a <b>new</b> empty sub aggregation. This must be a new instance on each call.
+     */
+    protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
+
     @Override
     public InternalAggregation reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            InternalSingleBucketAggregation reduced = ((InternalSingleBucketAggregation) aggregations.get(0));
-            reduced.aggregations.reduce(reduceContext.bigArrays());
-            return reduced;
-        }
-        InternalSingleBucketAggregation reduced = null;
+        long docCount = 0L;
         List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
         for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalSingleBucketAggregation) aggregation;
-            } else {
-                this.docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
-            }
+            assert aggregation.getName().equals(getName());
+            docCount += ((InternalSingleBucketAggregation) aggregation).docCount;
             subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).aggregations);
         }
-        reduced.aggregations = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
-        return reduced;
+        final InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext.bigArrays());
+        return newAggregation(getName(), docCount, aggs);
     }
 
     @Override
@@ -56,4 +56,8 @@ public class InternalFilter extends InternalSingleBucketAggregation implements F
         return TYPE;
     }
 
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalFilter(name, docCount, subAggregations);
+    }
 }
@ -106,24 +106,14 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
||||||
if (buckets.size() == 1) {
|
|
||||||
// we still need to reduce the sub aggs
|
|
||||||
Bucket bucket = buckets.get(0);
|
|
||||||
bucket.aggregations.reduce(bigArrays);
|
|
||||||
return bucket;
|
|
||||||
}
|
|
||||||
Bucket reduced = null;
|
|
||||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||||
|
long docCount = 0;
|
||||||
for (Bucket bucket : buckets) {
|
for (Bucket bucket : buckets) {
|
||||||
if (reduced == null) {
|
docCount += bucket.docCount;
|
||||||
reduced = bucket;
|
|
||||||
} else {
|
|
||||||
reduced.docCount += bucket.docCount;
|
|
||||||
}
|
|
||||||
aggregationsList.add(bucket.aggregations);
|
aggregationsList.add(bucket.aggregations);
|
||||||
}
|
}
|
||||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||||
return reduced;
|
return new Bucket(geohashAsLong, docCount, aggs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -181,19 +171,10 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||||||
@Override
|
@Override
|
||||||
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
|
public InternalGeoHashGrid reduce(ReduceContext reduceContext) {
|
||||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||||
if (aggregations.size() == 1) {
|
|
||||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregations.get(0);
|
|
||||||
grid.reduceAndTrimBuckets(reduceContext.bigArrays());
|
|
||||||
return grid;
|
|
||||||
}
|
|
||||||
InternalGeoHashGrid reduced = null;
|
|
||||||
|
|
||||||
LongObjectPagedHashMap<List<Bucket>> buckets = null;
|
LongObjectPagedHashMap<List<Bucket>> buckets = null;
|
||||||
for (InternalAggregation aggregation : aggregations) {
|
for (InternalAggregation aggregation : aggregations) {
|
||||||
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
InternalGeoHashGrid grid = (InternalGeoHashGrid) aggregation;
|
||||||
if (reduced == null) {
|
|
||||||
reduced = grid;
|
|
||||||
}
|
|
||||||
if (buckets == null) {
|
if (buckets == null) {
|
||||||
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
|
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
|
||||||
}
|
}
|
||||||
@ -207,12 +188,6 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reduced == null) {
|
|
||||||
// there are only unmapped terms, so we just return the first one (no need to reduce)
|
|
||||||
return (InternalGeoHashGrid) aggregations.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
|
|
||||||
final int size = (int) Math.min(requiredSize, buckets.size());
|
final int size = (int) Math.min(requiredSize, buckets.size());
|
||||||
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
BucketPriorityQueue ordered = new BucketPriorityQueue(size);
|
||||||
for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
|
for (LongObjectPagedHashMap.Cursor<List<Bucket>> cursor : buckets) {
|
||||||
@ -224,28 +199,7 @@ public class InternalGeoHashGrid extends InternalAggregation implements GeoHashG
|
|||||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||||
list[i] = ordered.pop();
|
list[i] = ordered.pop();
|
||||||
}
|
}
|
||||||
reduced.buckets = Arrays.asList(list);
|
return new InternalGeoHashGrid(getName(), requiredSize, Arrays.asList(list));
|
||||||
return reduced;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void reduceAndTrimBuckets(BigArrays bigArrays) {
|
|
||||||
|
|
||||||
if (requiredSize > buckets.size()) { // nothing to trim
|
|
||||||
for (Bucket bucket : buckets) {
|
|
||||||
bucket.aggregations.reduce(bigArrays);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Bucket> trimmedBuckets = new ArrayList<>(requiredSize);
|
|
||||||
for (Bucket bucket : buckets) {
|
|
||||||
if (trimmedBuckets.size() >= requiredSize) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
bucket.aggregations.reduce(bigArrays);
|
|
||||||
trimmedBuckets.add(bucket);
|
|
||||||
}
|
|
||||||
buckets = trimmedBuckets;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@@ -56,4 +56,9 @@ public class InternalGlobal extends InternalSingleBucketAggregation implements G
     public Type type() {
         return TYPE;
     }
+
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalGlobal(name, docCount, subAggregations);
+    }
 }
@ -57,6 +57,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||||||
super(key, docCount, formatter, aggregations);
|
super(key, docCount, formatter, aggregations);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKey() {
|
public String getKey() {
|
||||||
return formatter != null ? formatter.format(key) : ValueFormatter.DateTime.DEFAULT.format(key);
|
return formatter != null ? formatter.format(key) : ValueFormatter.DateTime.DEFAULT.format(key);
|
||||||
@ -109,6 +114,11 @@ public class InternalDateHistogram extends InternalHistogram<InternalDateHistogr
|
|||||||
return TYPE;
|
return TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalHistogram.Factory<Bucket> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Bucket getBucketByKey(String key) {
|
public Bucket getBucketByKey(String key) {
|
||||||
try {
|
try {
|
||||||
|
@ -67,10 +67,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||||||
|
|
||||||
public static class Bucket implements Histogram.Bucket {
|
public static class Bucket implements Histogram.Bucket {
|
||||||
|
|
||||||
long key;
|
final long key;
|
||||||
long docCount;
|
final long docCount;
|
||||||
protected transient final @Nullable ValueFormatter formatter;
|
protected transient final @Nullable ValueFormatter formatter;
|
||||||
InternalAggregations aggregations;
|
final InternalAggregations aggregations;
|
||||||
|
|
||||||
public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
|
public Bucket(long key, long docCount, @Nullable ValueFormatter formatter, InternalAggregations aggregations) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
@ -79,6 +79,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||||||
this.aggregations = aggregations;
|
this.aggregations = aggregations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Factory<?> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getKey() {
|
public String getKey() {
|
||||||
return formatter != null ? formatter.format(key) : ValueFormatter.RAW.format(key);
|
return formatter != null ? formatter.format(key) : ValueFormatter.RAW.format(key);
|
||||||
@ -105,24 +109,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||||||
}
|
}
|
||||||
|
|
||||||
<B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
|
<B extends Bucket> B reduce(List<B> buckets, BigArrays bigArrays) {
|
||||||
if (buckets.size() == 1) {
|
|
||||||
// we only need to reduce the sub aggregations
|
|
||||||
Bucket bucket = buckets.get(0);
|
|
||||||
bucket.aggregations.reduce(bigArrays);
|
|
||||||
return (B) bucket;
|
|
||||||
}
|
|
||||||
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
|
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
|
||||||
Bucket reduced = null;
|
long docCount = 0;
|
||||||
for (Bucket bucket : buckets) {
|
for (Bucket bucket : buckets) {
|
||||||
if (reduced == null) {
|
docCount += bucket.docCount;
|
||||||
reduced = bucket;
|
|
||||||
} else {
|
|
||||||
reduced.docCount += bucket.docCount;
|
|
||||||
}
|
|
||||||
aggregations.add((InternalAggregations) bucket.getAggregations());
|
aggregations.add((InternalAggregations) bucket.getAggregations());
|
||||||
}
|
}
|
||||||
reduced.aggregations = InternalAggregations.reduce(aggregations, bigArrays);
|
InternalAggregations aggs = InternalAggregations.reduce(aggregations, bigArrays);
|
||||||
return (B) reduced;
|
return (B) getFactory().createBucket(key, docCount, aggs, formatter);
|
||||||
}
|
}
|
||||||
|
|
||||||
void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException {
|
void toXContent(XContentBuilder builder, Params params, boolean keyed, @Nullable ValueFormatter formatter) throws IOException {
|
||||||
@ -256,98 +250,13 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||||||
return bucketsMap.get(key.longValue());
|
return bucketsMap.get(key.longValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Factory<B> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalAggregation reduce(ReduceContext reduceContext) {
|
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||||
if (aggregations.size() == 1) {
|
|
||||||
|
|
||||||
InternalHistogram<B> histo = (InternalHistogram<B>) aggregations.get(0);
|
|
||||||
|
|
||||||
if (minDocCount == 1) {
|
|
||||||
for (B bucket : histo.buckets) {
|
|
||||||
bucket.aggregations.reduce(reduceContext.bigArrays());
|
|
||||||
}
|
|
||||||
return histo;
|
|
||||||
}
|
|
||||||
|
|
||||||
CollectionUtil.introSort(histo.buckets, order.asc ? InternalOrder.KEY_ASC.comparator() : InternalOrder.KEY_DESC.comparator());
|
|
||||||
List<B> list = order.asc ? histo.buckets : Lists.reverse(histo.buckets);
|
|
||||||
B lastBucket = null;
|
|
||||||
ListIterator<B> iter = list.listIterator();
|
|
||||||
|
|
||||||
// we need to fill the gaps with empty buckets
|
|
||||||
if (minDocCount == 0) {
|
|
||||||
ExtendedBounds bounds = emptyBucketInfo.bounds;
|
|
||||||
|
|
||||||
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
|
|
||||||
if (bounds != null) {
|
|
||||||
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
|
|
||||||
if (firstBucket == null) {
|
|
||||||
if (bounds.min != null && bounds.max != null) {
|
|
||||||
long key = bounds.min;
|
|
||||||
long max = bounds.max;
|
|
||||||
while (key <= max) {
|
|
||||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
|
||||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (bounds.min != null) {
|
|
||||||
long key = bounds.min;
|
|
||||||
while (key < firstBucket.key) {
|
|
||||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
|
||||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now adding the empty buckets within the actual data,
|
|
||||||
// e.g. if the data series is [1,2,3,7] there are 3 empty buckets that will be created for 4,5,6
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
// look ahead on the next bucket without advancing the iter
|
|
||||||
// so we'll be able to insert elements at the right position
|
|
||||||
B nextBucket = list.get(iter.nextIndex());
|
|
||||||
nextBucket.aggregations.reduce(reduceContext.bigArrays());
|
|
||||||
if (lastBucket != null) {
|
|
||||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
|
||||||
while (key != nextBucket.key) {
|
|
||||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
|
||||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lastBucket = iter.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
// finally, adding the empty buckets *after* the actual data (based on the extended_bounds.max requested by the user)
|
|
||||||
if (bounds != null && lastBucket != null && bounds.max != null && bounds.max > lastBucket.key) {
|
|
||||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
|
||||||
long max = bounds.max;
|
|
||||||
while (key <= max) {
|
|
||||||
iter.add(createBucket(key, 0, emptyBucketInfo.subAggregations, formatter));
|
|
||||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
while (iter.hasNext()) {
|
|
||||||
InternalHistogram.Bucket bucket = iter.next();
|
|
||||||
if (bucket.getDocCount() < minDocCount) {
|
|
||||||
iter.remove();
|
|
||||||
} else {
|
|
||||||
bucket.aggregations.reduce(reduceContext.bigArrays());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (order != InternalOrder.KEY_ASC && order != InternalOrder.KEY_DESC) {
|
|
||||||
CollectionUtil.introSort(histo.buckets, order.comparator());
|
|
||||||
}
|
|
||||||
|
|
||||||
return histo;
|
|
||||||
}
|
|
||||||
|
|
||||||
InternalHistogram reduced = (InternalHistogram) aggregations.get(0);
|
|
||||||
|
|
||||||
LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays());
|
LongObjectPagedHashMap<List<B>> bucketsByKey = new LongObjectPagedHashMap<>(reduceContext.bigArrays());
|
||||||
for (InternalAggregation aggregation : aggregations) {
|
for (InternalAggregation aggregation : aggregations) {
|
||||||
@ -437,9 +346,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||||||
CollectionUtil.introSort(reducedBuckets, order.comparator());
|
CollectionUtil.introSort(reducedBuckets, order.comparator());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return getFactory().create(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, formatter, keyed);
|
||||||
reduced.buckets = reducedBuckets;
|
|
||||||
return reduced;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
protected B createBucket(long key, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
||||||
|
@@ -58,4 +58,8 @@ public class InternalMissing extends InternalSingleBucketAggregation implements
         return TYPE;
     }
 
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalMissing(name, docCount, subAggregations);
+    }
 }
@@ -57,4 +57,8 @@ public class InternalNested extends InternalSingleBucketAggregation implements N
         return TYPE;
     }
 
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalNested(name, docCount, subAggregations);
+    }
 }
@@ -57,4 +57,8 @@ public class InternalReverseNested extends InternalSingleBucketAggregation imple
         return TYPE;
    }
 
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalReverseNested(name, docCount, subAggregations);
+    }
 }
@ -60,11 +60,12 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||||||
|
|
||||||
public static class Bucket implements Range.Bucket {
|
public static class Bucket implements Range.Bucket {
|
||||||
|
|
||||||
private double from = Double.NEGATIVE_INFINITY;
|
private final ValueFormatter formatter;
|
||||||
private double to = Double.POSITIVE_INFINITY;
|
private final double from;
|
||||||
private long docCount;
|
private final double to;
|
||||||
InternalAggregations aggregations;
|
private final long docCount;
|
||||||
private String key;
|
final InternalAggregations aggregations;
|
||||||
|
private final String key;
|
||||||
|
|
||||||
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
public Bucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter) {
|
||||||
this.key = key != null ? key : generateKey(from, to, formatter);
|
this.key = key != null ? key : generateKey(from, to, formatter);
|
||||||
@ -72,6 +73,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||||||
this.to = to;
|
this.to = to;
|
||||||
this.docCount = docCount;
|
this.docCount = docCount;
|
||||||
this.aggregations = aggregations;
|
this.aggregations = aggregations;
|
||||||
|
this.formatter = formatter;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getKey() {
|
public String getKey() {
|
||||||
@@ -103,25 +105,19 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
             return aggregations;
         }
 
+        protected Factory<? extends Bucket, ?> getFactory() {
+            return FACTORY;
+        }
+
         Bucket reduce(List<Bucket> ranges, BigArrays bigArrays) {
-            if (ranges.size() == 1) {
-                // we stil need to call reduce on all the sub aggregations
-                Bucket bucket = ranges.get(0);
-                bucket.aggregations.reduce(bigArrays);
-                return bucket;
-            }
-            Bucket reduced = null;
+            long docCount = 0;
             List<InternalAggregations> aggregationsList = Lists.newArrayListWithCapacity(ranges.size());
             for (Bucket range : ranges) {
-                if (reduced == null) {
-                    reduced = range;
-                } else {
-                    reduced.docCount += range.docCount;
-                }
+                docCount += range.docCount;
                 aggregationsList.add(range.aggregations);
             }
-            reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
-            return reduced;
+            final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            return getFactory().createBucket(key, from, to, docCount, aggs, formatter);
         }
 
         void toXContent(XContentBuilder builder, Params params, @Nullable ValueFormatter formatter, boolean keyed) throws IOException {
@ -164,8 +160,8 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||||||
return TYPE.name();
|
return TYPE.name();
|
||||||
}
|
}
|
||||||
|
|
||||||
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public R create(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
return (R) new InternalRange<>(name, ranges, formatter, keyed, unmapped);
|
return (R) new InternalRange<>(name, ranges, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -178,16 +174,14 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||||||
private Map<String, B> rangeMap;
|
private Map<String, B> rangeMap;
|
||||||
private @Nullable ValueFormatter formatter;
|
private @Nullable ValueFormatter formatter;
|
||||||
private boolean keyed;
|
private boolean keyed;
|
||||||
private boolean unmapped;
|
|
||||||
|
|
||||||
public InternalRange() {} // for serialization
|
public InternalRange() {} // for serialization
|
||||||
|
|
||||||
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public InternalRange(String name, List<B> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
super(name);
|
super(name);
|
||||||
this.ranges = ranges;
|
this.ranges = ranges;
|
||||||
this.formatter = formatter;
|
this.formatter = formatter;
|
||||||
this.keyed = keyed;
|
this.keyed = keyed;
|
||||||
this.unmapped = unmapped;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -211,52 +205,31 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
         return rangeMap.get(key);
     }
 
+    protected Factory<B, ?> getFactory() {
+        return FACTORY;
+    }
+
     @Override
     public InternalAggregation reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            InternalRange<B> reduced = (InternalRange<B>) aggregations.get(0);
-            for (B bucket : reduced.ranges) {
-                bucket.aggregations.reduce(reduceContext.bigArrays());
-            }
-            return reduced;
+        @SuppressWarnings("unchecked")
+        List<Bucket>[] rangeList = new List[ranges.size()];
+        for (int i = 0; i < rangeList.length; ++i) {
+            rangeList[i] = new ArrayList<Bucket>();
         }
-        List<List<Bucket>> rangesList = null;
         for (InternalAggregation aggregation : aggregations) {
-            InternalRange<Bucket> ranges = (InternalRange) aggregation;
-            if (ranges.unmapped) {
-                continue;
-            }
-            if (rangesList == null) {
-                rangesList = new ArrayList<>(ranges.ranges.size());
-                for (Bucket bucket : ranges.ranges) {
-                    List<Bucket> sameRangeList = new ArrayList<>(aggregations.size());
-                    sameRangeList.add(bucket);
-                    rangesList.add(sameRangeList);
-                }
-            } else {
-                int i = 0;
-                for (Bucket range : ranges.ranges) {
-                    rangesList.get(i++).add(range);
-                }
-            }
+            InternalRange<?> ranges = (InternalRange<?>) aggregation;
+            int i = 0;
+            for (Bucket range : ranges.ranges) {
+                rangeList[i++].add(range);
+            }
         }
 
-        if (rangesList == null) {
-            // unmapped, we can just take the first one
-            return aggregations.get(0);
+        final List<B> ranges = new ArrayList<>();
+        for (int i = 0; i < this.ranges.size(); ++i) {
+            ranges.add((B) rangeList[i].get(0).reduce(rangeList[i], reduceContext.bigArrays()));
         }
-
-        InternalRange reduced = (InternalRange) aggregations.get(0);
-        int i = 0;
-        for (List<Bucket> sameRangeList : rangesList) {
-            reduced.ranges.set(i++, (sameRangeList.get(0)).reduce(sameRangeList, reduceContext.bigArrays()));
-        }
-        return reduced;
-    }
-
-    protected B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
-        return (B) new Bucket(key, from, to, docCount, aggregations, formatter);
+        return getFactory().create(name, ranges, formatter, keyed);
     }
 
     @Override
@ -268,7 +241,7 @@ public class InternalRange<B extends InternalRange.Bucket> extends InternalAggre
|
|||||||
List<B> ranges = Lists.newArrayListWithCapacity(size);
|
List<B> ranges = Lists.newArrayListWithCapacity(size);
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
String key = in.readOptionalString();
|
String key = in.readOptionalString();
|
||||||
ranges.add(createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
|
ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), InternalAggregations.readAggregations(in), formatter));
|
||||||
}
|
}
|
||||||
this.ranges = ranges;
|
this.ranges = ranges;
|
||||||
this.rangeMap = null;
|
this.rangeMap = null;
|
||||||
|
@ -203,7 +203,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||||||
buckets.add(bucket);
|
buckets.add(bucket);
|
||||||
}
|
}
|
||||||
// value source can be null in the case of unmapped fields
|
// value source can be null in the case of unmapped fields
|
||||||
return rangeFactory.create(name, buckets, formatter, keyed, false);
|
return rangeFactory.create(name, buckets, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -217,7 +217,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||||||
buckets.add(bucket);
|
buckets.add(bucket);
|
||||||
}
|
}
|
||||||
// value source can be null in the case of unmapped fields
|
// value source can be null in the case of unmapped fields
|
||||||
return rangeFactory.create(name, buckets, formatter, keyed, false);
|
return rangeFactory.create(name, buckets, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final void sortRanges(final Range[] ranges) {
|
private static final void sortRanges(final Range[] ranges) {
|
||||||
@ -274,7 +274,7 @@ public class RangeAggregator extends BucketsAggregator {
|
|||||||
for (RangeAggregator.Range range : ranges) {
|
for (RangeAggregator.Range range : ranges) {
|
||||||
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter));
|
buckets.add(factory.createBucket(range.key, range.from, range.to, 0, subAggs, formatter));
|
||||||
}
|
}
|
||||||
return factory.create(name, buckets, formatter, keyed, true);
|
return factory.create(name, buckets, formatter, keyed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,6 +72,11 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||||||
public DateTime getToAsDate() {
|
public DateTime getToAsDate() {
|
||||||
return Double.isInfinite(getTo().doubleValue()) ? null : new DateTime(getTo().longValue(), DateTimeZone.UTC);
|
return Double.isInfinite(getTo().doubleValue()) ? null : new DateTime(getTo().longValue(), DateTimeZone.UTC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
|
private static class Factory extends InternalRange.Factory<InternalDateRange.Bucket, InternalDateRange> {
|
||||||
@ -82,8 +87,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public InternalDateRange create(String name, List<InternalDateRange.Bucket> ranges, ValueFormatter formatter, boolean keyed) {
|
||||||
return new InternalDateRange(name, ranges, formatter, keyed, unmapped);
|
return new InternalDateRange(name, ranges, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -94,8 +99,8 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||||||
|
|
||||||
InternalDateRange() {} // for serialization
|
InternalDateRange() {} // for serialization
|
||||||
|
|
||||||
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
InternalDateRange(String name, List<InternalDateRange.Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
super(name, ranges, formatter, keyed, unmapped);
|
super(name, ranges, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -104,7 +109,7 @@ public class InternalDateRange extends InternalRange<InternalDateRange.Bucket> i
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
return new Bucket(key, from, to, docCount, aggregations, formatter);
|
return FACTORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,6 +61,10 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||||||
super(key, from, to, docCount, aggregations, formatter);
|
super(key, from, to, docCount, aggregations, formatter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
|
private static class Factory extends InternalRange.Factory<InternalGeoDistance.Bucket, InternalGeoDistance> {
|
||||||
@ -71,8 +75,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public InternalGeoDistance create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
return new InternalGeoDistance(name, ranges, formatter, keyed, unmapped);
|
return new InternalGeoDistance(name, ranges, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -83,8 +87,8 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||||||
|
|
||||||
InternalGeoDistance() {} // for serialization
|
InternalGeoDistance() {} // for serialization
|
||||||
|
|
||||||
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public InternalGeoDistance(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
super(name, ranges, formatter, keyed, unmapped);
|
super(name, ranges, formatter, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -93,7 +97,7 @@ public class InternalGeoDistance extends InternalRange<InternalGeoDistance.Bucke
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, ValueFormatter formatter) {
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
return new Bucket(key, from, to, docCount, aggregations, formatter);
|
return FACTORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -74,6 +74,11 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||||||
double to = getTo().doubleValue();
|
double to = getTo().doubleValue();
|
||||||
return Double.isInfinite(to) ? null : MAX_IP == to ? null : ValueFormatter.IPv4.format(to);
|
return Double.isInfinite(to) ? null : MAX_IP == to ? null : ValueFormatter.IPv4.format(to);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
|
return FACTORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> {
|
private static class Factory extends InternalRange.Factory<InternalIPv4Range.Bucket, InternalIPv4Range> {
|
||||||
@ -84,8 +89,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed, boolean unmapped) {
|
public InternalIPv4Range create(String name, List<Bucket> ranges, @Nullable ValueFormatter formatter, boolean keyed) {
|
||||||
return new InternalIPv4Range(name, ranges, keyed, unmapped);
|
return new InternalIPv4Range(name, ranges, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -96,8 +101,8 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||||||
|
|
||||||
public InternalIPv4Range() {} // for serialization
|
public InternalIPv4Range() {} // for serialization
|
||||||
|
|
||||||
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed, boolean unmapped) {
|
public InternalIPv4Range(String name, List<InternalIPv4Range.Bucket> ranges, boolean keyed) {
|
||||||
super(name, ranges, ValueFormatter.IPv4, keyed, unmapped);
|
super(name, ranges, ValueFormatter.IPv4, keyed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -106,8 +111,7 @@ public class InternalIPv4Range extends InternalRange<InternalIPv4Range.Bucket> i
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Bucket createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, @Nullable ValueFormatter formatter ) {
|
protected InternalRange.Factory<Bucket, ?> getFactory() {
|
||||||
return new Bucket(key, from, to, docCount, aggregations);
|
return FACTORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.search.aggregations.bucket.significant;
|
package org.elasticsearch.search.aggregations.bucket.significant;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
@ -143,25 +142,20 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
|
||||||
if (buckets.size() == 1) {
|
long subsetDf = 0;
|
||||||
return buckets.get(0);
|
long supersetDf = 0;
|
||||||
}
|
|
||||||
Bucket reduced = null;
|
|
||||||
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
|
||||||
for (Bucket bucket : buckets) {
|
for (Bucket bucket : buckets) {
|
||||||
if (reduced == null) {
|
subsetDf += bucket.subsetDf;
|
||||||
reduced = bucket;
|
supersetDf += bucket.supersetDf;
|
||||||
} else {
|
|
||||||
reduced.subsetDf += bucket.subsetDf;
|
|
||||||
reduced.supersetDf += bucket.supersetDf;
|
|
||||||
reduced.updateScore();
|
|
||||||
}
|
|
||||||
aggregationsList.add(bucket.aggregations);
|
aggregationsList.add(bucket.aggregations);
|
||||||
}
|
}
|
||||||
reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
|
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
|
||||||
return reduced;
|
return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double getSignificanceScore() {
|
public double getSignificanceScore() {
|
||||||
return score;
|
return score;
|
||||||
@ -201,14 +195,8 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalSignificantTerms reduce(ReduceContext reduceContext) {
|
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||||
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
List<InternalAggregation> aggregations = reduceContext.aggregations();
|
||||||
if (aggregations.size() == 1) {
|
|
||||||
InternalSignificantTerms terms = (InternalSignificantTerms) aggregations.get(0);
|
|
||||||
terms.trimExcessEntries();
|
|
||||||
return terms;
|
|
||||||
}
|
|
||||||
InternalSignificantTerms reduced = null;
|
|
||||||
|
|
||||||
long globalSubsetSize = 0;
|
long globalSubsetSize = 0;
|
||||||
long globalSupersetSize = 0;
|
long globalSupersetSize = 0;
|
||||||
@ -219,18 +207,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||||||
globalSubsetSize += terms.subsetSize;
|
globalSubsetSize += terms.subsetSize;
|
||||||
globalSupersetSize += terms.supersetSize;
|
globalSupersetSize += terms.supersetSize;
|
||||||
}
|
}
|
||||||
Map<String, List<InternalSignificantTerms.Bucket>> buckets = null;
|
Map<String, List<InternalSignificantTerms.Bucket>> buckets = new HashMap<>();
|
||||||
for (InternalAggregation aggregation : aggregations) {
|
for (InternalAggregation aggregation : aggregations) {
|
||||||
InternalSignificantTerms terms = (InternalSignificantTerms) aggregation;
|
InternalSignificantTerms terms = (InternalSignificantTerms) aggregation;
|
||||||
if (terms instanceof UnmappedSignificantTerms) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (reduced == null) {
|
|
||||||
reduced = terms;
|
|
||||||
}
|
|
||||||
if (buckets == null) {
|
|
||||||
buckets = new HashMap<>(terms.buckets.size());
|
|
||||||
}
|
|
||||||
for (Bucket bucket : terms.buckets) {
|
for (Bucket bucket : terms.buckets) {
|
||||||
List<Bucket> existingBuckets = buckets.get(bucket.getKey());
|
List<Bucket> existingBuckets = buckets.get(bucket.getKey());
|
||||||
if (existingBuckets == null) {
|
if (existingBuckets == null) {
|
||||||
@ -239,19 +218,10 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||||||
}
|
}
|
||||||
// Adjust the buckets with the global stats representing the
|
// Adjust the buckets with the global stats representing the
|
||||||
// total size of the pots from which the stats are drawn
|
// total size of the pots from which the stats are drawn
|
||||||
bucket.subsetSize = globalSubsetSize;
|
existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, bucket.aggregations));
|
||||||
bucket.supersetSize = globalSupersetSize;
|
|
||||||
bucket.updateScore();
|
|
||||||
existingBuckets.add(bucket);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reduced == null) {
|
|
||||||
// there are only unmapped terms, so we just return the first one
|
|
||||||
// (no need to reduce)
|
|
||||||
return (UnmappedSignificantTerms) aggregations.get(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
final int size = Math.min(requiredSize, buckets.size());
|
final int size = Math.min(requiredSize, buckets.size());
|
||||||
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
|
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
|
||||||
for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) {
|
for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) {
|
||||||
@ -265,23 +235,9 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
|
|||||||
for (int i = ordered.size() - 1; i >= 0; i--) {
|
for (int i = ordered.size() - 1; i >= 0; i--) {
|
||||||
list[i] = (Bucket) ordered.pop();
|
list[i] = (Bucket) ordered.pop();
|
||||||
}
|
}
|
||||||
reduced.buckets = Arrays.asList(list);
|
return newAggregation(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
|
||||||
reduced.subsetSize = globalSubsetSize;
|
|
||||||
reduced.supersetSize = globalSupersetSize;
|
|
||||||
return reduced;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final void trimExcessEntries() {
|
abstract InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets);
|
||||||
final List<Bucket> newBuckets = Lists.newArrayList();
|
|
||||||
for (Bucket b : buckets) {
|
|
||||||
if (newBuckets.size() >= requiredSize) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (b.subsetDf >= minDocCount) {
|
|
||||||
newBuckets.add(b);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
buckets = newBuckets;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -83,6 +83,11 @@ public class SignificantLongTerms extends InternalSignificantTerms {
|
|||||||
return Long.toString(term);
|
return Long.toString(term);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
|
||||||
|
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ValueFormatter formatter;
|
private ValueFormatter formatter;
|
||||||
@ -101,6 +106,12 @@ public class SignificantLongTerms extends InternalSignificantTerms {
|
|||||||
return TYPE;
|
return TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
|
||||||
|
List<InternalSignificantTerms.Bucket> buckets) {
|
||||||
|
return new SignificantLongTerms(subsetSize, supersetSize, getName(), formatter, requiredSize, supersetSize, buckets);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
this.name = in.readString();
|
this.name = in.readString();
|
||||||
|
@ -85,6 +85,10 @@ public class SignificantStringTerms extends InternalSignificantTerms {
|
|||||||
return termBytes.utf8ToString();
|
return termBytes.utf8ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
|
||||||
|
return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SignificantStringTerms() {} // for serialization
|
SignificantStringTerms() {} // for serialization
|
||||||
@ -99,6 +103,12 @@ public class SignificantStringTerms extends InternalSignificantTerms {
|
|||||||
return TYPE;
|
return TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize,
|
||||||
|
List<InternalSignificantTerms.Bucket> buckets) {
|
||||||
|
return new SignificantStringTerms(subsetSize, supersetSize, getName(), requiredSize, supersetSize, buckets);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
this.name = in.readString();
|
this.name = in.readString();
|
||||||
|
@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||||
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -64,6 +66,21 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
|
|||||||
return TYPE;
|
return TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InternalAggregation reduce(ReduceContext reduceContext) {
|
||||||
|
for (InternalAggregation aggregation : reduceContext.aggregations()) {
|
||||||
|
if (!(aggregation instanceof UnmappedSignificantTerms)) {
|
||||||
|
return aggregation.reduce(reduceContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
InternalSignificantTerms newAggregation(long subsetSize, long supersetSize, List<Bucket> buckets) {
|
||||||
|
throw new UnsupportedOperationException("How did you get there?");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
this.name = in.readString();
|
this.name = in.readString();
|
||||||
|
@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.text.StringText;
|
import org.elasticsearch.common.text.StringText;
|
||||||
import org.elasticsearch.common.text.Text;
|
import org.elasticsearch.common.text.Text;
|
||||||
import org.elasticsearch.common.util.DoubleObjectPagedHashMap;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.search.aggregations.AggregationStreams;
|
import org.elasticsearch.search.aggregations.AggregationStreams;
|
||||||
-import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@ -87,6 +83,15 @@ public class DoubleTerms extends InternalTerms {
             return Double.compare(term, other.getKeyAsNumber().doubleValue());
         }
 
+        @Override
+        Object getKeyAsObject() {
+            return getKeyAsNumber();
+        }
+
+        @Override
+        Bucket newBucket(long docCount, InternalAggregations aggs) {
+            return new Bucket(term, docCount, aggs);
+        }
     }
 
     private @Nullable ValueFormatter formatter;
@ -104,59 +109,8 @@ public class DoubleTerms extends InternalTerms {
     }
 
     @Override
-    public InternalTerms reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries(reduceContext.bigArrays());
-            return terms;
-        }
-        InternalTerms reduced = null;
-
-        DoubleObjectPagedHashMap<List<Bucket>> buckets = null;
-        for (InternalAggregation aggregation : aggregations) {
-            InternalTerms terms = (InternalTerms) aggregation;
-            if (terms instanceof UnmappedTerms) {
-                continue;
-            }
-            if (reduced == null) {
-                reduced = terms;
-            }
-            if (buckets == null) {
-                buckets = new DoubleObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
-            }
-            for (Terms.Bucket bucket : terms.buckets) {
-                List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
-                if (existingBuckets == null) {
-                    existingBuckets = new ArrayList<>(aggregations.size());
-                    buckets.put(((Bucket) bucket).term, existingBuckets);
-                }
-                existingBuckets.add((Bucket) bucket);
-            }
-        }
-
-        if (reduced == null) {
-            // there are only unmapped terms, so we just return the first one (no need to reduce)
-            return (UnmappedTerms) aggregations.get(0);
-        }
-
-        // TODO: would it be better to sort the backing array buffer of hppc map directly instead of using a PQ?
-        final int size = (int) Math.min(requiredSize, buckets.size());
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
-        for (DoubleObjectPagedHashMap.Cursor<List<DoubleTerms.Bucket>> cursor : buckets) {
-            List<DoubleTerms.Bucket> sameTermBuckets = cursor.value;
-            final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
-            if (b.getDocCount() >= minDocCount) {
-                ordered.insertWithOverflow(b);
-            }
-        }
-        buckets.close();
-        InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = (Bucket) ordered.pop();
-        }
-        reduced.buckets = Arrays.asList(list);
-        return reduced;
+    protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
+        return new DoubleTerms(name, order, formatter, requiredSize, minDocCount, buckets);
     }
 
     @Override
@ -18,10 +18,10 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import org.elasticsearch.common.io.stream.Streamable;
-import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.search.aggregations.Aggregations;
@ -58,24 +58,19 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
             return aggregations;
         }
 
+        abstract Object getKeyAsObject();
+
+        abstract Bucket newBucket(long docCount, InternalAggregations aggs);
+
         public Bucket reduce(List<? extends Bucket> buckets, BigArrays bigArrays) {
-            if (buckets.size() == 1) {
-                Bucket bucket = buckets.get(0);
-                bucket.aggregations.reduce(bigArrays);
-                return bucket;
-            }
-            Bucket reduced = null;
+            long docCount = 0;
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
             for (Bucket bucket : buckets) {
-                if (reduced == null) {
-                    reduced = bucket;
-                } else {
-                    reduced.docCount += bucket.docCount;
-                }
+                docCount += bucket.docCount;
                 aggregationsList.add(bucket.aggregations);
             }
-            reduced.aggregations = InternalAggregations.reduce(aggregationsList, bigArrays);
-            return reduced;
+            InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, bigArrays);
+            return newBucket(docCount, aggs);
         }
     }
 
@ -113,47 +108,21 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
     }
 
     @Override
-    public InternalTerms reduce(ReduceContext reduceContext) {
+    public InternalAggregation reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries(reduceContext.bigArrays());
-            return terms;
-        }
 
-        InternalTerms reduced = null;
-
-        Map<Text, List<InternalTerms.Bucket>> buckets = null;
+        Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
         for (InternalAggregation aggregation : aggregations) {
             InternalTerms terms = (InternalTerms) aggregation;
-            if (terms instanceof UnmappedTerms) {
-                continue;
-            }
-            if (reduced == null) {
-                reduced = terms;
-            }
-            if (buckets == null) {
-                buckets = new HashMap<>(terms.buckets.size());
-            }
             for (Bucket bucket : terms.buckets) {
-                List<Bucket> existingBuckets = buckets.get(bucket.getKeyAsText());
-                if (existingBuckets == null) {
-                    existingBuckets = new ArrayList<>(aggregations.size());
-                    buckets.put(bucket.getKeyAsText(), existingBuckets);
-                }
-                existingBuckets.add(bucket);
+                buckets.put(bucket.getKeyAsObject(), bucket);
             }
         }
 
-        if (reduced == null) {
-            // there are only unmapped terms, so we just return the first one (no need to reduce)
-            return (UnmappedTerms) aggregations.get(0);
-        }
-
         final int size = Math.min(requiredSize, buckets.size());
         BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
-        for (Map.Entry<Text, List<Bucket>> entry : buckets.entrySet()) {
-            List<Bucket> sameTermBuckets = entry.getValue();
+        for (Collection<Bucket> l : buckets.asMap().values()) {
+            List<Bucket> sameTermBuckets = (List<Bucket>) l; // cast is ok according to javadocs
             final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
             if (b.docCount >= minDocCount) {
                 ordered.insertWithOverflow(b);
@ -163,22 +132,9 @@ public abstract class InternalTerms extends InternalAggregation implements Terms
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = (Bucket) ordered.pop();
         }
-        reduced.buckets = Arrays.asList(list);
-        return reduced;
+        return newAggregation(name, Arrays.asList(list));
     }
 
-    final void trimExcessEntries(BigArrays bigArrays) {
-        final List<Bucket> newBuckets = Lists.newArrayList();
-        for (Bucket b : buckets) {
-            if (newBuckets.size() >= requiredSize) {
-                break;
-            }
-            if (b.docCount >= minDocCount) {
-                newBuckets.add(b);
-                b.aggregations.reduce(bigArrays);
-            }
-        }
-        buckets = newBuckets;
-    }
+    protected abstract InternalTerms newAggregation(String name, List<Bucket> buckets);
 
 }
@ -23,18 +23,14 @@ import org.elasticsearch.common.io.stream.StreamInput
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.text.Text;
-import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregationStreams;
-import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@ -87,6 +83,16 @@ public class LongTerms extends InternalTerms {
         int compareTerm(Terms.Bucket other) {
             return Long.compare(term, other.getKeyAsNumber().longValue());
         }
+
+        @Override
+        Object getKeyAsObject() {
+            return getKeyAsNumber();
+        }
+
+        @Override
+        Bucket newBucket(long docCount, InternalAggregations aggs) {
+            return new Bucket(term, docCount, aggs);
+        }
     }
 
     private @Nullable ValueFormatter formatter;
@ -104,59 +110,8 @@ public class LongTerms extends InternalTerms {
     }
 
     @Override
-    public InternalTerms reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            InternalTerms terms = (InternalTerms) aggregations.get(0);
-            terms.trimExcessEntries(reduceContext.bigArrays());
-            return terms;
-        }
-        InternalTerms reduced = null;
-
-        LongObjectPagedHashMap<List<Bucket>> buckets = null;
-        for (InternalAggregation aggregation : aggregations) {
-            InternalTerms terms = (InternalTerms) aggregation;
-            if (terms instanceof UnmappedTerms) {
-                continue;
-            }
-            if (reduced == null) {
-                reduced = terms;
-            }
-            if (buckets == null) {
-                buckets = new LongObjectPagedHashMap<>(terms.buckets.size(), reduceContext.bigArrays());
-            }
-            for (Terms.Bucket bucket : terms.buckets) {
-                List<Bucket> existingBuckets = buckets.get(((Bucket) bucket).term);
-                if (existingBuckets == null) {
-                    existingBuckets = new ArrayList<>(aggregations.size());
-                    buckets.put(((Bucket) bucket).term, existingBuckets);
-                }
-                existingBuckets.add((Bucket) bucket);
-            }
-        }
-
-        if (reduced == null) {
-            // there are only unmapped terms, so we just return the first one (no need to reduce)
-            return (UnmappedTerms) aggregations.get(0);
-        }
-
-        // TODO: would it be better to sort the backing array buffer of the hppc map directly instead of using a PQ?
-        final int size = (int) Math.min(requiredSize, buckets.size());
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
-        for (LongObjectPagedHashMap.Cursor<List<LongTerms.Bucket>> cursor : buckets) {
-            List<LongTerms.Bucket> sameTermBuckets = cursor.value;
-            final InternalTerms.Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext.bigArrays());
-            if (b.getDocCount() >= minDocCount) {
-                ordered.insertWithOverflow(b);
-            }
-        }
-        buckets.close();
-        InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = (Bucket) ordered.pop();
-        }
-        reduced.buckets = Arrays.asList(list);
-        return reduced;
+    protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
+        return new LongTerms(name, order, formatter, requiredSize, minDocCount, buckets);
     }
 
     @Override
@ -84,6 +84,16 @@ public class StringTerms extends InternalTerms {
         int compareTerm(Terms.Bucket other) {
             return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes);
         }
+
+        @Override
+        Object getKeyAsObject() {
+            return getKeyAsText();
+        }
+
+        @Override
+        Bucket newBucket(long docCount, InternalAggregations aggs) {
+            return new Bucket(termBytes, docCount, aggs);
+        }
     }
 
     StringTerms() {} // for serialization
@ -97,6 +107,11 @@ public class StringTerms extends InternalTerms {
         return TYPE;
     }
 
+    @Override
+    protected InternalTerms newAggregation(String name, List<InternalTerms.Bucket> buckets) {
+        return new StringTerms(name, order, requiredSize, minDocCount, buckets);
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         this.name = in.readString();
@ -22,10 +22,12 @@ import org.elasticsearch.common.io.stream.StreamInput
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@ -80,6 +82,21 @@ public class UnmappedTerms extends InternalTerms {
         out.writeVLong(minDocCount);
     }
 
+    @Override
+    public InternalAggregation reduce(ReduceContext reduceContext) {
+        for (InternalAggregation agg : reduceContext.aggregations()) {
+            if (!(agg instanceof UnmappedTerms)) {
+                return agg.reduce(reduceContext);
+            }
+        }
+        return this;
+    }
+
+    @Override
+    protected InternalTerms newAggregation(String name, List<Bucket> buckets) {
+        throw new UnsupportedOperationException("How did you get there?");
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(name);
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  *
@ -76,20 +75,13 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
 
     @Override
     public InternalAvg reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return (InternalAvg) aggregations.get(0);
-        }
-        InternalAvg reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalAvg) aggregation;
-            } else {
-                reduced.count += ((InternalAvg) aggregation).count;
-                reduced.sum += ((InternalAvg) aggregation).sum;
-            }
-        }
-        return reduced;
+        long count = 0;
+        double sum = 0;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            count += ((InternalAvg) aggregation).count;
+            sum += ((InternalAvg) aggregation).sum;
+        }
+        return new InternalAvg(getName(), sum, count);
     }
 
     @Override
@ -184,6 +184,10 @@ public final class HyperLogLogPlusPlus implements Releasable {
         alphaMM = alpha * m * m;
     }
 
+    public int precision() {
+        return p;
+    }
+
     public long maxBucket() {
         return runLens.size() >>> p;
     }
@ -104,18 +104,17 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
             final InternalCardinality cardinality = (InternalCardinality) aggregation;
             if (cardinality.counts != null) {
                 if (reduced == null) {
-                    reduced = cardinality;
-                } else {
-                    reduced.merge(cardinality);
+                    reduced = new InternalCardinality(name, new HyperLogLogPlusPlus(cardinality.counts.precision(), BigArrays.NON_RECYCLING_INSTANCE, 1));
                 }
+                reduced.merge(cardinality);
             }
         }
 
         if (reduced == null) { // all empty
-            reduced = (InternalCardinality) aggregations.get(0);
+            return aggregations.get(0);
+        } else {
+            return reduced;
         }
-
-        return reduced;
     }
 
     public void merge(InternalCardinality other) {
@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation
 import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
 
 import java.io.IOException;
-import java.util.List;
 
 public class InternalGeoBounds extends InternalMetricsAggregation implements GeoBounds {
 
@ -72,36 +71,36 @@ public class InternalGeoBounds extends InternalMetricsAggregation implements Geo
 
     @Override
     public InternalAggregation reduce(ReduceContext reduceContext) {
-        InternalGeoBounds reduced = null;
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        for (InternalAggregation aggregation : aggregations) {
-            InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
+        double top = Double.NEGATIVE_INFINITY;
+        double bottom = Double.POSITIVE_INFINITY;
+        double posLeft = Double.POSITIVE_INFINITY;
+        double posRight = Double.NEGATIVE_INFINITY;
+        double negLeft = Double.POSITIVE_INFINITY;
+        double negRight = Double.NEGATIVE_INFINITY;
 
-            if (reduced == null) {
-                reduced = bounds;
-                continue;
-            }
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            InternalGeoBounds bounds = (InternalGeoBounds) aggregation;
 
-            if (bounds.top > reduced.top) {
-                reduced.top = bounds.top;
+            if (bounds.top > top) {
+                top = bounds.top;
             }
-            if (bounds.bottom < reduced.bottom) {
-                reduced.bottom = bounds.bottom;
+            if (bounds.bottom < bottom) {
+                bottom = bounds.bottom;
             }
-            if (bounds.posLeft < reduced.posLeft) {
-                reduced.posLeft = bounds.posLeft;
+            if (bounds.posLeft < posLeft) {
+                posLeft = bounds.posLeft;
             }
-            if (bounds.posRight > reduced.posRight) {
-                reduced.posRight = bounds.posRight;
+            if (bounds.posRight > posRight) {
+                posRight = bounds.posRight;
            }
-            if (bounds.negLeft < reduced.negLeft) {
-                reduced.negLeft = bounds.negLeft;
+            if (bounds.negLeft < negLeft) {
+                negLeft = bounds.negLeft;
             }
-            if (bounds.negRight > reduced.negRight) {
-                reduced.negRight = bounds.negRight;
+            if (bounds.negRight > negRight) {
+                negRight = bounds.negRight;
             }
         }
-        return reduced;
+        return new InternalGeoBounds(name, top, bottom, posLeft, posRight, negLeft, negRight, wrapLongitude);
     }
 
     @Override
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  *
@ -74,22 +73,11 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
 
     @Override
     public InternalMax reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return (InternalMax) aggregations.get(0);
-        }
-        InternalMax reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalMax) aggregation;
-            } else {
-                reduced.max = Math.max(reduced.max, ((InternalMax) aggregation).max);
-            }
-        }
-        if (reduced != null) {
-            return reduced;
-        }
-        return (InternalMax) aggregations.get(0);
+        double max = Double.NEGATIVE_INFINITY;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            max = Math.max(max, ((InternalMax) aggregation).max);
+        }
+        return new InternalMax(name, max);
     }
 
     @Override
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  *
@ -75,22 +74,11 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
 
     @Override
     public InternalMin reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return (InternalMin) aggregations.get(0);
-        }
-        InternalMin reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalMin) aggregation;
-            } else {
-                reduced.min = Math.min(reduced.min, ((InternalMin) aggregation).min);
-            }
-        }
-        if (reduced != null) {
-            return reduced;
-        }
-        return (InternalMin) aggregations.get(0);
+        double min = Double.POSITIVE_INFINITY;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            min = Math.min(min, ((InternalMin) aggregation).min);
+        }
+        return new InternalMin(getName(), min);
     }
 
     @Override
@ -32,7 +32,6 @@ import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
 
 /**
  *
@ -89,17 +88,15 @@ public class InternalPercentiles extends InternalNumericMetricsAggregation.Multi
 
     @Override
     public InternalPercentiles reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        InternalPercentiles merged = null;
-        for (InternalAggregation aggregation : aggregations) {
+        TDigestState merged = null;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
             final InternalPercentiles percentiles = (InternalPercentiles) aggregation;
             if (merged == null) {
-                merged = percentiles;
-            } else {
-                merged.state.add(percentiles.state);
+                merged = new TDigestState(percentiles.state.compression());
             }
+            merged.add(percentiles.state);
         }
-        return merged;
+        return new InternalPercentiles(getName(), percents, merged, keyed);
     }
 
     @Override
@ -36,6 +36,10 @@ public class TDigestState extends AVLTreeDigest {
         this.compression = compression;
     }
 
+    public double compression() {
+        return compression;
+    }
+
     public static void write(TDigestState state, StreamOutput out) throws IOException {
         out.writeDouble(state.compression);
         out.writeVInt(state.centroidCount());
@ -28,7 +28,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  *
@ -120,33 +119,18 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
 
     @Override
     public InternalStats reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return (InternalStats) aggregations.get(0);
-        }
-        InternalStats reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                if (((InternalStats) aggregation).count != 0) {
-                    reduced = (InternalStats) aggregation;
-                }
-            } else {
-                if (((InternalStats) aggregation).count != 0) {
-                    reduced.count += ((InternalStats) aggregation).count;
-                    reduced.min = Math.min(reduced.min, ((InternalStats) aggregation).min);
-                    reduced.max = Math.max(reduced.max, ((InternalStats) aggregation).max);
-                    reduced.sum += ((InternalStats) aggregation).sum;
-                    mergeOtherStats(reduced, aggregation);
-                }
-            }
-        }
-        if (reduced != null) {
-            return reduced;
-        }
-        return (InternalStats) aggregations.get(0);
-    }
-
-    protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
+        long count = 0;
+        double min = Double.POSITIVE_INFINITY;
+        double max = Double.NEGATIVE_INFINITY;
+        double sum = 0;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            InternalStats stats = (InternalStats) aggregation;
+            count += stats.getCount();
+            min = Math.min(min, stats.getMin());
+            max = Math.max(max, stats.getMax());
+            sum += stats.getSum();
+        }
+        return new InternalStats(name, count, sum, min, max);
     }
 
     @Override
@ -101,8 +101,14 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
     }
 
     @Override
-    protected void mergeOtherStats(InternalStats to, InternalAggregation from) {
-        ((InternalExtendedStats) to).sumOfSqrs += ((InternalExtendedStats) from).sumOfSqrs;
+    public InternalExtendedStats reduce(ReduceContext reduceContext) {
+        double sumOfSqrs = 0;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            InternalExtendedStats stats = (InternalExtendedStats) aggregation;
+            sumOfSqrs += stats.getSumOfSquares();
+        }
+        final InternalStats stats = super.reduce(reduceContext);
+        return new InternalExtendedStats(name, stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), sumOfSqrs);
     }
 
     @Override
@ -27,7 +27,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  *
@ -74,22 +73,11 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
 
     @Override
     public InternalSum reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return (InternalSum) aggregations.get(0);
-        }
-        InternalSum reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalSum) aggregation;
-            } else {
-                reduced.sum += ((InternalSum) aggregation).sum;
-            }
-        }
-        if (reduced != null) {
-            return reduced;
-        }
-        return (InternalSum) aggregations.get(0);
+        double sum = 0;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            sum += ((InternalSum) aggregation).sum;
+        }
+        return new InternalSum(name, sum);
     }
 
     @Override
@ -94,10 +94,6 @@ public class InternalTopHits extends InternalMetricsAggregation implements TopHi
     @Override
     public InternalAggregation reduce(ReduceContext reduceContext) {
         List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1 && from == 0) {
-            return aggregations.get(0);
-        }
-
         TopDocs[] shardDocs = new TopDocs[aggregations.size()];
         InternalSearchHits[] shardHits = new InternalSearchHits[aggregations.size()];
         for (int i = 0; i < shardDocs.length; i++) {
@ -26,7 +26,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * An internal implementation of {@link ValueCount}.
@ -69,19 +68,11 @@ public class InternalValueCount extends InternalNumericMetricsAggregation implem
 
     @Override
     public InternalAggregation reduce(ReduceContext reduceContext) {
-        List<InternalAggregation> aggregations = reduceContext.aggregations();
-        if (aggregations.size() == 1) {
-            return aggregations.get(0);
-        }
-        InternalValueCount reduced = null;
-        for (InternalAggregation aggregation : aggregations) {
-            if (reduced == null) {
-                reduced = (InternalValueCount) aggregation;
-            } else {
-                reduced.value += ((InternalValueCount) aggregation).value;
-            }
-        }
-        return reduced;
+        long valueCount = 0;
+        for (InternalAggregation aggregation : reduceContext.aggregations()) {
+            valueCount += ((InternalValueCount) aggregation).value;
+        }
+        return new InternalValueCount(name, valueCount);
    }
 
     @Override
@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range
 import org.elasticsearch.search.aggregations.bucket.range.RangeBuilder;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 
 import java.util.List;
@ -310,4 +311,49 @@ public class RandomTests extends ElasticsearchIntegrationTest {
         assertEquals(numDocs, response.getHits().getTotalHits());
     }
 
+    // https://github.com/elasticsearch/elasticsearch/issues/6435
+    public void testReduce() throws Exception {
+        createIndex("idx");
+        final int value = randomIntBetween(0, 10);
+        indexRandom(true, client().prepareIndex("idx", "type").setSource("f", value));
+
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(filter("filter").filter(FilterBuilders.matchAllFilter())
+                        .subAggregation(range("range")
+                                .field("f")
+                                .addUnboundedTo(6)
+                                .addUnboundedFrom(6)
+                                .subAggregation(sum("sum").field("f"))))
+                .execute().actionGet();
+
+        assertSearchResponse(response);System.out.println(response);
+
+        Filter filter = response.getAggregations().get("filter");
+        assertNotNull(filter);
+        assertEquals(1, filter.getDocCount());
+
+        Range range = filter.getAggregations().get("range");
+        assertThat(range, notNullValue());
+        assertThat(range.getName(), equalTo("range"));
+        assertThat(range.getBuckets().size(), equalTo(2));
+
+        Range.Bucket bucket = range.getBucketByKey("*-6.0");
+        assertThat(bucket, notNullValue());
+        assertThat(bucket.getKey(), equalTo("*-6.0"));
+        assertThat(bucket.getFrom().doubleValue(), equalTo(Double.NEGATIVE_INFINITY));
+        assertThat(bucket.getTo().doubleValue(), equalTo(6.0));
+        assertThat(bucket.getDocCount(), equalTo(value < 6 ? 1L : 0L));
+        Sum sum = bucket.getAggregations().get("sum");
+        assertEquals(value < 6 ? value : 0, sum.getValue(), 0d);
+
+        bucket = range.getBucketByKey("6.0-*");
+        assertThat(bucket, notNullValue());
+        assertThat(bucket.getKey(), equalTo("6.0-*"));
+        assertThat(bucket.getFrom().doubleValue(), equalTo(6.0));
+        assertThat(bucket.getTo().doubleValue(), equalTo(Double.POSITIVE_INFINITY));
+        assertThat(bucket.getDocCount(), equalTo(value >= 6 ? 1L : 0L));
+        sum = bucket.getAggregations().get("sum");
+        assertEquals(value >= 6 ? value : 0, sum.getValue(), 0d);
+    }
+
 }