Continue realizing sorting by aggregations (backport of #52298) (#52667)

This drops more of the `instanceof`s from `AggregationPath`. There are
still a couple in `AggregationPath`. And I ended up moving two into
`BucketsAggregator`, but I think this is still an improvement!
This commit is contained in:
Nik Everett 2020-02-23 17:13:55 -05:00 committed by GitHub
parent a0aa808c83
commit d26d7721ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 140 additions and 95 deletions

View File

@ -28,9 +28,11 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Iterator;
/**
* An Aggregator.
@ -91,6 +93,53 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
*/
public abstract Aggregator subAggregator(String name);
/**
 * Resolves one step of a sort path, treating this aggregation as one
 * that supports sorting. Typically this is the very first step of the
 * resolution, since most aggregations that can sort their own buckets
 * are not allowed to appear in the middle of a sort path.
 * <p>
 * As an example, {@code terms} can sort its buckets, yet the sort path
 * it sorts on may not pass through another {@code terms} aggregation.
 *
 * @param next the path element to look up among this aggregation's sub-aggregators
 * @param path the remaining path elements, positioned just after {@code next}
 * @return the aggregator that the path ultimately points to
 * @throws IllegalArgumentException if {@code next} names no sub-aggregator, or if
 *         {@code next} carries a key while more path elements follow
 */
public final Aggregator resolveSortPathOnValidAgg(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
    final Aggregator child = subAggregator(next.name);
    if (child == null) {
        throw new IllegalArgumentException("The provided aggregation [" + next + "] either does not exist, or is "
            + "a pipeline aggregation and cannot be used to sort the buckets.");
    }
    if (path.hasNext()) {
        // More elements follow, so `next` is an intermediate step and must not carry a key.
        if (next.key != null) {
            throw new IllegalArgumentException("Key only allowed on last aggregation path element but got [" + next + "]");
        }
        return child.resolveSortPath(path.next(), path);
    }
    // `next` was the final element: the sub-aggregator itself is the sort target.
    return child;
}
/**
 * Resolves the rest of a sort path starting from this aggregation.
 * <p>
 * This base implementation always rejects the path by throwing; aggregations
 * that may appear inside a sort path override it.
 *
 * @param next the path element to resolve
 * @param path the remaining path elements after {@code next}
 * @return never returns normally in this implementation
 * @throws IllegalArgumentException always, naming this aggregation as not single-bucket
 */
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
    final String reason = "Buckets can only be sorted on a sub-aggregator path " +
        "that is built out of zero or more single-bucket aggregations within the path and a final " +
        "single-bucket or a metrics aggregation at the path end. [" + name() + "] is not single-bucket.";
    throw new IllegalArgumentException(reason);
}
/**
 * Checks the "key" part of a sort that targets this aggregation.
 * <p>
 * This base implementation always rejects the sort by throwing; aggregations
 * that can be sorted on override it.
 *
 * @param key the key taken from the last element of the sort path, possibly {@code null}
 * @throws IllegalArgumentException always
 */
public void validateSortPathKey(String key) {
    final String reason = "Buckets can only be sorted on a sub-aggregator path " +
        "that is built out of zero or more single-bucket aggregations within the path and a final " +
        "single-bucket or a metrics aggregation at the path end.";
    throw new IllegalArgumentException(reason);
}
/**
* Build an aggregation for data that has been collected into {@code bucket}.
*/

View File

@ -28,10 +28,12 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;
@ -163,4 +165,24 @@ public abstract class BucketsAggregator extends AggregatorBase {
}
}
/**
 * Continues sort path resolution through this aggregation when it is a
 * {@link SingleBucketAggregator}; any other bucketing aggregation falls
 * back to the inherited implementation.
 */
@Override
public Aggregator resolveSortPath(AggregationPath.PathElement next, Iterator<AggregationPath.PathElement> path) {
    final boolean singleBucket = this instanceof SingleBucketAggregator;
    if (false == singleBucket) {
        return super.resolveSortPath(next, path);
    }
    return resolveSortPathOnValidAgg(next, path);
}
/**
 * Validates the sort key for this aggregation. A {@link SingleBucketAggregator}
 * accepts either no key or {@code doc_count}; any other bucketing aggregation
 * defers to the inherited implementation.
 *
 * @param key the key from the last sort path element, possibly {@code null}
 * @throws IllegalArgumentException if this is a single-bucket aggregation and the key
 *         is neither {@code null} nor {@code doc_count}
 */
@Override
public void validateSortPathKey(String key) {
    if (this instanceof SingleBucketAggregator) {
        if (key == null || "doc_count".equals(key)) {
            return;
        }
        throw new IllegalArgumentException("Ordering on a single-bucket aggregation can only be done on its doc_count. " +
            "Either drop the key (a la \"" + name() + "\") or change it to \"doc_count\" (a la \"" + name() +
            ".doc_count\")");
    }
    super.validateSortPathKey(key);
}
}

