diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index 69381b93e49..41058ee2066 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -130,15 +131,25 @@ public abstract class Aggregator extends BucketCollector implements Releasable { } /** - * Validates the "key" portion of a sort on this aggregation. + * Builds a comparator that compares two buckets aggregated by this {@linkplain Aggregator}. *

* The default implementation throws an exception but we override it on aggregations that support sorting. */ - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " + "that is built out of zero or more single-bucket aggregations within the path and a final " + "single-bucket or a metrics aggregation at the path end."); } + /** + * Compare two buckets by their ordinal. + */ + @FunctionalInterface + public interface BucketComparator { + /** + * Compare two buckets by their ordinal. + */ + int compare(long lhs, long rhs); + } /** * Build an aggregation for data that has been collected into {@code bucket}. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java index d1e57e12321..10048b19129 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java @@ -29,12 +29,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.function.ToLongFunction; /** - * {@link Bucket} Ordering strategy. + * {@link Bucket} ordering strategy. Buckets can be ordered either as + * "complete" buckets using {@link #comparator()} or against a combination + * of the bucket's internals with its ordinal with + * {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}. */ public abstract class BucketOrder implements ToXContentObject, Writeable { - /** * Creates a bucket ordering strategy that sorts buckets by their document counts (ascending or descending). * @@ -98,16 +101,41 @@ public abstract class BucketOrder implements ToXContentObject, Writeable { } /** - * @return A comparator for the bucket based on the given aggregator. 
The comparator is used in two phases: - *

- * - aggregation phase, where each shard builds a list of buckets to be sent to the coordinating node. - * In this phase, the passed in aggregator will be the aggregator that aggregates the buckets on the - * shard level. - *

