Clean and document sorting with partially built buckets (backport of #52769) (#52925)

The `terms` aggregation can be sorted by the results of its
sub-aggregations. Because it uses that sorting to filter down to the
top-n, it tries not to construct all of the buckets for the child
aggregations. This has its own interesting problems around reduction,
but they aren't super relevant to this change. This change moves that
optimization out of the `TermsAggregator` and into the aggregators being
sorted on. This should make it clearer what is going on, and it
unifies this optimization with validating the sort.

Finally, this should enable some minor optimizations to save a few
comparisons when sorting multi-valued buckets. I'll get to those in a
follow-up because they are now *fairly* obvious. They probably won't be
a huge performance improvement, but it'll be nice anyway.
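As a rough sketch of the resulting shape (illustrative code only, not part of this commit): the order asks the aggregator named by the sort path for a comparator over bucket ordinals, and candidate buckets are then compared through their ordinals without ever building their sub-aggregation results.

import java.util.Comparator;
import java.util.function.ToLongFunction;

// Minimal sketch of the idea; the real types are in the diff below.
@FunctionalInterface
interface OrdinalComparator {
    int compare(long lhsOrd, long rhsOrd); // compare two buckets by ordinal only
}

final class PartialBucketSortSketch {
    // Adapt an ordinal comparator (supplied by the aggregator being sorted on)
    // into a comparator over partially built buckets that only expose their ordinal.
    static <B> Comparator<B> byOrdinal(OrdinalComparator cmp, ToLongFunction<B> ordinalOf) {
        return (lhs, rhs) -> cmp.compare(ordinalOf.applyAsLong(lhs), ordinalOf.applyAsLong(rhs));
    }
}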
Nik Everett 2020-02-27 17:50:55 -05:00 committed by GitHub
parent a47e404732
commit 407101c39b
25 changed files with 251 additions and 284 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Iterator;
@ -130,15 +131,25 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
}
/**
* Validates the "key" portion of a sort on this aggregation.
* Builds a comparator that compares two buckets aggregated by this {@linkplain Aggregator}.
* <p>
* The default implementation throws an exception but we override it on aggregations that support sorting.
*/
public void validateSortPathKey(String key) {
public BucketComparator bucketComparator(String key, SortOrder order) {
throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}
/**
* Compare two buckets by their ordinal.
*/
@FunctionalInterface
public interface BucketComparator {
/**
* Compare two buckets by their ordinal.
*/
int compare(long lhs, long rhs);
}
/**
* Build an aggregation for data that has been collected into {@code bucket}.

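To illustrate the new extension point, here is a hypothetical aggregator-like class (not from this commit) overriding the hook instead of inheriting the default that throws. It assumes the BucketComparator and SortOrder types shown above, and that SortOrder.reverseMul() returns 1 for ASC and -1 for DESC.

// Hypothetical example only: a metric-style aggregator that keeps one double
// per bucket ordinal, so it can compare buckets without building them.
public class PerOrdinalValueAggregatorSketch {
    private double[] valuePerOrd = new double[0]; // illustrative storage

    public Aggregator.BucketComparator bucketComparator(String key, SortOrder order) {
        if (key != null && false == "value".equals(key)) {
            throw new IllegalArgumentException("can only sort on [value], not [" + key + "]");
        }
        // reverseMul() flips the sign for descending order, so one lambda covers both.
        return (lhs, rhs) -> order.reverseMul()
            * Double.compare(valuePerOrd[(int) lhs], valuePerOrd[(int) rhs]);
    }
}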
View File

@ -29,12 +29,15 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;
/**
* {@link Bucket} Ordering strategy.
* {@link Bucket} ordering strategy. Buckets can be ordered either as
* "complete" buckets using {@link #comparator()} or as a combination
* of the bucket's internals and its ordinal with
* {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}.
*/
public abstract class BucketOrder implements ToXContentObject, Writeable {
/**
* Creates a bucket ordering strategy that sorts buckets by their document counts (ascending or descending).
*
@ -98,16 +101,41 @@ public abstract class BucketOrder implements ToXContentObject, Writeable {
}
/**
* @return A comparator for the bucket based on the given aggregator. The comparator is used in two phases:
* <p>
* - aggregation phase, where each shard builds a list of buckets to be sent to the coordinating node.
* In this phase, the passed in aggregator will be the aggregator that aggregates the buckets on the
* shard level.
* <p>
* - reduce phase, where the coordinating node gathers all the buckets from all the shards and reduces them
* to a final bucket list. In this case, the passed in aggregator will be {@code null}.
* Validate an aggregation against an {@linkplain Aggregator}.
* @throws AggregationExecutionException when the ordering is invalid
* for this {@linkplain Aggregator}.
*/
public abstract Comparator<Bucket> comparator(Aggregator aggregator);
public final void validate(Aggregator aggregator) throws AggregationExecutionException {
/*
* Building partiallyBuiltBucketComparator and throwing it away is enough
* to validate this order because doing so checks all of the appropriate
* paths.
*/
partiallyBuiltBucketComparator(null, aggregator);
}
/**
* Builds a comparator that compares partially built buckets by
* delegating comparison of the results of any "child" aggregations to
* the provided {@linkplain Aggregator}.
* <p>
* Warning: This is fairly difficult to use and impossible to use cleanly.
* In addition, this exists primarily to return the "top n" buckets based
* on the results of a sub aggregation. The trouble is that this could end
* up throwing away buckets on the data nodes that should ultimately be
* kept after reducing all of the results. If you know that this is coming
* it is fine, but most folks that use "generic" sorts don't. In other
* words: before you use this method think super duper hard about whether
* you want to have these kinds of issues. The terms agg does, and folks
* get into trouble with it all the time.
* </p>
*/
public abstract <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator);
/**
* Build a comparator for fully built buckets.
*/
public abstract Comparator<Bucket> comparator();
/**
* @return unique internal ID used for reading/writing this order from/to a stream.

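A hedged usage sketch of the two flavors the reworked BucketOrder exposes (the aggregator, bucket list, and ordinal accessor here are placeholders, not code from this commit):

// Sketch only: how a bucket-producing aggregator might drive the new API.
static <B extends MultiBucketsAggregation.Bucket> void sortPartialBuckets(
        BucketOrder order, Aggregator self, List<B> partialBuckets, ToLongFunction<B> ordinalOf) {
    order.validate(self); // now just "build the comparator and throw it away"
    // Shard side: compare via ordinals so child buckets never need to be built.
    partialBuckets.sort(order.partiallyBuiltBucketComparator(ordinalOf, self));
    // Reduce side (coordinating node) would instead use order.comparator(),
    // which reads values straight out of the fully built buckets.
}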
View File

@ -28,9 +28,10 @@ import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.ArrayList;
@ -40,72 +41,22 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.ToLongFunction;
import static java.util.stream.Collectors.toList;
/**
* Implementations for {@link Bucket} ordering strategies.
*/
public class InternalOrder extends BucketOrder {
private final byte id;
private final String key;
protected final boolean asc;
protected final Comparator<Bucket> comparator;
/**
* Creates an ordering strategy that sorts {@link Bucket}s by some property.
*
* @param id unique ID for this ordering strategy.
* @param key key of the property to sort on.
* @param asc direction to sort by: {@code true} for ascending, {@code false} for descending.
* @param comparator determines how buckets will be ordered.
*/
public InternalOrder(byte id, String key, boolean asc, Comparator<Bucket> comparator) {
this.id = id;
this.key = key;
this.asc = asc;
this.comparator = comparator;
}
@Override
byte id() {
return id;
}
@Override
public Comparator<Bucket> comparator(Aggregator aggregator) {
return comparator;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field(key, asc ? "asc" : "desc").endObject();
}
/**
* Validate a bucket ordering strategy for an {@link Aggregator}.
*
* @param order bucket ordering strategy to sort on.
* @param aggregator aggregator to sort.
* @return unmodified bucket ordering strategy.
* @throws AggregationExecutionException if validation fails
*/
public static BucketOrder validate(BucketOrder order, Aggregator aggregator) throws AggregationExecutionException {
if (order instanceof CompoundOrder) {
for (BucketOrder innerOrder : ((CompoundOrder) order).orderElements) {
validate(innerOrder, aggregator);
}
} else if (order instanceof Aggregation) {
((Aggregation) order).path().validate(aggregator);
}
return order;
}
public abstract class InternalOrder extends BucketOrder {
// TODO merge the contents of this file into BucketOrder. The way it is now is a relic.
/**
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
*/
public static class Aggregation extends InternalOrder {
static final byte ID = 0;
private final SortOrder order;
private final AggregationPath path;
/**
* Create a new ordering strategy to sort by a sub-aggregation.
@ -115,51 +66,56 @@ public class InternalOrder extends BucketOrder {
* @see AggregationPath
*/
Aggregation(String path, boolean asc) {
super(ID, path, asc, new AggregationComparator(path, asc));
order = asc ? SortOrder.ASC : SortOrder.DESC;
this.path = AggregationPath.parse(path);
}
/**
* @return parsed path to the sub-aggregation to sort on.
*/
public AggregationPath path() {
return ((AggregationComparator) comparator).path;
return path;
}
@Override
public Comparator<Bucket> comparator(Aggregator aggregator) {
if (aggregator instanceof TermsAggregator) {
// Internal Optimization for terms aggregation to avoid constructing buckets for ordering purposes
return ((TermsAggregator) aggregator).bucketComparator(path(), asc);
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
try {
BucketComparator bucketComparator = path.bucketComparator(aggregator, order);
return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs));
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
}
return comparator;
}
/**
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
*/
static class AggregationComparator implements Comparator<Bucket> {
@Override
public Comparator<Bucket> comparator() {
return (lhs, rhs) -> {
double l = path.resolveValue(((InternalAggregations) lhs.getAggregations()));
double r = path.resolveValue(((InternalAggregations) rhs.getAggregations()));
return Comparators.compareDiscardNaN(l, r, order == SortOrder.ASC);
};
}
private final AggregationPath path;
private final boolean asc;
@Override
byte id() {
return ID;
}
/**
* Create a new {@link Bucket} ordering strategy to sort by a sub-aggregation.
*
* @param path path to the sub-aggregation to sort on.
* @param asc direction to sort by: {@code true} for ascending, {@code false} for descending.
* @see AggregationPath
*/
AggregationComparator(String path, boolean asc) {
this.asc = asc;
this.path = AggregationPath.parse(path);
}
@Override
public int compare(Bucket b1, Bucket b2) {
double v1 = path.resolveValue(((InternalAggregations) b1.getAggregations()));
double v2 = path.resolveValue(((InternalAggregations) b2.getAggregations()));
return Comparators.compareDiscardNaN(v1, v2, asc);
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field(path.toString(), order.toString()).endObject();
}
@Override
public int hashCode() {
return Objects.hash(path, order);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Aggregation other = (Aggregation) obj;
return Objects.equals(path, other.path)
&& Objects.equals(order, other.order);
}
}
@ -227,8 +183,31 @@ public class InternalOrder extends BucketOrder {
}
@Override
public Comparator<Bucket> comparator(Aggregator aggregator) {
return new CompoundOrderComparator(orderElements, aggregator);
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
List<Comparator<T>> comparators = orderElements.stream()
.map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator))
.collect(toList());
return (lhs, rhs) -> {
Iterator<Comparator<T>> itr = comparators.iterator();
int result;
do {
result = itr.next().compare(lhs, rhs);
} while (result == 0 && itr.hasNext());
return result;
};
}
@Override
public Comparator<Bucket> comparator() {
List<Comparator<Bucket>> comparators = orderElements.stream().map(BucketOrder::comparator).collect(toList());
return (lhs, rhs) -> {
Iterator<Comparator<Bucket>> itr = comparators.iterator();
int result;
do {
result = itr.next().compare(lhs, rhs);
} while (result == 0 && itr.hasNext());
return result;
};
}
@Override
@ -247,34 +226,64 @@ public class InternalOrder extends BucketOrder {
CompoundOrder other = (CompoundOrder) obj;
return Objects.equals(orderElements, other.orderElements);
}
}
/**
* {@code Comparator} for sorting buckets by multiple criteria.
*/
static class CompoundOrderComparator implements Comparator<Bucket> {
/**
* {@link BucketOrder} implementation for simple, fixed orders like
* {@link InternalOrder#COUNT_ASC}. Complex implementations should not
* use this.
*/
private static class SimpleOrder extends InternalOrder {
private final byte id;
private final String key;
private final SortOrder order;
private final Comparator<Bucket> comparator;
private List<BucketOrder> compoundOrder;
private Aggregator aggregator;
SimpleOrder(byte id, String key, SortOrder order, Comparator<Bucket> comparator) {
this.id = id;
this.key = key;
this.order = order;
this.comparator = comparator;
}
/**
* Create a new {@code Comparator} for sorting buckets by multiple criteria.
*
* @param compoundOrder a list of {@link BucketOrder}s to sort on, in order of priority.
* @param aggregator {@link BucketOrder#comparator(Aggregator)}
*/
CompoundOrderComparator(List<BucketOrder> compoundOrder, Aggregator aggregator) {
this.compoundOrder = compoundOrder;
this.aggregator = aggregator;
@Override
public Comparator<Bucket> comparator() {
return comparator;
}
@Override
byte id() {
return id;
}
@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
Comparator<Bucket> comparator = comparator();
return (lhs, rhs) -> comparator.compare(lhs, rhs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field(key, order.toString()).endObject();
}
@Override
public int hashCode() {
return Objects.hash(id, key, order);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
@Override
public int compare(Bucket b1, Bucket b2) {
int result = 0;
for (Iterator<BucketOrder> itr = compoundOrder.iterator(); itr.hasNext() && result == 0; ) {
result = itr.next().comparator(aggregator).compare(b1, b2);
}
return result;
if (getClass() != obj.getClass()) {
return false;
}
SimpleOrder other = (SimpleOrder) obj;
return Objects.equals(id, other.id)
&& Objects.equals(key, other.key)
&& Objects.equals(order, other.order);
}
}
@ -286,22 +295,22 @@ public class InternalOrder extends BucketOrder {
/**
* Order by the (higher) count of each bucket.
*/
static final InternalOrder COUNT_DESC = new InternalOrder(COUNT_DESC_ID, "_count", false, comparingCounts().reversed());
static final InternalOrder COUNT_DESC = new SimpleOrder(COUNT_DESC_ID, "_count", SortOrder.DESC, comparingCounts().reversed());
/**
* Order by the (lower) count of each bucket.
*/
static final InternalOrder COUNT_ASC = new InternalOrder(COUNT_ASC_ID, "_count", true, comparingCounts());
static final InternalOrder COUNT_ASC = new SimpleOrder(COUNT_ASC_ID, "_count", SortOrder.ASC, comparingCounts());
/**
* Order by the key of each bucket descending.
*/
static final InternalOrder KEY_DESC = new InternalOrder(KEY_DESC_ID, "_key", false, comparingKeys().reversed());
static final InternalOrder KEY_DESC = new SimpleOrder(KEY_DESC_ID, "_key", SortOrder.DESC, comparingKeys().reversed());
/**
* Order by the key of each bucket ascending.
*/
static final InternalOrder KEY_ASC = new InternalOrder(KEY_ASC_ID, "_key", true, comparingKeys());
static final InternalOrder KEY_ASC = new SimpleOrder(KEY_ASC_ID, "_key", SortOrder.ASC, comparingKeys());
/**
* @return compare by {@link Bucket#getDocCount()}.
@ -464,7 +473,7 @@ public class InternalOrder extends BucketOrder {
out.writeByte(order.id());
if (order instanceof Aggregation) {
Aggregation aggregationOrder = (Aggregation) order;
out.writeBoolean(aggregationOrder.asc);
out.writeBoolean(aggregationOrder.order == SortOrder.ASC);
out.writeString(aggregationOrder.path().toString());
} else if (order instanceof CompoundOrder) {
CompoundOrder compoundOrder = (CompoundOrder) order;
@ -579,23 +588,4 @@ public class InternalOrder extends BucketOrder {
}
}
}
@Override
public int hashCode() {
return Objects.hash(id, key, asc);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
InternalOrder other = (InternalOrder) obj;
return Objects.equals(id, other.id)
&& Objects.equals(key, other.key)
&& Objects.equals(asc, other.asc);
}
}

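The compound order now builds the same tie-breaking composition for both flavors: walk the comparators in priority order and return the first non-zero result. A standalone version of that pattern, for illustration only:

import java.util.Comparator;
import java.util.List;

final class TieBreakingComparators {
    // Later comparators are consulted only when earlier ones report a tie.
    static <T> Comparator<T> inPriorityOrder(List<Comparator<T>> comparators) {
        return (lhs, rhs) -> {
            for (Comparator<T> comparator : comparators) {
                int result = comparator.compare(lhs, rhs);
                if (result != 0) {
                    return result;
                }
            }
            return 0;
        };
    }
}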
View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Arrays;
@ -174,15 +175,12 @@ public abstract class BucketsAggregator extends AggregatorBase {
}
@Override
public void validateSortPathKey(String key) {
if (false == this instanceof SingleBucketAggregator) {
super.validateSortPathKey(key);
return;
}
if (key != null && false == "doc_count".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
".doc_count\")");
public BucketComparator bucketComparator(String key, SortOrder order) {
if (key == null || "doc_count".equals(key)) {
return (lhs, rhs) -> order.reverseMul() * Integer.compare(bucketDocCount(lhs), bucketDocCount(rhs));
}
throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
".doc_count\") or \"key\".");
}
}

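For context on the single-bucket case above: a sort path like `my_filter` or `my_filter.doc_count` (where `my_filter` is a single-bucket sub-aggregation such as a `filter`) resolves here, and both forms compare document counts. A small illustrative restatement, with hypothetical names:

// Illustration only: both of these terms order paths reach the method above.
//   "my_filter"            -> key == null
//   "my_filter.doc_count"  -> key == "doc_count"
// Either way the returned comparator orders buckets by the child's doc count.
static Aggregator.BucketComparator docCountComparator(SortOrder order, BucketsAggregator singleBucketChild) {
    return (lhs, rhs) -> order.reverseMul()
        * Integer.compare(singleBucketChild.bucketDocCount(lhs), singleBucketChild.bucketDocCount(rhs));
}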
View File

@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Iterator;
@ -128,8 +129,8 @@ public abstract class DeferringBucketCollector extends BucketCollector {
}
@Override
public void validateSortPathKey(String key) {
in.validateSortPathKey(key);
public BucketComparator bucketComparator(String key, SortOrder order) {
throw new UnsupportedOperationException("Can't sort on deferred aggregations");
}
}

View File

@ -22,6 +22,4 @@ package org.elasticsearch.search.aggregations.bucket;
* A bucket aggregator that doesn't create new buckets.
*/
public interface SingleBucketAggregator {
int bucketDocCount(long bucketOrd);
}

View File

@ -179,7 +179,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
// the contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx,

View File

@ -31,7 +31,6 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -74,7 +73,8 @@ class DateHistogramAggregator extends BucketsAggregator {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.order = InternalOrder.validate(order, this);
this.order = order;
order.validate(this);
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
@ -141,7 +141,7 @@ class DateHistogramAggregator extends BucketsAggregator {
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`

View File

@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -78,7 +77,8 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.order = InternalOrder.validate(order, this);
this.order = order;
order.validate(this);
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
@ -162,7 +162,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`

View File

@ -463,7 +463,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,

View File

@ -436,7 +436,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
// nothing to do when sorting by key ascending, as data is already sorted since shards return
// sorted buckets and the merge-sort performed by reduceBuckets maintains order.
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
CollectionUtil.introSort(reducedBuckets, order.comparator(null));
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(),

View File

@ -29,14 +29,13 @@ import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.EmptyBucketInfo;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
@ -76,7 +75,8 @@ class NumericHistogramAggregator extends BucketsAggregator {
}
this.interval = interval;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.order = order;
order.validate(this);
this.keyed = keyed;
this.minDocCount = minDocCount;
this.minBound = minBound;
@ -144,7 +144,7 @@ class NumericHistogramAggregator extends BucketsAggregator {
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {

View File

@ -33,7 +33,6 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
@ -70,7 +69,8 @@ public class RangeHistogramAggregator extends BucketsAggregator {
}
this.interval = interval;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.order = order;
order.validate(this);
this.keyed = keyed;
this.minDocCount = minDocCount;
this.minBound = minBound;
@ -148,7 +148,7 @@ public class RangeHistogramAggregator extends BucketsAggregator {
}
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
InternalHistogram.EmptyBucketInfo emptyBucketInfo = null;
if (minDocCount == 0) {

View File

@ -36,12 +36,12 @@ import org.elasticsearch.index.fielddata.AbstractSortedSetDocValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
@ -182,7 +182,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
long otherDocCount = 0;
BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
final boolean needsFullScan = bucketOrds == null || bucketCountThresholds.getMinDocCount() == 0;
final long maxId = needsFullScan ? valueCount : bucketOrds.size();

View File

@ -138,7 +138,7 @@ public abstract class InternalMappedRareTerms<A extends InternalRareTerms<A, B>,
addToFilter(filter, b);
}
}
CollectionUtil.introSort(rare, order.comparator(null));
CollectionUtil.introSort(rare, order.comparator());
return createWithFilter(name, rare, filter);
}

View File

@ -256,7 +256,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
}
final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size());
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator(null));
final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator());
for (List<B> sameTermBuckets : buckets.values()) {
final B b = reduceBucket(sameTermBuckets, reduceContext);
if (sumDocCountError == -1) {

View File

@ -150,7 +150,7 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesS
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
CollectionUtil.introSort(buckets, ORDER.comparator(this));
CollectionUtil.introSort(buckets, ORDER.comparator());
return new LongRareTerms(name, ORDER, pipelineAggregators(), metaData(), format, buckets, maxDocCount, filter);
}

View File

@ -132,7 +132,7 @@ public class LongTermsAggregator extends TermsAggregator {
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long otherDocCount = 0;
BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
LongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {

View File

@ -155,7 +155,7 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator<Value
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
}
CollectionUtil.introSort(buckets, ORDER.comparator(this));
CollectionUtil.introSort(buckets, ORDER.comparator());
return new StringRareTerms(name, ORDER, pipelineAggregators(), metaData(), format, buckets, maxDocCount, filter);
}

View File

@ -138,7 +138,7 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long otherDocCount = 0;
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
StringTerms.Bucket spare = null;
for (int i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {

View File

@ -24,26 +24,19 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.InternalOrder.Aggregation;
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;
import java.io.IOException;
import java.util.Comparator;
@ -181,6 +174,7 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
protected final DocValueFormat format;
protected final BucketCountThresholds bucketCountThresholds;
protected final BucketOrder order;
protected final Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator;
protected final Set<Aggregator> aggsUsedForSorting = new HashSet<>();
protected final SubAggCollectionMode collectMode;
@ -189,7 +183,8 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.bucketCountThresholds = bucketCountThresholds;
this.order = InternalOrder.validate(order, this);
this.order = order;
partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this);
this.format = format;
if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) {
/**
@ -235,67 +230,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
return false;
}
/**
* Internal Optimization for ordering {@link InternalTerms.Bucket}s by a sub aggregation.
* <p>
* in this phase, if the order is based on sub-aggregations, we need to use a different comparator
* to avoid constructing buckets for ordering purposes (we can potentially have a lot of buckets and building
* them will cause loads of redundant object constructions). The "special" comparators here will fetch the
* sub aggregation values directly from the sub aggregators bypassing bucket creation. Note that the comparator
* attached to the order will still be used in the reduce phase of the Aggregation.
*
* @param path determines which sub aggregation to use for ordering.
* @param asc {@code true} for ascending order, {@code false} for descending.
* @return {@code Comparator} to order {@link InternalTerms.Bucket}s in the desired order.
*/
public Comparator<Bucket> bucketComparator(AggregationPath path, boolean asc) {
Aggregator agg = path.resolveAggregator(this);
// TODO Move this method into Aggregator or AggregationPath.
if (agg instanceof ProfilingAggregator) {
agg = ProfilingAggregator.unwrap(agg);
}
final Aggregator aggregator = agg;
final String key = path.lastPathElement().key;
if (aggregator instanceof SingleBucketAggregator) {
assert key == null : "this should be picked up before the aggregation is executed - on validate";
return (b1, b2) -> {
int mul = asc ? 1 : -1;
int v1 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b1).bucketOrd);
int v2 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b2).bucketOrd);
return mul * (v1 - v2);
};
}
// with only support single-bucket aggregators
assert !(aggregator instanceof BucketsAggregator) : "this should be picked up before the aggregation is executed - on validate";
if (aggregator instanceof NumericMetricsAggregator.MultiValue) {
assert key != null : "this should be picked up before the aggregation is executed - on validate";
return (b1, b2) -> {
double v1 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b1).bucketOrd);
double v2 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b2).bucketOrd);
// some metrics may return NaN (eg. avg, variance, etc...) in which case we'd like to push all of those to
// the bottom
return Comparators.compareDiscardNaN(v1, v2, asc);
};
}
// single-value metrics agg
return (b1, b2) -> {
double v1 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b1).bucketOrd);
double v2 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b2).bucketOrd);
// some metrics may return NaN (eg. avg, variance, etc...) in which case we'd like to push all of those to
// the bottom
return Comparators.compareDiscardNaN(v1, v2, asc);
};
}
@Override
protected boolean shouldDefer(Aggregator aggregator) {
return collectMode == SubAggCollectionMode.BREADTH_FIRST
&& !aggsUsedForSorting.contains(aggregator);
}
}

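A hedged sketch of how the terms aggregators consume the precomputed comparator when picking the top shard_size buckets (the candidate collection and the spare-object reuse of the real aggregators are elided; the types are the ones touched in this diff):

// Illustration only: keep the best `size` partially built buckets according to
// a comparator that reads sub-aggregation values via bucketOrd, so rejected
// buckets never have their child aggregations built.
static StringTerms.Bucket[] topBuckets(List<StringTerms.Bucket> candidates, int size,
                                       Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator) {
    BucketPriorityQueue<StringTerms.Bucket> ordered =
        new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
    for (StringTerms.Bucket candidate : candidates) {
        ordered.insertWithOverflow(candidate); // evicts the current "worst" bucket
    }
    StringTerms.Bucket[] top = new StringTerms.Bucket[ordered.size()];
    for (int i = ordered.size() - 1; i >= 0; --i) {
        top[i] = ordered.pop(); // pop returns the worst remaining, so fill from the back
    }
    return top;
}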
View File

@ -87,18 +87,15 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
Map<String, Object> metaData) throws IOException {
final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData);
return new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) {
{
// even in the case of an unmapped aggregator, validate the
// order
InternalOrder.validate(order, this);
}
Aggregator agg = new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) {
@Override
public InternalAggregation buildEmptyAggregation() {
return aggregation;
}
};
// even in the case of an unmapped aggregator, validate the order
order.validate(agg);
return agg;
}
private static boolean isAggregationSort(BucketOrder order) {

View File

@ -18,9 +18,11 @@
*/
package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.util.Comparators;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.List;
@ -43,11 +45,12 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator {
public abstract double metric(long owningBucketOrd);
@Override
public void validateSortPathKey(String key) {
public BucketComparator bucketComparator(String key, SortOrder order) {
if (key != null && false == "value".equals(key)) {
throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")");
}
return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(lhs), metric(rhs), order == SortOrder.ASC);
}
}
@ -63,7 +66,7 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator {
public abstract double metric(String name, long owningBucketOrd);
@Override
public void validateSortPathKey(String key) {
public BucketComparator bucketComparator(String key, SortOrder order) {
if (key == null) {
throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified.");
}
@ -71,6 +74,8 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator {
throw new IllegalArgumentException(
"Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]");
}
// TODO it'd be faster to replace hasMetric and metric with something that returned a function from long to double.
return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC);
}
}
}

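The metric comparators above lean on Comparators.compareDiscardNaN to push NaN results (for example an avg over zero documents) toward the bottom regardless of the requested direction. A standalone sketch of that behaviour, written here for illustration rather than copied from the utility class:

// NaN compares after every real value, so buckets whose metric produced no
// value sink toward the bottom of the sort whether the order is ASC or DESC.
static int compareDiscardNaNSketch(double lhs, double rhs, boolean asc) {
    if (Double.isNaN(lhs)) {
        return Double.isNaN(rhs) ? 0 : 1;
    }
    if (Double.isNaN(rhs)) {
        return -1;
    }
    return asc ? Double.compare(lhs, rhs) : Double.compare(rhs, lhs);
}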
View File

@ -22,10 +22,12 @@ package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.common.Strings;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;
import org.elasticsearch.search.sort.SortOrder;
import java.util.ArrayList;
import java.util.Iterator;
@ -182,11 +184,6 @@ public class AggregationPath {
return stringPathElements;
}
private AggregationPath subPath(int offset, int length) {
List<PathElement> subTokens = new ArrayList<>(pathElements.subList(offset, offset + length));
return new AggregationPath(subTokens);
}
/**
* Looks up the value of this path against a set of aggregation results.
*/
@ -223,18 +220,8 @@ public class AggregationPath {
return aggregator;
}
/**
* Validates this path over the given aggregator as a point of reference.
*
* @param root The point of reference of this path
* @throws AggregationExecutionException on validation error
*/
public void validate(Aggregator root) throws AggregationExecutionException {
try {
resolveAggregator(root).validateSortPathKey(lastPathElement().key);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
}
public BucketComparator bucketComparator(Aggregator root, SortOrder order) {
return resolveAggregator(root).bucketComparator(lastPathElement().key, order);
}
private static String[] split(String toSplit, int index, String[] result) {
@ -242,4 +229,18 @@ public class AggregationPath {
result[1] = toSplit.substring(index + 1);
return result;
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
AggregationPath other = (AggregationPath) obj;
return pathElements.equals(other.pathElements);
}
@Override
public int hashCode() {
return pathElements.hashCode();
}
}

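For reference, the resolution that bucketComparator(root, order) performs follows the usual aggregation path syntax; a hedged sketch (the rootAggregator variable here is hypothetical):

// Illustration of the path shapes that resolve through this method:
//   "my_filter"            -> single-bucket agg, key == null  (sorts on doc_count)
//   "my_filter.doc_count"  -> single-bucket agg, key == "doc_count"
//   "my_filter>avg_price"  -> walk through the filter to the metric, key == null
//   "my_stats.max"         -> multi-value metric, key == "max"
AggregationPath path = AggregationPath.parse("my_filter>avg_price");
Aggregator.BucketComparator cmp = path.bucketComparator(rootAggregator, SortOrder.DESC);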
View File

@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Timer;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Iterator;
@ -78,8 +79,8 @@ public class ProfilingAggregator extends Aggregator {
}
@Override
public void validateSortPathKey(String key) {
delegate.validateSortPathKey(key);
public BucketComparator bucketComparator(String key, SortOrder order) {
return delegate.bucketComparator(key, order);
}
@Override