View File

@ -25,9 +25,11 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Iterator;
/**
* A {@link BucketCollector} that records collected doc IDs and buckets and
@ -120,6 +122,15 @@ public abstract class DeferringBucketCollector extends BucketCollector {
"Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
}
/**
 * Resolves a sort path by forwarding the call, arguments unchanged, to {@code in}
 * and returning whatever it returns.
 * NOTE(review): {@code in} is presumably the aggregator this wrapper decorates;
 * its declaration is outside this hunk, so confirm against the enclosing class.
 */
@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return in.resolveSortPath(next, path);
}
/**
 * Validates a sort key by forwarding the call to {@code in}; any exception it
 * throws propagates to the caller.
 * NOTE(review): {@code in} is presumably the aggregator this wrapper decorates;
 * its declaration is outside this hunk, so confirm against the enclosing class.
 */
@Override
public void validateSortPathKey(String key) {
in.validateSortPathKey(key);
}
}
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;
import java.io.IOException;
import java.util.Comparator;
@ -249,7 +250,12 @@ public abstract class TermsAggregator extends DeferableBucketAggregator {
*/
public Comparator<Bucket> bucketComparator(AggregationPath path, boolean asc) {
final Aggregator aggregator = path.resolveAggregator(this);
Aggregator agg = path.resolveAggregator(this);
// TODO Move this method into Aggregator or AggregationPath.
if (agg instanceof ProfilingAggregator) {
agg = ProfilingAggregator.unwrap(agg);
}
final Aggregator aggregator = agg;
final String key = path.lastPathElement().key;
if (aggregator instanceof SingleBucketAggregator) {

View File

@ -107,7 +107,7 @@ public abstract class InternalNumericMetricsAggregation extends InternalAggregat
@Override
public final double sortValue(String key) {
if (key == null) {
throw new IllegalArgumentException("Missing value key in [" + key+ "] which refers to a multi-value metric aggregation");
throw new IllegalArgumentException("Missing value key in [" + key + "] which refers to a multi-value metric aggregation");
}
return value(key);
}

View File

@ -41,6 +41,14 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator {
}
public abstract double metric(long owningBucketOrd);
/**
 * Validates the sort key for a single-value metric: the key may be omitted
 * or must be exactly {@code value}.
 *
 * @param key the key from the last sort path element, possibly {@code null}
 * @throws IllegalArgumentException if the key is present and is not {@code value}
 */
@Override
public void validateSortPathKey(String key) {
    if (key == null || "value".equals(key)) {
        return;
    }
    throw new IllegalArgumentException("Ordering on a single-value metrics aggregation can only be done on its value. " +
        "Either drop the key (a la \"" + name() + "\") or change it to \"value\" (a la \"" + name() + ".value\")");
}
}
public abstract static class MultiValue extends NumericMetricsAggregator {
@ -53,5 +61,16 @@ public abstract class NumericMetricsAggregator extends MetricsAggregator {
public abstract boolean hasMetric(String name);
public abstract double metric(String name, long owningBucketOrd);
/**
 * Validates the sort key for a multi-value metric: a key is mandatory and
 * must name a metric for which {@link #hasMetric(String)} returns {@code true}.
 *
 * @param key the key from the last sort path element, possibly {@code null}
 * @throws IllegalArgumentException if the key is missing or names an unknown metric
 */
@Override
public void validateSortPathKey(String key) {
    if (key == null) {
        throw new IllegalArgumentException("When ordering on a multi-value metrics aggregation a metric name must be specified.");
    }
    if (hasMetric(key)) {
        return;
    }
    throw new IllegalArgumentException(
        "Unknown metric name [" + key + "] on multi-value metrics aggregation [" + name() + "]");
}
}
}

View File

@ -182,7 +182,7 @@ public class AggregationPath {
return stringPathElements;
}
public AggregationPath subPath(int offset, int length) {
private AggregationPath subPath(int offset, int length) {
List<PathElement> subTokens = new ArrayList<>(pathElements.subList(offset, offset + length));
return new AggregationPath(subTokens);
}
@ -196,38 +196,29 @@ public class AggregationPath {
assert path.hasNext();
return aggregations.sortValue(path.next(), path);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid order path [" + this + "]. " + e.getMessage(), e);
throw new IllegalArgumentException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
}
}
/**
* Resolves the aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The aggregator pointed by this path starting from the given aggregator as a point of reference
* Resolves the {@linkplain Aggregator} pointed to by this path against
* the given root {@linkplain Aggregator}.
*/
public Aggregator resolveAggregator(Aggregator root) {
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
AggregationPath.PathElement token = pathElements.get(i);
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator && i <= pathElements.size() - 1)
|| (aggregator instanceof NumericMetricsAggregator && i == pathElements.size() - 1) :
"this should be picked up before aggregation execution - on validate";
}
return aggregator;
Iterator<PathElement> path = pathElements.iterator();
assert path.hasNext();
return root.resolveSortPathOnValidAgg(path.next(), path);
}
/**
* Resolves the topmost aggregator pointed by this path using the given root as a point of reference.
*
* @param root The point of reference of this path
* @return The first child aggregator of the root pointed by this path
* Resolves the {@linkplain Aggregator} pointed to by the first element
* of this path against the given root {@linkplain Aggregator}.
*/
public Aggregator resolveTopmostAggregator(Aggregator root) {
AggregationPath.PathElement token = pathElements.get(0);
// TODO both unwrap and subAggregator are only used here!
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name));
assert (aggregator instanceof SingleBucketAggregator )
assert (aggregator instanceof SingleBucketAggregator)
|| (aggregator instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate";
return aggregator;
}
@ -239,76 +230,10 @@ public class AggregationPath {
* @throws AggregationExecutionException on validation error
*/
public void validate(Aggregator root) throws AggregationExecutionException {
Aggregator aggregator = root;
for (int i = 0; i < pathElements.size(); i++) {
String name = pathElements.get(i).name;
aggregator = ProfilingAggregator.unwrap(aggregator.subAggregator(name));
if (aggregator == null) {
throw new AggregationExecutionException("Invalid aggregator order path [" + this + "]. The " +
"provided aggregation [" + name + "] either does not exist, or is a pipeline aggregation " +
"and cannot be used to sort the buckets.");
}
if (i < pathElements.size() - 1) {
// we're in the middle of the path, so the aggregator can only be a single-bucket aggregator
if (!(aggregator instanceof SingleBucketAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}
if (pathElements.get(i).key != null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a " +
"final single-bucket or a metrics aggregation at the path end. Sub-path [" +
subPath(0, i + 1) + "] points to non single-bucket aggregation");
}
}
}
boolean singleBucket = aggregator instanceof SingleBucketAggregator;
if (!singleBucket && !(aggregator instanceof NumericMetricsAggregator)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Buckets can only be sorted on a sub-aggregator path " +
"that is built out of zero or more single-bucket aggregations within the path and a final " +
"single-bucket or a metrics aggregation at the path end.");
}
AggregationPath.PathElement lastToken = lastPathElement();
if (singleBucket) {
if (lastToken.key != null && !"doc_count".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-bucket aggregation can only be done on its doc_count. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"doc_count\" (a la \"" + lastToken.name +
".doc_count\")");
}
return; // perfectly valid to sort on single-bucket aggregation (will be sored on its doc_count)
}
if (aggregator instanceof NumericMetricsAggregator.SingleValue) {
if (lastToken.key != null && !"value".equals(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Ordering on a single-value metrics aggregation can only be done on its value. " +
"Either drop the key (a la \"" + lastToken.name + "\") or change it to \"value\" (a la \"" + lastToken.name +
".value\")");
}
return; // perfectly valid to sort on single metric aggregation (will be sorted on its associated value)
}
// the aggregator must be of a multi-value metrics type
if (lastToken.key == null) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. When ordering on a multi-value metrics aggregation a metric name must be specified");
}
if (!((NumericMetricsAggregator.MultiValue) aggregator).hasMetric(lastToken.key)) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this +
"]. Unknown metric name [" + lastToken.key + "] on multi-value metrics aggregation [" + lastToken.name + "]");
try {
resolveAggregator(root).validateSortPathKey(lastPathElement().key);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException("Invalid aggregation order path [" + this + "]. " + e.getMessage(), e);
}
}