- * - reduce phase, where the coordinating node gathers all the buckets from all the shards and reduces them - * to a final bucket list. In this case, the passed in aggregator will be {@code null}. + * Validate an aggregation against an {@linkplain Aggregator}. + * @throws AggregationExecutionException when the ordering is invalid + * for this {@linkplain Aggregator}. */ - public abstract Comparator comparator(Aggregator aggregator); + public final void validate(Aggregator aggregator) throws AggregationExecutionException{ + /* + * Building partiallyBuiltBucketComparator and throwing it away is enough + * to validate this order because doing so checks all of the appropriate + * paths. + */ + partiallyBuiltBucketComparator(null, aggregator); + } + + /** + * Builds a comparator comparing partially built buckets by + * delegating comparison of the results of any "child" aggregations to + * the provided {@linkplain Aggregator}. + *

+ * Warning: This is fairly difficult to use and impossible to use cleanly. + * In addition, this exists primarily to return the "top n" buckets based + * on the results of a sub aggregation. The trouble is that this could end up + * throwing away buckets on the data nodes that should ultimately be kept + * after reducing all of the results. If you know that this is coming it + * is fine, but most folks that use "generic" sorts don't. In other words: + * before you use this method think super duper hard if you want to have + * these kinds of issues. The terms agg does and folks get into trouble + * with it all the time. + *

+ */ + public abstract Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator); + + /** + * Build a comparator for fully built buckets. + */ + public abstract Comparator comparator(); /** * @return unique internal ID used for reading/writing this order from/to a stream. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java index 2cee6eb729f..e2d3303e4cd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java @@ -28,9 +28,10 @@ import org.elasticsearch.common.util.Comparators; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.ArrayList; @@ -40,72 +41,22 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.function.ToLongFunction; + +import static java.util.stream.Collectors.toList; /** * Implementations for {@link Bucket} ordering strategies. */ -public class InternalOrder extends BucketOrder { - - private final byte id; - private final String key; - protected final boolean asc; - protected final Comparator comparator; - - /** - * Creates an ordering strategy that sorts {@link Bucket}s by some property. - * - * @param id unique ID for this ordering strategy. - * @param key key of the property to sort on. 
- * @param asc direction to sort by: {@code true} for ascending, {@code false} for descending. - * @param comparator determines how buckets will be ordered. - */ - public InternalOrder(byte id, String key, boolean asc, Comparator comparator) { - this.id = id; - this.key = key; - this.asc = asc; - this.comparator = comparator; - } - - @Override - byte id() { - return id; - } - - @Override - public Comparator comparator(Aggregator aggregator) { - return comparator; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject().field(key, asc ? "asc" : "desc").endObject(); - } - - /** - * Validate a bucket ordering strategy for an {@link Aggregator}. - * - * @param order bucket ordering strategy to sort on. - * @param aggregator aggregator to sort. - * @return unmodified bucket ordering strategy. - * @throws AggregationExecutionException if validation fails - */ - public static BucketOrder validate(BucketOrder order, Aggregator aggregator) throws AggregationExecutionException { - if (order instanceof CompoundOrder) { - for (BucketOrder innerOrder : ((CompoundOrder) order).orderElements) { - validate(innerOrder, aggregator); - } - } else if (order instanceof Aggregation) { - ((Aggregation) order).path().validate(aggregator); - } - return order; - } - +public abstract class InternalOrder extends BucketOrder { + // TODO merge the contents of this file into BucketOrder. The way it is now is relic. /** * {@link Bucket} ordering strategy to sort by a sub-aggregation. */ public static class Aggregation extends InternalOrder { - static final byte ID = 0; + private final SortOrder order; + private final AggregationPath path; /** * Create a new ordering strategy to sort by a sub-aggregation. 
@@ -115,51 +66,56 @@ public class InternalOrder extends BucketOrder { * @see AggregationPath */ Aggregation(String path, boolean asc) { - super(ID, path, asc, new AggregationComparator(path, asc)); + order = asc ? SortOrder.ASC : SortOrder.DESC; + this.path = AggregationPath.parse(path); } - /** - * @return parsed path to the sub-aggregation to sort on. - */ public AggregationPath path() { - return ((AggregationComparator) comparator).path; + return path; } @Override - public Comparator comparator(Aggregator aggregator) { - if (aggregator instanceof TermsAggregator) { - // Internal Optimization for terms aggregation to avoid constructing buckets for ordering purposes - return ((TermsAggregator) aggregator).bucketComparator(path(), asc); + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + try { + BucketComparator bucketComparator = path.bucketComparator(aggregator, order); + return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs)); + } catch (IllegalArgumentException e) { + throw new AggregationExecutionException("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e); } - return comparator; } - /** - * {@link Bucket} ordering strategy to sort by a sub-aggregation. - */ - static class AggregationComparator implements Comparator { + @Override + public Comparator comparator() { + return (lhs, rhs) -> { + double l = path.resolveValue(((InternalAggregations) lhs.getAggregations())); + double r = path.resolveValue(((InternalAggregations) rhs.getAggregations())); + return Comparators.compareDiscardNaN(l, r, order == SortOrder.ASC); + }; + } - private final AggregationPath path; - private final boolean asc; + @Override + byte id() { + return ID; + } - /** - * Create a new {@link Bucket} ordering strategy to sort by a sub-aggregation. - * - * @param path path to the sub-aggregation to sort on. 
- * @param asc direction to sort by: {@code true} for ascending, {@code false} for descending. - * @see AggregationPath - */ - AggregationComparator(String path, boolean asc) { - this.asc = asc; - this.path = AggregationPath.parse(path); - } - - @Override - public int compare(Bucket b1, Bucket b2) { - double v1 = path.resolveValue(((InternalAggregations) b1.getAggregations())); - double v2 = path.resolveValue(((InternalAggregations) b2.getAggregations())); - return Comparators.compareDiscardNaN(v1, v2, asc); + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(path.toString(), order.toString()).endObject(); + } + + @Override + public int hashCode() { + return Objects.hash(path, order); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; } + Aggregation other = (Aggregation) obj; + return Objects.equals(path, other.path) + && Objects.equals(order, other.order); } } @@ -227,8 +183,31 @@ public class InternalOrder extends BucketOrder { } @Override - public Comparator comparator(Aggregator aggregator) { - return new CompoundOrderComparator(orderElements, aggregator); + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + List> comparators = orderElements.stream() + .map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator)) + .collect(toList()); + return (lhs, rhs) -> { + Iterator> itr = comparators.iterator(); + int result; + do { + result = itr.next().compare(lhs, rhs); + } while (result == 0 && itr.hasNext()); + return result; + }; + } + + @Override + public Comparator comparator() { + List> comparators = orderElements.stream().map(BucketOrder::comparator).collect(toList()); + return (lhs, rhs) -> { + Iterator> itr = comparators.iterator(); + int result; + do { + result = itr.next().compare(lhs, rhs); + } while (result == 0 && 
itr.hasNext()); + return result; + }; } @Override @@ -247,34 +226,64 @@ public class InternalOrder extends BucketOrder { CompoundOrder other = (CompoundOrder) obj; return Objects.equals(orderElements, other.orderElements); } + } - /** - * {@code Comparator} for sorting buckets by multiple criteria. - */ - static class CompoundOrderComparator implements Comparator { + /** + * {@link BucketOrder} implementation for simple, fixed orders like + * {@link InternalOrder#COUNT_ASC}. Complex implementations should not + * use this. + */ + private static class SimpleOrder extends InternalOrder { + private final byte id; + private final String key; + private final SortOrder order; + private final Comparator comparator; - private List compoundOrder; - private Aggregator aggregator; + SimpleOrder(byte id, String key, SortOrder order, Comparator comparator) { + this.id = id; + this.key = key; + this.order = order; + this.comparator = comparator; + } - /** - * Create a new {@code Comparator} for sorting buckets by multiple criteria. - * - * @param compoundOrder a list of {@link BucketOrder}s to sort on, in order of priority. 
- * @param aggregator {@link BucketOrder#comparator(Aggregator)} - */ - CompoundOrderComparator(List compoundOrder, Aggregator aggregator) { - this.compoundOrder = compoundOrder; - this.aggregator = aggregator; + @Override + public Comparator comparator() { + return comparator; + } + + @Override + byte id() { + return id; + } + + @Override + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + Comparator comparator = comparator(); + return (lhs, rhs) -> comparator.compare(lhs, rhs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(key, order.toString()).endObject(); + } + + @Override + public int hashCode() { + return Objects.hash(id, key, order); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; } - - @Override - public int compare(Bucket b1, Bucket b2) { - int result = 0; - for (Iterator itr = compoundOrder.iterator(); itr.hasNext() && result == 0; ) { - result = itr.next().comparator(aggregator).compare(b1, b2); - } - return result; + if (getClass() != obj.getClass()) { + return false; } + SimpleOrder other = (SimpleOrder) obj; + return Objects.equals(id, other.id) + && Objects.equals(key, other.key) + && Objects.equals(order, other.order); } } @@ -286,22 +295,22 @@ public class InternalOrder extends BucketOrder { /** * Order by the (higher) count of each bucket. */ - static final InternalOrder COUNT_DESC = new InternalOrder(COUNT_DESC_ID, "_count", false, comparingCounts().reversed()); + static final InternalOrder COUNT_DESC = new SimpleOrder(COUNT_DESC_ID, "_count", SortOrder.DESC, comparingCounts().reversed()); /** * Order by the (lower) count of each bucket. 
*/ - static final InternalOrder COUNT_ASC = new InternalOrder(COUNT_ASC_ID, "_count", true, comparingCounts()); + static final InternalOrder COUNT_ASC = new SimpleOrder(COUNT_ASC_ID, "_count", SortOrder.ASC, comparingCounts()); /** * Order by the key of each bucket descending. */ - static final InternalOrder KEY_DESC = new InternalOrder(KEY_DESC_ID, "_key", false, comparingKeys().reversed()); + static final InternalOrder KEY_DESC = new SimpleOrder(KEY_DESC_ID, "_key", SortOrder.DESC, comparingKeys().reversed()); /** * Order by the key of each bucket ascending. */ - static final InternalOrder KEY_ASC = new InternalOrder(KEY_ASC_ID, "_key", true, comparingKeys()); + static final InternalOrder KEY_ASC = new SimpleOrder(KEY_ASC_ID, "_key", SortOrder.ASC, comparingKeys()); /** * @return compare by {@link Bucket#getDocCount()}. @@ -464,7 +473,7 @@ public class InternalOrder extends BucketOrder { out.writeByte(order.id()); if (order instanceof Aggregation) { Aggregation aggregationOrder = (Aggregation) order; - out.writeBoolean(aggregationOrder.asc); + out.writeBoolean(aggregationOrder.order == SortOrder.ASC); out.writeString(aggregationOrder.path().toString()); } else if (order instanceof CompoundOrder) { CompoundOrder compoundOrder = (CompoundOrder) order; @@ -579,23 +588,4 @@ public class InternalOrder extends BucketOrder { } } } - - @Override - public int hashCode() { - return Objects.hash(id, key, asc); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - InternalOrder other = (InternalOrder) obj; - return Objects.equals(id, other.id) - && Objects.equals(key, other.key) - && Objects.equals(asc, other.asc); - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index e53e2d7885a..f26cd0478a0 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Arrays; @@ -174,15 +175,12 @@ public abstract class BucketsAggregator extends AggregatorBase { } @Override - public void validateSortPathKey(String key) { - if (false == this instanceof SingleBucketAggregator) { - super.validateSortPathKey(key); - return; - } - if (key != null && false == "doc_count".equals(key)) { - throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " + - "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() + - ".doc_count\")"); + public BucketComparator bucketComparator(String key, SortOrder order) { + if (key == null || "doc_count".equals(key)) { + return (lhs, rhs) -> order.reverseMul() * Integer.compare(bucketDocCount(lhs), bucketDocCount(rhs)); } + throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. 
" + + "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() + + ".doc_count\") or \"key\"."); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index e26037771d0..10dd8ee1bf7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -128,8 +129,8 @@ public abstract class DeferringBucketCollector extends BucketCollector { } @Override - public void validateSortPathKey(String key) { - in.validateSortPathKey(key); + public BucketComparator bucketComparator(String key, SortOrder order) { + throw new UnsupportedOperationException("Can't sort on deferred aggregations"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java index 25d3e1e7188..38148a10333 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java @@ -22,6 +22,4 @@ package org.elasticsearch.search.aggregations.bucket; * A bucket aggregator that doesn't create new buckets. 
*/ public interface SingleBucketAggregator { - - int bucketDocCount(long bucketOrd); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index b10507cd2ce..33a60c8649e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -179,7 +179,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator { // the contract of the histogram aggregation is that shards must return // buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 38d79a3f77e..26ebe6b9191 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -31,7 +31,6 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; 
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -74,7 +73,8 @@ class DateHistogramAggregator extends BucketsAggregator { super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); this.rounding = rounding; this.shardRounding = shardRounding; - this.order = InternalOrder.validate(order, this); + this.order = order; + order.validate(this); this.keyed = keyed; this.minDocCount = minDocCount; this.extendedBounds = extendedBounds; @@ -141,7 +141,7 @@ class DateHistogramAggregator extends BucketsAggregator { } // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields // Important: use `rounding` here, not `shardRounding` diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java index 0b1a37dae3b..506afeedbfa 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java @@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import 
org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -78,7 +77,8 @@ class DateRangeHistogramAggregator extends BucketsAggregator { super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); this.rounding = rounding; this.shardRounding = shardRounding; - this.order = InternalOrder.validate(order, this); + this.order = order; + order.validate(this); this.keyed = keyed; this.minDocCount = minDocCount; this.extendedBounds = extendedBounds; @@ -162,7 +162,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator { } // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields // Important: use `rounding` here, not `shardRounding` diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index b4e0ba659af..cfb0430b6c3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -463,7 +463,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation< // nothing to do when sorting by key ascending, as data is already sorted since shards return // sorted buckets and the merge-sort performed by reduceBuckets maintains order. 
// otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + CollectionUtil.introSort(reducedBuckets, order.comparator()); } } return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index a7b16e89499..03c37cfd133 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -436,7 +436,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0); final boolean needsFullScan = bucketOrds == null || bucketCountThresholds.getMinDocCount() == 0; final long maxId = needsFullScan ? 
valueCount : bucketOrds.size(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java index bc8e8198984..abcbaf7ab0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java @@ -138,7 +138,7 @@ public abstract class InternalMappedRareTerms, addToFilter(filter, b); } } - CollectionUtil.introSort(rare, order.comparator(null)); + CollectionUtil.introSort(rare, order.comparator()); return createWithFilter(name, rare, filter); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 8f45749a363..d8f74b1b8e4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -256,7 +256,7 @@ public abstract class InternalTerms, B extends Int } final int size = reduceContext.isFinalReduce() == false ? 
buckets.size() : Math.min(requiredSize, buckets.size()); - final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(null)); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); for (List sameTermBuckets : buckets.values()) { final B b = reduceBucket(sameTermBuckets, reduceContext); if (sumDocCountError == -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java index 7c79e2db6b8..86546286580 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java @@ -150,7 +150,7 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); LongTerms.Bucket spare = null; for (long i = 0; i < bucketOrds.size(); i++) { if (spare == null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 29bb46140ec..808f2ae71bf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -155,7 +155,7 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); StringTerms.Bucket spare = null; for (int i = 0; i < 
bucketOrds.size(); i++) { if (spare == null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index abdd36b3d27..f67c19d508a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -24,26 +24,19 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.Comparators; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.InternalOrder.Aggregation; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator; -import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; 
-import org.elasticsearch.search.profile.aggregation.ProfilingAggregator; import java.io.IOException; import java.util.Comparator; @@ -181,6 +174,7 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; + protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting = new HashSet<>(); protected final SubAggCollectionMode collectMode; @@ -189,7 +183,8 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { List pipelineAggregators, Map metaData) throws IOException { super(name, factories, context, parent, pipelineAggregators, metaData); this.bucketCountThresholds = bucketCountThresholds; - this.order = InternalOrder.validate(order, this); + this.order = order; + partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); this.format = format; if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) { /** @@ -235,67 +230,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { return false; } - /** - * Internal Optimization for ordering {@link InternalTerms.Bucket}s by a sub aggregation. - *

- * in this phase, if the order is based on sub-aggregations, we need to use a different comparator - * to avoid constructing buckets for ordering purposes (we can potentially have a lot of buckets and building - * them will cause loads of redundant object constructions). The "special" comparators here will fetch the - * sub aggregation values directly from the sub aggregators bypassing bucket creation. Note that the comparator - * attached to the order will still be used in the reduce phase of the Aggregation. - * - * @param path determines which sub aggregation to use for ordering. - * @param asc {@code true} for ascending order, {@code false} for descending. - * @return {@code Comparator} to order {@link InternalTerms.Bucket}s in the desired order. - */ - public Comparator bucketComparator(AggregationPath path, boolean asc) { - - Aggregator agg = path.resolveAggregator(this); - // TODO Move this method into Aggregator or AggregationPath. - if (agg instanceof ProfilingAggregator) { - agg = ProfilingAggregator.unwrap(agg); - } - final Aggregator aggregator = agg; - final String key = path.lastPathElement().key; - - if (aggregator instanceof SingleBucketAggregator) { - assert key == null : "this should be picked up before the aggregation is executed - on validate"; - return (b1, b2) -> { - int mul = asc ? 
1 : -1; - int v1 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b1).bucketOrd); - int v2 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b2).bucketOrd); - return mul * (v1 - v2); - }; - } - - // with only support single-bucket aggregators - assert !(aggregator instanceof BucketsAggregator) : "this should be picked up before the aggregation is executed - on validate"; - - if (aggregator instanceof NumericMetricsAggregator.MultiValue) { - assert key != null : "this should be picked up before the aggregation is executed - on validate"; - return (b1, b2) -> { - double v1 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b1).bucketOrd); - double v2 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b2).bucketOrd); - // some metrics may return NaN (eg. avg, variance, etc...) in which case we'd like to push all of those to - // the bottom - return Comparators.compareDiscardNaN(v1, v2, asc); - }; - } - - // single-value metrics agg - return (b1, b2) -> { - double v1 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b1).bucketOrd); - double v2 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b2).bucketOrd); - // some metrics may return NaN (eg. avg, variance, etc...) 
in which case we'd like to push all of those to - // the bottom - return Comparators.compareDiscardNaN(v1, v2, asc); - }; - } - @Override protected boolean shouldDefer(Aggregator aggregator) { return collectMode == SubAggCollectionMode.BREADTH_FIRST && !aggsUsedForSorting.contains(aggregator); } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 340b868f448..d6dea34f35d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -87,18 +87,15 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory metaData) throws IOException { final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData); - return new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) { - { - // even in the case of an unmapped aggregator, validate the - // order - InternalOrder.validate(order, this); - } - + Aggregator agg = new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) { @Override public InternalAggregation buildEmptyAggregation() { return aggregation; } }; + // even in the case of an unmapped aggregator, validate the order + order.validate(agg); + return agg; } private static boolean isAggregationSort(BucketOrder order) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java index 24e95ea20d9..413abf7bcea 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.search.aggregations.metrics; +import org.elasticsearch.common.util.Comparators; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.List; @@ -43,11 +45,12 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { public abstract double metric(long owningBucketOrd); @Override - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { if (key != null && false == "value".equals(key)) { throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. 
" + "Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")"); } + return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(lhs), metric(rhs), order == SortOrder.ASC); } } @@ -63,7 +66,7 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { public abstract double metric(String name, long owningBucketOrd); @Override - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { if (key == null) { throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified."); } @@ -71,6 +74,8 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { throw new IllegalArgumentException( "Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]"); } + // TODO it'd be faster to replace hasMetric and metric with something that returned a function from long to double. + return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java index 2453a17a367..7e947b20a79 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java @@ -22,10 +22,12 @@ package org.elasticsearch.search.aggregations.support; import org.elasticsearch.common.Strings; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; 
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.profile.aggregation.ProfilingAggregator; +import org.elasticsearch.search.sort.SortOrder; import java.util.ArrayList; import java.util.Iterator; @@ -182,11 +184,6 @@ public class AggregationPath { return stringPathElements; } - private AggregationPath subPath(int offset, int length) { - List subTokens = new ArrayList<>(pathElements.subList(offset, offset + length)); - return new AggregationPath(subTokens); - } - /** * Looks up the value of this path against a set of aggregation results. */ @@ -223,18 +220,8 @@ public class AggregationPath { return aggregator; } - /** - * Validates this path over the given aggregator as a point of reference. - * - * @param root The point of reference of this path - * @throws AggregationExecutionException on validation error - */ - public void validate(Aggregator root) throws AggregationExecutionException { - try { - resolveAggregator(root).validateSortPathKey(lastPathElement().key); - } catch (IllegalArgumentException e) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. 
" + e.getMessage(), e); - } + public BucketComparator bucketComparator(Aggregator root, SortOrder order) { + return resolveAggregator(root).bucketComparator(lastPathElement().key, order); } private static String[] split(String toSplit, int index, String[] result) { @@ -242,4 +229,18 @@ public class AggregationPath { result[1] = toSplit.substring(index + 1); return result; } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + AggregationPath other = (AggregationPath) obj; + return pathElements.equals(other.pathElements); + } + + @Override + public int hashCode() { + return pathElements.hashCode(); + } } diff --git a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java index b7fbf9a0772..0a770c2baf5 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.Timer; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -78,8 +79,8 @@ public class ProfilingAggregator extends Aggregator { } @Override - public void validateSortPathKey(String key) { - delegate.validateSortPathKey(key); + public BucketComparator bucketComparator(String key, SortOrder order) { + return delegate.bucketComparator(key, order); } @Override