From 407101c39b7936c11ddc8470c11c373e0c134b52 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 27 Feb 2020 17:50:55 -0500 Subject: [PATCH] Clean and document sorting with partialy built buckets (backport of #52769) (#52925) The `terms` aggregation can be sortd by the results of its sub-aggregations. Because it uses that sorting for filtering to the top-n it tries not to construct all of the buckets for the child aggregations. This has its own interesting problem around reduction, but they aren't super relevant to this change. This change moves that optimization from the `TermsAggregator` and into the aggregators being sorted on. This should make it more clear what is going on and it unifies this optimization with validating the sort. Finally, this should enable some minor optimizations to save a few comparisons when sorting multi-valued buckets. I'll get those in a follow up because they are now *fairly* obvious. They probably won't be a huge performance improvement, but it'll be nice anyway. --- .../search/aggregations/Aggregator.java | 15 +- .../search/aggregations/BucketOrder.java | 50 +++- .../search/aggregations/InternalOrder.java | 270 +++++++++--------- .../bucket/BucketsAggregator.java | 16 +- .../bucket/DeferringBucketCollector.java | 5 +- .../bucket/SingleBucketAggregator.java | 2 - .../AutoDateHistogramAggregator.java | 2 +- .../histogram/DateHistogramAggregator.java | 6 +- .../DateRangeHistogramAggregator.java | 6 +- .../histogram/InternalDateHistogram.java | 2 +- .../bucket/histogram/InternalHistogram.java | 2 +- .../histogram/NumericHistogramAggregator.java | 8 +- .../histogram/RangeHistogramAggregator.java | 6 +- .../GlobalOrdinalsStringTermsAggregator.java | 4 +- .../bucket/terms/InternalMappedRareTerms.java | 2 +- .../bucket/terms/InternalTerms.java | 2 +- .../bucket/terms/LongRareTermsAggregator.java | 2 +- .../bucket/terms/LongTermsAggregator.java | 2 +- .../terms/StringRareTermsAggregator.java | 2 +- .../bucket/terms/StringTermsAggregator.java | 2 +- .../bucket/terms/TermsAggregator.java | 69 +---- .../bucket/terms/TermsAggregatorFactory.java | 11 +- .../metrics/NumericMetricsAggregator.java | 9 +- .../aggregations/support/AggregationPath.java | 35 +-- .../aggregation/ProfilingAggregator.java | 5 +- 25 files changed, 251 insertions(+), 284 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index 69381b93e49..41058ee2066 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -130,15 +131,25 @@ public abstract class Aggregator extends BucketCollector implements Releasable { } /** - * Validates the "key" portion of a sort on this aggregation. + * Builds a comparator that compares two buckets aggregated by this {@linkplain Aggregator}. *

* The default implementation throws an exception but we override it on aggregations that support sorting. */ - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { throw new IllegalArgumentException("Buckets can only be sorted on a sub-aggregator path " + "that is built out of zero or more single-bucket aggregations within the path and a final " + "single-bucket or a metrics aggregation at the path end."); } + /** + * Compare two buckets by their ordinal. + */ + @FunctionalInterface + public interface BucketComparator { + /** + * Compare two buckets by their ordinal. + */ + int compare(long lhs, long rhs); + } /** * Build an aggregation for data that has been collected into {@code bucket}. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java index d1e57e12321..10048b19129 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java @@ -29,12 +29,15 @@ import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.function.ToLongFunction; /** - * {@link Bucket} Ordering strategy. + * {@link Bucket} ordering strategy. Buckets can be order either as + * "complete" buckets using {@link #comparator()} or against a combination + * of the buckets internals with its ordinal with + * {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}. */ public abstract class BucketOrder implements ToXContentObject, Writeable { - /** * Creates a bucket ordering strategy that sorts buckets by their document counts (ascending or descending). * @@ -98,16 +101,41 @@ public abstract class BucketOrder implements ToXContentObject, Writeable { } /** - * @return A comparator for the bucket based on the given aggregator. The comparator is used in two phases: - *

- * - aggregation phase, where each shard builds a list of buckets to be sent to the coordinating node. - * In this phase, the passed in aggregator will be the aggregator that aggregates the buckets on the - * shard level. - *

- * - reduce phase, where the coordinating node gathers all the buckets from all the shards and reduces them - * to a final bucket list. In this case, the passed in aggregator will be {@code null}. + * Validate an aggregation against an {@linkplain Aggregator}. + * @throws AggregationExecutionException when the ordering is invalid + * for this {@linkplain Aggregator}. */ - public abstract Comparator comparator(Aggregator aggregator); + public final void validate(Aggregator aggregator) throws AggregationExecutionException{ + /* + * Building partiallyBuiltBucketComparator and throwing it away is enough + * to validate this order because doing so checks all of the appropriate + * paths. + */ + partiallyBuiltBucketComparator(null, aggregator); + } + + /** + * A builds comparator comparing buckets partially built buckets by + * delegating comparison of the results of any "child" aggregations to + * the provided {@linkplain Aggregator}. + *

+ * Warning: This is fairly difficult to use and impossible to use cleanly. + * In addition, this exists primarily to return the "top n" buckets based + * on the results of a sub aggregation. The trouble is that could end up + * throwing away buckets on the data nodes that should ultimately be kept + * after reducing all of the results. If you know that this is coming it + * is fine, but most folks that use "generic" sorts don't. In other words: + * before you use this method think super duper hard if you want to have + * these kinds of issues. The terms agg does an folks get into trouble + * with it all the time. + *

+ */ + public abstract Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator); + + /** + * Build a comparator for fully built buckets. + */ + public abstract Comparator comparator(); /** * @return unique internal ID used for reading/writing this order from/to a stream. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java index 2cee6eb729f..e2d3303e4cd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java @@ -28,9 +28,10 @@ import org.elasticsearch.common.util.Comparators; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.ArrayList; @@ -40,72 +41,22 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.function.ToLongFunction; + +import static java.util.stream.Collectors.toList; /** * Implementations for {@link Bucket} ordering strategies. */ -public class InternalOrder extends BucketOrder { - - private final byte id; - private final String key; - protected final boolean asc; - protected final Comparator comparator; - - /** - * Creates an ordering strategy that sorts {@link Bucket}s by some property. - * - * @param id unique ID for this ordering strategy. - * @param key key of the property to sort on. - * @param asc direction to sort by: {@code true} for ascending, {@code false} for descending. - * @param comparator determines how buckets will be ordered. - */ - public InternalOrder(byte id, String key, boolean asc, Comparator comparator) { - this.id = id; - this.key = key; - this.asc = asc; - this.comparator = comparator; - } - - @Override - byte id() { - return id; - } - - @Override - public Comparator comparator(Aggregator aggregator) { - return comparator; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject().field(key, asc ? "asc" : "desc").endObject(); - } - - /** - * Validate a bucket ordering strategy for an {@link Aggregator}. - * - * @param order bucket ordering strategy to sort on. - * @param aggregator aggregator to sort. - * @return unmodified bucket ordering strategy. - * @throws AggregationExecutionException if validation fails - */ - public static BucketOrder validate(BucketOrder order, Aggregator aggregator) throws AggregationExecutionException { - if (order instanceof CompoundOrder) { - for (BucketOrder innerOrder : ((CompoundOrder) order).orderElements) { - validate(innerOrder, aggregator); - } - } else if (order instanceof Aggregation) { - ((Aggregation) order).path().validate(aggregator); - } - return order; - } - +public abstract class InternalOrder extends BucketOrder { + // TODO merge the contents of this file into BucketOrder. The way it is now is relic. /** * {@link Bucket} ordering strategy to sort by a sub-aggregation. */ public static class Aggregation extends InternalOrder { - static final byte ID = 0; + private final SortOrder order; + private final AggregationPath path; /** * Create a new ordering strategy to sort by a sub-aggregation. @@ -115,51 +66,56 @@ public class InternalOrder extends BucketOrder { * @see AggregationPath */ Aggregation(String path, boolean asc) { - super(ID, path, asc, new AggregationComparator(path, asc)); + order = asc ? SortOrder.ASC : SortOrder.DESC; + this.path = AggregationPath.parse(path); } - /** - * @return parsed path to the sub-aggregation to sort on. - */ public AggregationPath path() { - return ((AggregationComparator) comparator).path; + return path; } @Override - public Comparator comparator(Aggregator aggregator) { - if (aggregator instanceof TermsAggregator) { - // Internal Optimization for terms aggregation to avoid constructing buckets for ordering purposes - return ((TermsAggregator) aggregator).bucketComparator(path(), asc); + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + try { + BucketComparator bucketComparator = path.bucketComparator(aggregator, order); + return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs)); + } catch (IllegalArgumentException e) { + throw new AggregationExecutionException("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e); } - return comparator; } - /** - * {@link Bucket} ordering strategy to sort by a sub-aggregation. - */ - static class AggregationComparator implements Comparator { + @Override + public Comparator comparator() { + return (lhs, rhs) -> { + double l = path.resolveValue(((InternalAggregations) lhs.getAggregations())); + double r = path.resolveValue(((InternalAggregations) rhs.getAggregations())); + return Comparators.compareDiscardNaN(l, r, order == SortOrder.ASC); + }; + } - private final AggregationPath path; - private final boolean asc; + @Override + byte id() { + return ID; + } - /** - * Create a new {@link Bucket} ordering strategy to sort by a sub-aggregation. - * - * @param path path to the sub-aggregation to sort on. - * @param asc direction to sort by: {@code true} for ascending, {@code false} for descending. - * @see AggregationPath - */ - AggregationComparator(String path, boolean asc) { - this.asc = asc; - this.path = AggregationPath.parse(path); - } - - @Override - public int compare(Bucket b1, Bucket b2) { - double v1 = path.resolveValue(((InternalAggregations) b1.getAggregations())); - double v2 = path.resolveValue(((InternalAggregations) b2.getAggregations())); - return Comparators.compareDiscardNaN(v1, v2, asc); + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(path.toString(), order.toString()).endObject(); + } + + @Override + public int hashCode() { + return Objects.hash(path, order); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; } + Aggregation other = (Aggregation) obj; + return Objects.equals(path, other.path) + && Objects.equals(order, other.order); } } @@ -227,8 +183,31 @@ public class InternalOrder extends BucketOrder { } @Override - public Comparator comparator(Aggregator aggregator) { - return new CompoundOrderComparator(orderElements, aggregator); + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + List> comparators = orderElements.stream() + .map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator)) + .collect(toList()); + return (lhs, rhs) -> { + Iterator> itr = comparators.iterator(); + int result; + do { + result = itr.next().compare(lhs, rhs); + } while (result == 0 && itr.hasNext()); + return result; + }; + } + + @Override + public Comparator comparator() { + List> comparators = orderElements.stream().map(BucketOrder::comparator).collect(toList()); + return (lhs, rhs) -> { + Iterator> itr = comparators.iterator(); + int result; + do { + result = itr.next().compare(lhs, rhs); + } while (result == 0 && itr.hasNext()); + return result; + }; } @Override @@ -247,34 +226,64 @@ public class InternalOrder extends BucketOrder { CompoundOrder other = (CompoundOrder) obj; return Objects.equals(orderElements, other.orderElements); } + } - /** - * {@code Comparator} for sorting buckets by multiple criteria. - */ - static class CompoundOrderComparator implements Comparator { + /** + * {@link BucketOrder} implementation for simple, fixed orders like + * {@link InternalOrder#COUNT_ASC}. Complex implementations should not + * use this. + */ + private static class SimpleOrder extends InternalOrder { + private final byte id; + private final String key; + private final SortOrder order; + private final Comparator comparator; - private List compoundOrder; - private Aggregator aggregator; + SimpleOrder(byte id, String key, SortOrder order, Comparator comparator) { + this.id = id; + this.key = key; + this.order = order; + this.comparator = comparator; + } - /** - * Create a new {@code Comparator} for sorting buckets by multiple criteria. - * - * @param compoundOrder a list of {@link BucketOrder}s to sort on, in order of priority. - * @param aggregator {@link BucketOrder#comparator(Aggregator)} - */ - CompoundOrderComparator(List compoundOrder, Aggregator aggregator) { - this.compoundOrder = compoundOrder; - this.aggregator = aggregator; + @Override + public Comparator comparator() { + return comparator; + } + + @Override + byte id() { + return id; + } + + @Override + public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + Comparator comparator = comparator(); + return (lhs, rhs) -> comparator.compare(lhs, rhs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(key, order.toString()).endObject(); + } + + @Override + public int hashCode() { + return Objects.hash(id, key, order); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; } - - @Override - public int compare(Bucket b1, Bucket b2) { - int result = 0; - for (Iterator itr = compoundOrder.iterator(); itr.hasNext() && result == 0; ) { - result = itr.next().comparator(aggregator).compare(b1, b2); - } - return result; + if (getClass() != obj.getClass()) { + return false; } + SimpleOrder other = (SimpleOrder) obj; + return Objects.equals(id, other.id) + && Objects.equals(key, other.key) + && Objects.equals(order, other.order); } } @@ -286,22 +295,22 @@ public class InternalOrder extends BucketOrder { /** * Order by the (higher) count of each bucket. */ - static final InternalOrder COUNT_DESC = new InternalOrder(COUNT_DESC_ID, "_count", false, comparingCounts().reversed()); + static final InternalOrder COUNT_DESC = new SimpleOrder(COUNT_DESC_ID, "_count", SortOrder.DESC, comparingCounts().reversed()); /** * Order by the (lower) count of each bucket. */ - static final InternalOrder COUNT_ASC = new InternalOrder(COUNT_ASC_ID, "_count", true, comparingCounts()); + static final InternalOrder COUNT_ASC = new SimpleOrder(COUNT_ASC_ID, "_count", SortOrder.ASC, comparingCounts()); /** * Order by the key of each bucket descending. */ - static final InternalOrder KEY_DESC = new InternalOrder(KEY_DESC_ID, "_key", false, comparingKeys().reversed()); + static final InternalOrder KEY_DESC = new SimpleOrder(KEY_DESC_ID, "_key", SortOrder.DESC, comparingKeys().reversed()); /** * Order by the key of each bucket ascending. */ - static final InternalOrder KEY_ASC = new InternalOrder(KEY_ASC_ID, "_key", true, comparingKeys()); + static final InternalOrder KEY_ASC = new SimpleOrder(KEY_ASC_ID, "_key", SortOrder.ASC, comparingKeys()); /** * @return compare by {@link Bucket#getDocCount()}. @@ -464,7 +473,7 @@ public class InternalOrder extends BucketOrder { out.writeByte(order.id()); if (order instanceof Aggregation) { Aggregation aggregationOrder = (Aggregation) order; - out.writeBoolean(aggregationOrder.asc); + out.writeBoolean(aggregationOrder.order == SortOrder.ASC); out.writeString(aggregationOrder.path().toString()); } else if (order instanceof CompoundOrder) { CompoundOrder compoundOrder = (CompoundOrder) order; @@ -579,23 +588,4 @@ public class InternalOrder extends BucketOrder { } } } - - @Override - public int hashCode() { - return Objects.hash(id, key, asc); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - InternalOrder other = (InternalOrder) obj; - return Objects.equals(id, other.id) - && Objects.equals(key, other.key) - && Objects.equals(asc, other.asc); - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index e53e2d7885a..f26cd0478a0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Arrays; @@ -174,15 +175,12 @@ public abstract class BucketsAggregator extends AggregatorBase { } @Override - public void validateSortPathKey(String key) { - if (false == this instanceof SingleBucketAggregator) { - super.validateSortPathKey(key); - return; - } - if (key != null && false == "doc_count".equals(key)) { - throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " + - "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() + - ".doc_count\")"); + public BucketComparator bucketComparator(String key, SortOrder order) { + if (key == null || false == "doc_count".equals(key)) { + return (lhs, rhs) -> order.reverseMul() * Integer.compare(bucketDocCount(lhs), bucketDocCount(rhs)); } + throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " + + "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() + + ".doc_count\") or \"key\"."); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index e26037771d0..10dd8ee1bf7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -128,8 +129,8 @@ public abstract class DeferringBucketCollector extends BucketCollector { } @Override - public void validateSortPathKey(String key) { - in.validateSortPathKey(key); + public BucketComparator bucketComparator(String key, SortOrder order) { + throw new UnsupportedOperationException("Can't sort on deferred aggregations"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java index 25d3e1e7188..38148a10333 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/SingleBucketAggregator.java @@ -22,6 +22,4 @@ package org.elasticsearch.search.aggregations.bucket; * A bucket aggregator that doesn't create new buckets. */ public interface SingleBucketAggregator { - - int bucketDocCount(long bucketOrd); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index b10507cd2ce..33a60c8649e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -179,7 +179,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator { // the contract of the histogram aggregation is that shards must return // buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 38d79a3f77e..26ebe6b9191 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -31,7 +31,6 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -74,7 +73,8 @@ class DateHistogramAggregator extends BucketsAggregator { super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); this.rounding = rounding; this.shardRounding = shardRounding; - this.order = InternalOrder.validate(order, this); + this.order = order; + order.validate(this); this.keyed = keyed; this.minDocCount = minDocCount; this.extendedBounds = extendedBounds; @@ -141,7 +141,7 @@ class DateHistogramAggregator extends BucketsAggregator { } // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields // Important: use `rounding` here, not `shardRounding` diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java index 0b1a37dae3b..506afeedbfa 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java @@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; @@ -78,7 +77,8 @@ class DateRangeHistogramAggregator extends BucketsAggregator { super(name, factories, aggregationContext, parent, pipelineAggregators, metaData); this.rounding = rounding; this.shardRounding = shardRounding; - this.order = InternalOrder.validate(order, this); + this.order = order; + order.validate(this); this.keyed = keyed; this.minDocCount = minDocCount; this.extendedBounds = extendedBounds; @@ -162,7 +162,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator { } // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this)); + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); // value source will be null for unmapped fields // Important: use `rounding` here, not `shardRounding` diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index b4e0ba659af..cfb0430b6c3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -463,7 +463,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation< // nothing to do when sorting by key ascending, as data is already sorted since shards return // sorted buckets and the merge-sort performed by reduceBuckets maintains order. // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + CollectionUtil.introSort(reducedBuckets, order.comparator()); } } return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index a7b16e89499..03c37cfd133 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -436,7 +436,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0); final boolean needsFullScan = bucketOrds == null || bucketCountThresholds.getMinDocCount() == 0; final long maxId = needsFullScan ? valueCount : bucketOrds.size(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java index bc8e8198984..abcbaf7ab0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java @@ -138,7 +138,7 @@ public abstract class InternalMappedRareTerms, addToFilter(filter, b); } } - CollectionUtil.introSort(rare, order.comparator(null)); + CollectionUtil.introSort(rare, order.comparator()); return createWithFilter(name, rare, filter); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 8f45749a363..d8f74b1b8e4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -256,7 +256,7 @@ public abstract class InternalTerms, B extends Int } final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); - final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(null)); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); for (List sameTermBuckets : buckets.values()) { final B b = reduceBucket(sameTermBuckets, reduceContext); if (sumDocCountError == -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java index 7c79e2db6b8..86546286580 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java @@ -150,7 +150,7 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); LongTerms.Bucket spare = null; for (long i = 0; i < bucketOrds.size(); i++) { if (spare == null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 29bb46140ec..808f2ae71bf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -155,7 +155,7 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator ordered = new BucketPriorityQueue<>(size, order.comparator(this)); + BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); StringTerms.Bucket spare = null; for (int i = 0; i < bucketOrds.size(); i++) { if (spare == null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index abdd36b3d27..f67c19d508a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -24,26 +24,19 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.Comparators; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.InternalOrder.Aggregation; import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator; -import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.profile.aggregation.ProfilingAggregator; import java.io.IOException; import java.util.Comparator; @@ -181,6 +174,7 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; + protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting = new HashSet<>(); protected final SubAggCollectionMode collectMode; @@ -189,7 +183,8 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { List pipelineAggregators, Map metaData) throws IOException { super(name, factories, context, parent, pipelineAggregators, metaData); this.bucketCountThresholds = bucketCountThresholds; - this.order = InternalOrder.validate(order, this); + this.order = order; + partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); this.format = format; if (subAggsNeedScore() && descendsFromNestedAggregator(parent)) { /** @@ -235,67 +230,9 @@ public abstract class TermsAggregator extends DeferableBucketAggregator { return false; } - /** - * Internal Optimization for ordering {@link InternalTerms.Bucket}s by a sub aggregation. - *

- * in this phase, if the order is based on sub-aggregations, we need to use a different comparator - * to avoid constructing buckets for ordering purposes (we can potentially have a lot of buckets and building - * them will cause loads of redundant object constructions). The "special" comparators here will fetch the - * sub aggregation values directly from the sub aggregators bypassing bucket creation. Note that the comparator - * attached to the order will still be used in the reduce phase of the Aggregation. - * - * @param path determines which sub aggregation to use for ordering. - * @param asc {@code true} for ascending order, {@code false} for descending. - * @return {@code Comparator} to order {@link InternalTerms.Bucket}s in the desired order. - */ - public Comparator bucketComparator(AggregationPath path, boolean asc) { - - Aggregator agg = path.resolveAggregator(this); - // TODO Move this method into Aggregator or AggregationPath. - if (agg instanceof ProfilingAggregator) { - agg = ProfilingAggregator.unwrap(agg); - } - final Aggregator aggregator = agg; - final String key = path.lastPathElement().key; - - if (aggregator instanceof SingleBucketAggregator) { - assert key == null : "this should be picked up before the aggregation is executed - on validate"; - return (b1, b2) -> { - int mul = asc ? 1 : -1; - int v1 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b1).bucketOrd); - int v2 = ((SingleBucketAggregator) aggregator).bucketDocCount(((InternalTerms.Bucket) b2).bucketOrd); - return mul * (v1 - v2); - }; - } - - // with only support single-bucket aggregators - assert !(aggregator instanceof BucketsAggregator) : "this should be picked up before the aggregation is executed - on validate"; - - if (aggregator instanceof NumericMetricsAggregator.MultiValue) { - assert key != null : "this should be picked up before the aggregation is executed - on validate"; - return (b1, b2) -> { - double v1 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b1).bucketOrd); - double v2 = ((NumericMetricsAggregator.MultiValue) aggregator).metric(key, ((InternalTerms.Bucket) b2).bucketOrd); - // some metrics may return NaN (eg. avg, variance, etc...) in which case we'd like to push all of those to - // the bottom - return Comparators.compareDiscardNaN(v1, v2, asc); - }; - } - - // single-value metrics agg - return (b1, b2) -> { - double v1 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b1).bucketOrd); - double v2 = ((NumericMetricsAggregator.SingleValue) aggregator).metric(((InternalTerms.Bucket) b2).bucketOrd); - // some metrics may return NaN (eg. avg, variance, etc...) in which case we'd like to push all of those to - // the bottom - return Comparators.compareDiscardNaN(v1, v2, asc); - }; - } - @Override protected boolean shouldDefer(Aggregator aggregator) { return collectMode == SubAggCollectionMode.BREADTH_FIRST && !aggsUsedForSorting.contains(aggregator); } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 340b868f448..d6dea34f35d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -87,18 +87,15 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory metaData) throws IOException { final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData); - return new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) { - { - // even in the case of an unmapped aggregator, validate the - // order - InternalOrder.validate(order, this); - } - + Aggregator agg = new NonCollectingAggregator(name, searchContext, parent, factories, pipelineAggregators, metaData) { @Override public InternalAggregation buildEmptyAggregation() { return aggregation; } }; + // even in the case of an unmapped aggregator, validate the order + order.validate(agg); + return agg; } private static boolean isAggregationSort(BucketOrder order) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java index 24e95ea20d9..413abf7bcea 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.search.aggregations.metrics; +import org.elasticsearch.common.util.Comparators; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.List; @@ -43,11 +45,12 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { public abstract double metric(long owningBucketOrd); @Override - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { if (key != null && false == "value".equals(key)) { throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. " + "Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")"); } + return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(lhs), metric(rhs), order == SortOrder.ASC); } } @@ -63,7 +66,7 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { public abstract double metric(String name, long owningBucketOrd); @Override - public void validateSortPathKey(String key) { + public BucketComparator bucketComparator(String key, SortOrder order) { if (key == null) { throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified."); } @@ -71,6 +74,8 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator { throw new IllegalArgumentException( "Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]"); } + // TODO it'd be faster replace hasMetric and metric with something that returned a function from long to double. + return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java index 2453a17a367..7e947b20a79 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java @@ -22,10 +22,12 @@ package org.elasticsearch.search.aggregations.support; import org.elasticsearch.common.Strings; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; import org.elasticsearch.search.profile.aggregation.ProfilingAggregator; +import org.elasticsearch.search.sort.SortOrder; import java.util.ArrayList; import java.util.Iterator; @@ -182,11 +184,6 @@ public class AggregationPath { return stringPathElements; } - private AggregationPath subPath(int offset, int length) { - List subTokens = new ArrayList<>(pathElements.subList(offset, offset + length)); - return new AggregationPath(subTokens); - } - /** * Looks up the value of this path against a set of aggregation results. */ @@ -223,18 +220,8 @@ public class AggregationPath { return aggregator; } - /** - * Validates this path over the given aggregator as a point of reference. - * - * @param root The point of reference of this path - * @throws AggregationExecutionException on validation error - */ - public void validate(Aggregator root) throws AggregationExecutionException { - try { - resolveAggregator(root).validateSortPathKey(lastPathElement().key); - } catch (IllegalArgumentException e) { - throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e); - } + public BucketComparator bucketComparator(Aggregator root, SortOrder order) { + return resolveAggregator(root).bucketComparator(lastPathElement().key, order); } private static String[] split(String toSplit, int index, String[] result) { @@ -242,4 +229,18 @@ public class AggregationPath { result[1] = toSplit.substring(index + 1); return result; } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + AggregationPath other = (AggregationPath) obj; + return pathElements.equals(other.pathElements); + } + + @Override + public int hashCode() { + return pathElements.hashCode(); + } } diff --git a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java index b7fbf9a0772..0a770c2baf5 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java @@ -27,6 +27,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.Timer; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Iterator; @@ -78,8 +79,8 @@ public class ProfilingAggregator extends Aggregator { } @Override - public void validateSortPathKey(String key) { - delegate.validateSortPathKey(key); + public BucketComparator bucketComparator(String key, SortOrder order) { + return delegate.bucketComparator(key, order); } @Override