View File

@ -24,10 +24,12 @@ import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Timer;
import java.io.IOException;
import java.util.Iterator;
public class ProfilingAggregator extends Aggregator {
@ -70,6 +72,16 @@ public class ProfilingAggregator extends Aggregator {
return delegate.subAggregator(name);
}
/**
 * Resolves a sort path by handing the call straight to {@code delegate} and
 * returning its result. No profiling timer is touched in this method.
 */
@Override
public Aggregator resolveSortPath(PathElement next, Iterator<PathElement> path) {
return delegate.resolveSortPath(next, path);
}
/**
 * Validates a sort key by handing the call straight to {@code delegate}; any
 * exception it throws propagates to the caller. No profiling timer is touched
 * in this method.
 */
@Override
public void validateSortPathKey(String key) {
delegate.validateSortPathKey(key);
}
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
Timer timer = profileBreakdown.getTimer(AggregationTimingType.BUILD_AGGREGATION);

View File

@ -1237,7 +1237,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
AggregationExecutionException e = expectThrows(AggregationExecutionException.class,
() -> createAggregator(termsAgg, indexSearcher, fieldType));
assertEquals("Invalid aggregator order path [script]. The provided aggregation [script] " +
assertEquals("Invalid aggregation order path [script]. The provided aggregation [script] " +
"either does not exist, or is a pipeline aggregation and cannot be used to sort the buckets.",
e.getMessage());
}

View File

@ -28,19 +28,20 @@ import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTe
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;