Add size support to `top_metrics` (backport of #52662) (#52914)

This adds support for returning the top "n" metrics instead of just the
very top.

Relates to #51813
This commit is contained in:
Nik Everett 2020-02-27 16:12:52 -05:00 committed by GitHub
parent d568345f1a
commit 1d1956ee93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1728 additions and 588 deletions

View File

@ -48,17 +48,20 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
public static final String NAME = "top_metrics";
private final SortBuilder<?> sort;
private final int size;
private final String metric;
/**
* Build the request.
* @param name the name of the metric
* @param sort the sort key used to select the top metrics
* @param size number of results to return per bucket
* @param metric the name of the field to select
*/
public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, String metric) {
public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, int size, String metric) {
super(name);
this.sort = sort;
this.size = size;
this.metric = metric;
}
@ -74,6 +77,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
builder.startArray("sort");
sort.toXContent(builder, params);
builder.endArray();
builder.field("size", size);
builder.startObject("metric").field("field", metric).endObject();
}
return builder.endObject();

View File

@ -61,14 +61,11 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(stats.getDistribution(), hasEntry(equalTo("t"), closeTo(.09, .005)));
}
public void testBasic() throws IOException {
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
public void testTopMetricsSizeOne() throws IOException {
indexTopMetricsData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), "v"));
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v"));
SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
ParsedTopMetrics top = response.getAggregations().get("test");
assertThat(top.getTopMetrics(), hasSize(1));
@ -76,4 +73,27 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(metric.getSort(), equalTo(singletonList(2)));
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3.0)));
}
public void testTopMetricsSizeTwo() throws IOException {
indexTopMetricsData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 2, "v"));
SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
ParsedTopMetrics top = response.getAggregations().get("test");
assertThat(top.getTopMetrics(), hasSize(2));
ParsedTopMetrics.TopMetrics metric = top.getTopMetrics().get(0);
assertThat(metric.getSort(), equalTo(singletonList(2)));
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3.0)));
metric = top.getTopMetrics().get(1);
assertThat(metric.getSort(), equalTo(singletonList(1)));
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 2.0)));
}
private void indexTopMetricsData() throws IOException {
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
}
}

View File

@ -14,7 +14,7 @@ POST /test/_bulk?refresh
{"index": {}}
{"s": 1, "v": 3.1415}
{"index": {}}
{"s": 2, "v": 1}
{"s": 2, "v": 1.0}
{"index": {}}
{"s": 3, "v": 2.71828}
POST /test/_search?filter_path=aggregations
@ -74,8 +74,73 @@ At this point `metric` supports only `{"field": "field_name"}` and all metrics
are returned as double precision floating point numbers. Expect more to
come here.
==== `size`
`top_metrics` can return the top few documents' worth of metrics using the `size` parameter:
[source,console,id=search-aggregations-metrics-top-metrics-size]
----
POST /test/_bulk?refresh
{"index": {}}
{"s": 1, "v": 3.1415}
{"index": {}}
{"s": 2, "v": 1.0}
{"index": {}}
{"s": 3, "v": 2.71828}
POST /test/_search?filter_path=aggregations
{
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"sort": {"s": "desc"},
"size": 2
}
}
}
}
----
Which returns:
[source,js]
----
{
"aggregations": {
"tm": {
"top": [
{"sort": [3], "metrics": {"v": 2.718280076980591 } },
{"sort": [2], "metrics": {"v": 1.0 } }
]
}
}
}
----
// TESTRESPONSE
The default `size` is 1. The maximum default size is `10` because the aggregation's
working storage is "dense", meaning we allocate `size` slots for every bucket. `10`
is a *very* conservative default maximum and you can raise it if you need to by
changing the `top_metrics_max_size` index setting. But know that large sizes can
take a fair bit of memory, especially if they are inside of an aggregation which
makes many buckets like a large
<<search-aggregations-metrics-top-metrics-example-terms, terms aggregation>>.
[source,console]
----
PUT /test/_settings
{
"top_metrics_max_size": 100
}
----
// TEST[continued]
NOTE: If `size` is more than `1` the `top_metrics` aggregation can't be the target of a sort.
==== Examples
[[search-aggregations-metrics-top-metrics-example-terms]]
===== Use with terms
This aggregation should be quite useful inside of <<search-aggregations-bucket-terms-aggregation, `terms`>>

View File

@ -527,8 +527,9 @@ public class ScaledFloatFieldMapper extends FieldMapper {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
return new DoubleValuesComparatorSource(this, missingValue, sortMode, nested).newBucketedSort(bigArrays, sortOrder, format);
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
return new DoubleValuesComparatorSource(this, missingValue, sortMode, nested)
.newBucketedSort(bigArrays, sortOrder, format, bucketSize, extra);
}
@Override

View File

@ -80,7 +80,7 @@ public interface IndexFieldData<FD extends AtomicFieldData> extends IndexCompone
* Build a sort implementation specialized for aggregations.
*/
BucketedSort newBucketedSort(BigArrays bigArrays, @Nullable Object missingValue, MultiValueMode sortMode,
Nested nested, SortOrder sortOrder, DocValueFormat format);
Nested nested, SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra);
/**
* Clears any resources associated with this field data.
@ -241,7 +241,8 @@ public interface IndexFieldData<FD extends AtomicFieldData> extends IndexCompone
/**
* Create a {@linkplain BucketedSort} which is useful for sorting inside of aggregations.
*/
public abstract BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format);
public abstract BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra);
}
interface Builder {

View File

@ -140,7 +140,8 @@ public class BytesRefFieldComparatorSource extends IndexFieldData.XFieldComparat
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -96,23 +96,29 @@ public class DoubleValuesComparatorSource extends IndexFieldData.XFieldComparato
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
return new BucketedSort.ForDoubles(bigArrays, sortOrder, format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return new BucketedSort.ForDoubles(bigArrays, sortOrder, format, bucketSize, extra) {
private final double dMissingValue = (Double) missingObject(missingValue, sortOrder == SortOrder.DESC);
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
private final NumericDoubleValues values = getNumericDocValues(ctx, dMissingValue);
return new Leaf(ctx) {
private final NumericDoubleValues docValues = getNumericDocValues(ctx, dMissingValue);
private double docValue;
@Override
protected boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
if (docValues.advanceExact(doc)) {
docValue = docValues.doubleValue();
return true;
}
return false;
}
@Override
protected double docValue() throws IOException {
return values.doubleValue();
protected double docValue() {
return docValue;
}
};
}

View File

@ -85,8 +85,9 @@ public class FloatValuesComparatorSource extends IndexFieldData.XFieldComparator
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
return new BucketedSort.ForFloats(bigArrays, sortOrder, format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return new BucketedSort.ForFloats(bigArrays, sortOrder, format, bucketSize, extra) {
private final float dMissingValue = (Float) missingObject(missingValue, sortOrder == SortOrder.DESC);
@Override
@ -94,20 +95,25 @@ public class FloatValuesComparatorSource extends IndexFieldData.XFieldComparator
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
private final NumericDoubleValues values = getNumericDocValues(ctx, dMissingValue);
return new Leaf(ctx) {
private final NumericDoubleValues docValues = getNumericDocValues(ctx, dMissingValue);
private float docValue;
@Override
public void setScorer(Scorable scorer) {}
@Override
protected boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
if (docValues.advanceExact(doc)) {
docValue = (float) docValues.doubleValue();
return true;
}
return false;
}
@Override
protected float docValue() throws IOException {
return (float) values.doubleValue();
protected float docValue() {
return docValue;
}
};
}

View File

@ -104,23 +104,29 @@ public class LongValuesComparatorSource extends IndexFieldData.XFieldComparatorS
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
return new BucketedSort.ForLongs(bigArrays, sortOrder, format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return new BucketedSort.ForLongs(bigArrays, sortOrder, format, bucketSize, extra) {
private final long lMissingValue = (Long) missingObject(missingValue, sortOrder == SortOrder.DESC);
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
private final NumericDocValues values = getNumericDocValues(ctx, lMissingValue);
return new Leaf(ctx) {
private final NumericDocValues docValues = getNumericDocValues(ctx, lMissingValue);
private long docValue;
@Override
protected boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
if (docValues.advanceExact(doc)) {
docValue = docValues.longValue();
return true;
}
return false;
}
@Override
protected long docValue() throws IOException {
return values.longValue();
protected long docValue() {
return docValue;
}
};
}

View File

@ -108,7 +108,7 @@ public final class GlobalOrdinalsIndexFieldData extends AbstractIndexComponent i
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}
@ -198,7 +198,7 @@ public final class GlobalOrdinalsIndexFieldData extends AbstractIndexComponent i
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -55,7 +55,7 @@ public abstract class AbstractLatLonPointDVIndexFieldData extends DocValuesIndex
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("can't sort on geo_point field without using specific sorting feature, like geo_distance");
}

View File

@ -72,7 +72,7 @@ public class BinaryDVIndexFieldData extends DocValuesIndexFieldData implements I
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}
}

View File

@ -52,7 +52,7 @@ public class BytesBinaryDVIndexFieldData extends DocValuesIndexFieldData impleme
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("can't sort on binary field");
}

View File

@ -165,7 +165,7 @@ public class ConstantIndexFieldData extends AbstractIndexOrdinalsFieldData {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -91,7 +91,7 @@ public class PagedBytesIndexFieldData extends AbstractIndexOrdinalsFieldData {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -121,14 +121,16 @@ public class SortedNumericDVIndexFieldData extends DocValuesIndexFieldData imple
* casting the values if their native type doesn't match.
*/
public BucketedSort newBucketedSort(NumericType targetNumericType, BigArrays bigArrays, @Nullable Object missingValue,
MultiValueMode sortMode, Nested nested, SortOrder sortOrder, DocValueFormat format) {
return comparatorSource(targetNumericType, missingValue, sortMode, nested).newBucketedSort(bigArrays, sortOrder, format);
MultiValueMode sortMode, Nested nested, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return comparatorSource(targetNumericType, missingValue, sortMode, nested)
.newBucketedSort(bigArrays, sortOrder, format, bucketSize, extra);
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, @Nullable Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
return newBucketedSort(numericType, bigArrays, missingValue, sortMode, nested, sortOrder, format);
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
return newBucketedSort(numericType, bigArrays, missingValue, sortMode, nested, sortOrder, format, bucketSize, extra);
}
private XFieldComparatorSource comparatorSource(NumericType targetNumericType, @Nullable Object missingValue, MultiValueMode sortMode,

View File

@ -87,7 +87,7 @@ public class SortedSetDVOrdinalsIndexFieldData extends DocValuesIndexFieldData i
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -209,7 +209,7 @@ public class IdFieldMapper extends MetadataFieldMapper {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode,
Nested nested, SortOrder sortOrder, DocValueFormat format) {
Nested nested, SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new UnsupportedOperationException("can't sort on the [" + CONTENT_TYPE + "] field");
}

View File

@ -33,20 +33,122 @@ import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.DocValueFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import static java.util.Collections.emptyList;
/**
* Type specialized sort implementations designed for use in aggregations.
* Aggregations have a couple of super interesting characteristics:
* <ul>
* <li>They can have many, many buckets so this implementation backs to
* {@link BigArrays} so it doesn't need to allocate any objects per bucket
* and the circuit breaker in {@linkplain BigArrays} will automatically
* track memory usage and abort execution if it grows too large.</li>
* <li>It's fairly common for a bucket to be collected but not returned so
* these implementations delay as much work as possible until collection</li>
* </ul>
* <p>
* Every bucket is in one of two states: "gathering" or min/max "heap". While
* "gathering" the next empty slot is stored in the "root" offset of the
* bucket and collecting a value is just adding it in the next slot bumping
* the tracking value at the root. So collecting values is {@code O(1)}.
* Extracting the results in sorted order is {@code O(n * log n)} because,
* well, sorting is {@code O(n * log n)}. When a bucket has collected
* {@link #bucketSize} entries it is converted into a min "heap" in
* {@code O(n)} time. Or into max heap, if {@link #order} is ascending.
* </p>
* <p>
* Once a "heap", collecting a document is the heap-standard {@code O(log n)}
* worst case. Critically, it is a very fast {@code O(1)} to check whether a
* value is competitive at all and, so long as buckets aren't hit in reverse
* order, most values won't be. Extracting results in sorted order is still
* {@code O(n * log n)}.
* </p>
* <p>
* When we first collect a bucket we make sure that we've allocated enough
* slots to hold all sort values for the entire bucket. In other words: the
* storage is "dense" and we don't try to save space when storing partially
* filled buckets.
* </p>
* <p>
* We actually *oversize* the allocations
* (like {@link BigArrays#overSize(long)}) to get amortized linear number
* of allocations and to play well with our paged arrays.
* </p>
*/
public abstract class BucketedSort implements Releasable {
// TODO priority queue semantics to support multiple hits in the buckets
/**
* Callbacks for storing extra data along with competitive sorts.
*/
public interface ExtraData {
/**
* Swap the position of two bits of extra data.
* <p>
* Both parameters will have previously been loaded by
* {@link Loader#loadFromDoc(long, int)} so the implementer shouldn't
* need to grow the underlying storage to implement this.
* </p>
*/
void swap(long lhs, long rhs);
/**
* Prepare to load extra data from a leaf.
*/
Loader loader(LeafReaderContext ctx) throws IOException;
@FunctionalInterface
interface Loader {
/**
* Load extra data from a doc.
* <p>
* Implementers <strong>should</strong> grow their underlying
* storage to fit the {@code index}.
* </p>
*/
void loadFromDoc(long index, int doc) throws IOException;
}
}
/**
* An implementation of {@linkplain ExtraData} that does nothing.
*/
public static final ExtraData NOOP_EXTRA_DATA = new ExtraData() {
@Override
public void swap(long lhs, long rhs) {}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
return (index, doc) -> {};
}
};
protected final BigArrays bigArrays;
private final SortOrder order;
private final DocValueFormat format;
private final int bucketSize;
private final ExtraData extra;
/**
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
*/
private final BitArray heapMode;
/**
* The highest bucket ordinal that has been converted into a heap. This is
* required because calling {@link BitArray#get(int)} on an index higher
* than the highest one that was {@link BitArray#set(int) set} could throw
* an {@link ArrayIndexOutOfBoundsException}. So we check this first.
*/
private long maxHeapBucket = 0;
public BucketedSort(BigArrays bigArrays, SortOrder order, DocValueFormat format) {
protected BucketedSort(BigArrays bigArrays, SortOrder order, DocValueFormat format, int bucketSize, ExtraData extra) {
this.bigArrays = bigArrays;
this.order = order;
this.format = format;
this.bucketSize = bucketSize;
this.extra = extra;
heapMode = new BitArray(1, bigArrays);
}
/**
@ -64,17 +166,62 @@ public abstract class BucketedSort implements Releasable {
}
/**
* Get the value for a bucket if it has been collected, null otherwise.
* The number of values to store per bucket.
*/
public final SortValue getValue(long bucket) {
if (bucket >= buckets().size()) {
return null;
public int getBucketSize() {
return bucketSize;
}
/**
* Used with {@link BucketedSort#getValues(long, ResultBuilder)} to
* build results from the sorting operation.
*/
@FunctionalInterface
public interface ResultBuilder<T> {
T build(long index, SortValue sortValue);
}
/**
* Get the values for a bucket if it has been collected. If it hasn't
* then returns an empty list.
* @param builder builds results. See {@link ExtraData} for how to store
* data along side the sort for this to extract.
*/
public final <T extends Comparable<T>> List<T> getValues(long bucket, ResultBuilder<T> builder) {
long rootIndex = bucket * bucketSize;
if (rootIndex >= values().size()) {
// We've never seen this bucket.
return emptyList();
}
return getValueForBucket(bucket);
long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
long end = rootIndex + bucketSize;
List<T> result = new ArrayList<>(bucketSize);
for (long index = start; index < end; index++) {
result.add(builder.build(index, getValue(index)));
}
// TODO we usually have a heap here so we could use that to build the results sorted.
result.sort(order.wrap(Comparator.<T>naturalOrder()));
return result;
}
/**
* Get the values for a bucket if it has been collected. If it hasn't
* then returns an empty list.
*/
public final List<SortValue> getValues(long bucket) {
return getValues(bucket, (i, sv) -> sv);
}
/**
* Is this bucket a min heap {@code true} or in gathering mode {@code false}?
*/
private boolean inHeapMode(long bucket) {
return bucket <= maxHeapBucket && heapMode.get((int) bucket);
}
/**
* Get the {@linkplain Leaf} implementation that'll do the actual collecting.
* @throws IOException most implementations need to perform IO to prepare for each leaf
*/
public abstract Leaf forLeaf(LeafReaderContext ctx) throws IOException;
@ -86,126 +233,318 @@ public abstract class BucketedSort implements Releasable {
/**
* The {@linkplain BigArray} backing this sort.
*/
protected abstract BigArray buckets();
protected abstract BigArray values();
/**
* Grow the {@linkplain BigArray} backing this sort to account for new buckets.
* This will only be called if the array is too small.
*/
protected abstract void grow(long minSize);
protected abstract void growValues(long minSize);
/**
* Get the value for a bucket. This will only be called if the bucket was collected.
* Get the next index that should be "gathered" for a bucket rooted
* at {@code rootIndex}.
*/
protected abstract SortValue getValueForBucket(long bucket);
protected abstract int getNextGatherOffset(long rootIndex);
/**
* Set the next index that should be "gathered" for a bucket rooted
* at {@code rootIndex}.
*/
protected abstract void setNextGatherOffset(long rootIndex, int offset);
/**
* Get the value at an index.
*/
protected abstract SortValue getValue(long index);
/**
* {@code true} if the entry at index {@code lhs} is "better" than
* the entry at {@code rhs}. "Better" in this context means "lower" for
* {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean betterThan(long lhs, long rhs);
/**
* Swap the data at two indices.
*/
protected abstract void swap(long lhs, long rhs);
/**
* Return a fairly human readable representation of the array backing the sort.
* <p>
* This is intentionally not a {@link #toString()} implementation because it'll
* be quite slow.
* </p>
*/
protected final String debugFormat() {
StringBuilder b = new StringBuilder();
for (long index = 0; index < values().size(); index++) {
if (index % bucketSize == 0) {
b.append('\n').append(String.format(Locale.ROOT, "%20d", index / bucketSize)).append(": ");
}
b.append(String.format(Locale.ROOT, "%20s", getValue(index))).append(' ');
}
return b.toString();
}
/**
* Initialize the gather offsets after setting up values. Subclasses
* should call this once, after setting up their {@link #values()}.
*/
protected final void initGatherOffsets() {
setNextGatherOffsets(0);
}
/**
* Allocate storage for more buckets and store the "next gather offset"
* for those new buckets.
*/
private void grow(long minSize) {
long oldMax = values().size() - 1;
growValues(minSize);
// Set the next gather offsets for all newly allocated buckets.
setNextGatherOffsets(oldMax - (oldMax % getBucketSize()) + getBucketSize());
}
/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
private void setNextGatherOffsets(long startingAt) {
int nextOffset = getBucketSize() - 1;
for (long bucketRoot = startingAt; bucketRoot < values().size(); bucketRoot += getBucketSize()) {
setNextGatherOffset(bucketRoot, nextOffset);
}
}
/**
* Heapify a bucket whose entries are in random order.
* <p>
* This works by validating the heap property on each node, iterating
* "upwards", pushing any out of order parents "down". Check out the
* <a href="https://en.wikipedia.org/w/index.php?title=Binary_heap&oldid=940542991#Building_a_heap">wikipedia</a>
* entry on binary heaps for more about this.
* </p>
* <p>
* While this *looks* like it could easily be {@code O(n * log n)}, it is
* a fairly well studied algorithm attributed to Floyd. There's
* been a bunch of work that puts this at {@code O(n)}, close to 1.88n worst
* case.
* </p>
* <ul>
* <li>Hayward, Ryan; McDiarmid, Colin (1991).
* <a href="https://web.archive.org/web/20160205023201/http://www.stats.ox.ac.uk/__data/assets/pdf_file/0015/4173/heapbuildjalg.pdf">
* Average Case Analysis of Heap Building by Repeated Insertion</a> J. Algorithms.</li>
* <li>D.E. Knuth, The Art of Computer Programming, Vol. 3, Sorting and Searching</li>
* </ul>
* @param rootIndex the index the start of the bucket
*/
private void heapify(long rootIndex) {
int maxParent = bucketSize / 2 - 1;
for (int parent = maxParent; parent >= 0; parent--) {
downHeap(rootIndex, parent);
}
}
/**
* Correct the heap invariant of a parent and its children. This
* runs in {@code O(log n)} time.
* @param rootIndex index of the start of the bucket
* @param parent Index within the bucket of the parent to check.
* For example, 0 is the "root".
*/
private void downHeap(long rootIndex, int parent) {
while (true) {
long parentIndex = rootIndex + parent;
int worst = parent;
long worstIndex = parentIndex;
int leftChild = parent * 2 + 1;
long leftIndex = rootIndex + leftChild;
if (leftChild < bucketSize) {
if (betterThan(worstIndex, leftIndex)) {
worst = leftChild;
worstIndex = leftIndex;
}
int rightChild = leftChild + 1;
long rightIndex = rootIndex + rightChild;
if (rightChild < bucketSize && betterThan(worstIndex, rightIndex)) {
worst = rightChild;
worstIndex = rightIndex;
}
}
if (worst == parent) {
break;
}
swap(worstIndex, parentIndex);
extra.swap(worstIndex, parentIndex);
parent = worst;
}
}
@Override
public final void close() {
Releasables.close(values(), heapMode);
}
/**
* Performs the actual collection against a {@linkplain LeafReaderContext}.
*/
public abstract class Leaf implements ScorerAware {
/**
* Collect this doc, returning {@code true} if it is competitive.
*/
public final boolean collectIfCompetitive(int doc, long bucket) throws IOException {
if (false == advanceExact(doc)) {
return false;
}
if (bucket >= buckets().size()) {
grow(bucket + 1);
setValue(bucket);
return true;
}
return setIfCompetitive(bucket);
private final LeafReaderContext ctx;
private ExtraData.Loader loader = null;
protected Leaf(LeafReaderContext ctx) {
this.ctx = ctx;
}
/**
* Move the underlying data source reader to the doc and return
* {@code true} if there is data for the sort value.
* Collect this doc, returning {@code true} if it is competitive.
*/
public final void collect(int doc, long bucket) throws IOException {
if (false == advanceExact(doc)) {
return;
}
long rootIndex = bucket * bucketSize;
if (inHeapMode(bucket)) {
if (docBetterThan(rootIndex)) {
// TODO a "bottom up" insert would save a couple of comparisons. Worth it?
setIndexToDocValue(rootIndex);
loader().loadFromDoc(rootIndex, doc);
downHeap(rootIndex, 0);
}
return;
}
// Gathering mode
long requiredSize = rootIndex + bucketSize;
if (values().size() < requiredSize) {
grow(requiredSize);
}
int next = getNextGatherOffset(rootIndex);
assert 0 <= next && next < bucketSize :
"Expected next to be in the range of valid buckets [0 <= " + next + " < " + bucketSize + "]";
long index = next + rootIndex;
setIndexToDocValue(index);
loader().loadFromDoc(index, doc);
if (next == 0) {
if (bucket > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Bucketed sort doesn't support more than [" + Integer.MAX_VALUE + "] buckets");
// BitArray needs int keys and this'd take a ton of memory to use that many buckets. So we just don't.
}
maxHeapBucket = Math.max(bucket, maxHeapBucket);
heapMode.set((int) bucket);
heapify(rootIndex);
} else {
setNextGatherOffset(rootIndex, next - 1);
}
return;
}
/**
* Read the sort value from {@code doc} and return {@code true}
* if there is a value for that document. Otherwise return
* {@code false} and the sort will skip that document.
*/
protected abstract boolean advanceExact(int doc) throws IOException;
/**
* Set the value for a particular bucket to the value that doc has for the sort.
* This is called when we're *sure* we haven't yet seen the bucket.
* Set the value at the index to the value of the document to which
* we just advanced.
*/
protected abstract void setValue(long bucket) throws IOException;
protected abstract void setIndexToDocValue(long index);
/**
* If the value that doc has for the sort is competitive with the other values
* then set it. This is called for buckets we *might* have already seen. So
* implementers will have to check for "empty" buckets in their own way. The
* vaguery here is for two reasons:
* <ul>
* <li>When we see a bucket that won't fit in our arrays we oversize them so
* we don't have to grow them by 1 every time.</li>
* <li>Buckets don't always arrive in order and our storage is "dense" on the
* bucket ordinal. For example, we might get bucket number 4 grow the array
* to fit it, and *then* get bucket number 3.</li>
* </ul>
* {@code true} if the sort value for the doc is "better" than the
* entry at {@code index}. "Better" in this context means "lower" for
* {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean setIfCompetitive(long bucket) throws IOException;
protected abstract boolean docBetterThan(long index);
/**
* Get the extra data loader, building it if we haven't yet built one for this leaf.
*/
private ExtraData.Loader loader() throws IOException {
if (loader == null) {
loader = extra.loader(ctx);
}
return loader;
}
}
/**
* Superclass for implementations of {@linkplain BucketedSort} for {@code double} keys.
*/
public abstract static class ForDoubles extends BucketedSort {
private DoubleArray buckets = bigArrays.newDoubleArray(1, false);
private DoubleArray values = bigArrays.newDoubleArray(getBucketSize(), false);
public ForDoubles(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
super(bigArrays, sortOrder, format);
// NaN is a sentinel value for "unused"
buckets.set(0, Double.NaN);
public ForDoubles(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format, int bucketSize, ExtraData extra) {
super(bigArrays, sortOrder, format, bucketSize, extra);
initGatherOffsets();
}
@Override
public boolean needsScores() { return false; }
@Override
protected final BigArray buckets() { return buckets; }
protected final BigArray values() { return values; }
@Override
protected final void grow(long minSize) {
long oldSize = buckets.size();
buckets = bigArrays.grow(buckets, minSize);
buckets.fill(oldSize, buckets.size(), Double.NaN);
protected final void growValues(long minSize) {
values = bigArrays.grow(values, minSize);
}
@Override
public final SortValue getValueForBucket(long bucket) {
double val = buckets.get(bucket);
if (Double.isNaN(val)) {
return null;
}
return SortValue.from(val);
protected final int getNextGatherOffset(long rootIndex) {
// This cast is safe because all ints fit accurately into a double.
return (int) values.get(rootIndex);
}
@Override
public final void close() {
buckets.close();
protected final void setNextGatherOffset(long rootIndex, int offset) {
values.set(rootIndex, offset);
}
@Override
protected final SortValue getValue(long index) {
return SortValue.from(values.get(index));
}
@Override
protected final boolean betterThan(long lhs, long rhs) {
return getOrder().reverseMul() * Double.compare(values.get(lhs), values.get(rhs)) < 0;
}
@Override
protected final void swap(long lhs, long rhs) {
double tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
}
protected abstract class Leaf extends BucketedSort.Leaf {
protected abstract double docValue() throws IOException;
protected Leaf(LeafReaderContext ctx) {
super(ctx);
}
/**
* Return the value for of this sort for the document to which
* we just {@link #advanceExact(int) moved}. This should be fast
* because it is called twice per competitive hit when in heap
* mode, once for {@link #docBetterThan(long)} and once
* for {@link #setIndexToDocValue(long)}.
*/
protected abstract double docValue();
@Override
public final void setScorer(Scorable scorer) {}
@Override
protected final void setValue(long bucket) throws IOException {
buckets.set(bucket, docValue());
protected final void setIndexToDocValue(long index) {
values.set(index, docValue());
}
@Override
protected final boolean setIfCompetitive(long bucket) throws IOException {
double docSort = docValue();
double bestSort = buckets.get(bucket);
// The NaN check is important here because it needs to always lose.
if (false == Double.isNaN(bestSort) && getOrder().reverseMul() * Double.compare(bestSort, docSort) <= 0) {
return false;
}
buckets.set(bucket, docSort);
return true;
protected final boolean docBetterThan(long index) {
return getOrder().reverseMul() * Double.compare(docValue(), values.get(index)) < 0;
}
}
}
@ -214,58 +553,87 @@ public abstract class BucketedSort implements Releasable {
* Superclass for implementations of {@linkplain BucketedSort} for {@code float} keys.
*/
public abstract static class ForFloats extends BucketedSort {
private FloatArray buckets = bigArrays.newFloatArray(1, false);
/**
* The maximum size of buckets this can store. This is because we
* store the next offset to write to in a float and floats only have
* {@code 23} bits of mantissa so they can't accurate store values
* higher than {@code 2 ^ 24}.
*/
public static final int MAX_BUCKET_SIZE = (int) Math.pow(2, 24);
public ForFloats(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
super(bigArrays, sortOrder, format);
// NaN is a sentinel value for "unused"
buckets.set(0, Float.NaN);
}
private FloatArray values = bigArrays.newFloatArray(1, false);
@Override
protected final BigArray buckets() { return buckets; }
@Override
protected final void grow(long minSize) {
long oldSize = buckets.size();
buckets = bigArrays.grow(buckets, minSize);
buckets.fill(oldSize, buckets.size(), Float.NaN);
}
@Override
public final SortValue getValueForBucket(long bucket) {
float val = buckets.get(bucket);
if (Float.isNaN(val)) {
return null;
public ForFloats(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format, int bucketSize, ExtraData extra) {
super(bigArrays, sortOrder, format, bucketSize, extra);
if (bucketSize > MAX_BUCKET_SIZE) {
close();
throw new IllegalArgumentException("bucket size must be less than [2^24] but was [" + bucketSize + "]");
}
return SortValue.from(val);
initGatherOffsets();
}
@Override
public final void close() {
buckets.close();
protected final BigArray values() { return values; }
@Override
protected final void growValues(long minSize) {
values = bigArrays.grow(values, minSize);
}
@Override
protected final int getNextGatherOffset(long rootIndex) {
/*
* This cast will not lose precision because we make sure never
* to write values here that float can't store precisely.
*/
return (int) values.get(rootIndex);
}
@Override
protected final void setNextGatherOffset(long rootIndex, int offset) {
values.set(rootIndex, offset);
}
@Override
protected final SortValue getValue(long index) {
return SortValue.from(values.get(index));
}
@Override
protected final boolean betterThan(long lhs, long rhs) {
return getOrder().reverseMul() * Float.compare(values.get(lhs), values.get(rhs)) < 0;
}
@Override
protected final void swap(long lhs, long rhs) {
float tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
}
protected abstract class Leaf extends BucketedSort.Leaf {
protected abstract float docValue() throws IOException;
protected Leaf(LeafReaderContext ctx) {
super(ctx);
}
/**
* Return the value for of this sort for the document to which
* we just {@link #advanceExact(int) moved}. This should be fast
* because it is called twice per competitive hit when in heap
* mode, once for {@link #docBetterThan(long)} and once
* for {@link #setIndexToDocValue(long)}.
*/
protected abstract float docValue();
@Override
protected final void setValue(long bucket) throws IOException {
buckets.set(bucket, docValue());
protected final void setIndexToDocValue(long index) {
values.set(index, docValue());
}
@Override
protected final boolean setIfCompetitive(long bucket) throws IOException {
float docSort = docValue();
float bestSort = buckets.get(bucket);
// The NaN check is important here because it needs to always lose.
if (false == Float.isNaN(bestSort) && getOrder().reverseMul() * Float.compare(bestSort, docSort) <= 0) {
return false;
}
buckets.set(bucket, docSort);
return true;
protected final boolean docBetterThan(long index) {
return getOrder().reverseMul() * Float.compare(docValue(), values.get(index)) < 0;
}
}
}
@ -273,99 +641,76 @@ public abstract class BucketedSort implements Releasable {
* Superclass for implementations of {@linkplain BucketedSort} for {@code long} keys.
*/
public abstract static class ForLongs extends BucketedSort {
/**
* Tracks which buckets have been seen before so we can *always*
* set the value in that case. We need this because there isn't a
* sentinel value in the {@code long} type that we can use for this
* like NaN in {@code double} or {@code float}.
*/
private BitArray seen = new BitArray(1, bigArrays);
/**
* The actual values.
*/
private LongArray buckets = bigArrays.newLongArray(1, false);
private long maxBucket = -1;
private LongArray values = bigArrays.newLongArray(1, false);
public ForLongs(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
super(bigArrays, sortOrder, format);
public ForLongs(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format, int bucketSize, ExtraData extra) {
super(bigArrays, sortOrder, format, bucketSize, extra);
initGatherOffsets();
}
@Override
public boolean needsScores() { return false; }
public final boolean needsScores() { return false; }
@Override
protected final BigArray buckets() { return buckets; }
protected final BigArray values() { return values; }
@Override
protected final void grow(long minSize) {
buckets = bigArrays.grow(buckets, minSize);
protected final void growValues(long minSize) {
values = bigArrays.grow(values, minSize);
}
@Override
public final SortValue getValueForBucket(long bucket) {
if (bucket > Integer.MAX_VALUE) {
/* We throw exceptions if we try to collect buckets bigger
* than an int so we *can't* have seen any of these. */
return null;
}
if (bucket > maxBucket) {
return null;
}
if (false == seen.get((int) bucket)) {
/* Buckets we haven't seen must be null here so we can
* skip "gaps" in seen buckets. */
return null;
}
return SortValue.from(buckets.get(bucket));
protected final int getNextGatherOffset(long rootIndex) {
return (int) values.get(rootIndex);
}
@Override
public final void close() {
Releasables.close(seen, buckets);
protected final void setNextGatherOffset(long rootIndex, int offset) {
values.set(rootIndex, offset);
}
@Override
protected final SortValue getValue(long index) {
return SortValue.from(values.get(index));
}
@Override
protected final boolean betterThan(long lhs, long rhs) {
return getOrder().reverseMul() * Long.compare(values.get(lhs), values.get(rhs)) < 0;
}
@Override
protected final void swap(long lhs, long rhs) {
long tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
}
protected abstract class Leaf extends BucketedSort.Leaf {
protected abstract long docValue() throws IOException;
protected Leaf(LeafReaderContext ctx) {
super(ctx);
}
/**
* Return the value for of this sort for the document to which
* we just {@link #advanceExact(int) moved}. This should be fast
* because it is called twice per competitive hit when in heap
* mode, once for {@link #docBetterThan(long)} and once
* for {@link #setIndexToDocValue(long)}.
*/
protected abstract long docValue();
@Override
public final void setScorer(Scorable scorer) {}
@Override
protected final void setValue(long bucket) throws IOException {
seen.set(bucketIsInt(bucket));
buckets.set(bucket, docValue());
maxBucket = Math.max(bucket, maxBucket);
protected final void setIndexToDocValue(long index) {
values.set(index, docValue());
}
@Override
protected final boolean setIfCompetitive(long bucket) throws IOException {
long docSort = docValue();
int intBucket = bucketIsInt(bucket);
if (bucket > maxBucket) {
seen.set(intBucket);
buckets.set(bucket, docSort);
maxBucket = bucket;
return true;
}
if (false == seen.get(intBucket)) {
seen.set(intBucket);
buckets.set(bucket, docSort);
return true;
}
long bestSort = buckets.get(bucket);
if (getOrder().reverseMul() * Double.compare(bestSort, docSort) <= 0) {
return false;
}
buckets.set(bucket, docSort);
return true;
}
private int bucketIsInt(long bucket) {
if (bucket > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Long sort keys don't support more than [" + Integer.MAX_VALUE + "] buckets");
// I don't feel too bad about that because it'd take about 16 GB of memory....
}
return (int) bucket;
protected final boolean docBetterThan(long index) {
return getOrder().reverseMul() * Long.compare(docValue(), values.get(index)) < 0;
}
}
}

View File

@ -418,7 +418,7 @@ public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
}
@Override
public BucketedSort buildBucketedSort(QueryShardContext context) throws IOException {
public BucketedSort buildBucketedSort(QueryShardContext context, int bucketSize, BucketedSort.ExtraData extra) throws IOException {
if (DOC_FIELD_NAME.equals(fieldName)) {
throw new IllegalArgumentException("sorting by _doc is not supported");
}
@ -438,11 +438,11 @@ public class FieldSortBuilder extends SortBuilder<FieldSortBuilder> {
SortedNumericDVIndexFieldData numericFieldData = (SortedNumericDVIndexFieldData) fieldData;
NumericType resolvedType = resolveNumericType(numericType);
return numericFieldData.newBucketedSort(resolvedType, context.bigArrays(), missing, localSortMode(), nested, order,
fieldType.docValueFormat(null, null));
fieldType.docValueFormat(null, null), bucketSize, extra);
}
try {
return fieldData.newBucketedSort(context.bigArrays(), missing, localSortMode(), nested, order,
fieldType.docValueFormat(null, null));
fieldType.docValueFormat(null, null), bucketSize, extra);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("error building sort for field [" + fieldName + "] of type ["
+ fieldType.typeName() + "] in index [" + context.index().getName() + "]: " + e.getMessage(), e);

View File

@ -599,7 +599,7 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
}
@Override
public BucketedSort buildBucketedSort(QueryShardContext context) throws IOException {
public BucketedSort buildBucketedSort(QueryShardContext context, int bucketSize, BucketedSort.ExtraData extra) throws IOException {
GeoPoint[] localPoints = localPoints();
MultiValueMode localSortMode = localSortMode();
IndexGeoPointFieldData geoIndexFieldData = fieldData(context);
@ -608,7 +608,7 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
// TODO implement the single point optimization above
return comparatorSource(localPoints, localSortMode, geoIndexFieldData, nested)
.newBucketedSort(context.bigArrays(), order, DocValueFormat.RAW);
.newBucketedSort(context.bigArrays(), order, DocValueFormat.RAW, bucketSize, extra);
}
private GeoPoint[] localPoints() {
@ -707,21 +707,27 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
return new BucketedSort.ForDoubles(bigArrays, sortOrder, format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return new BucketedSort.ForDoubles(bigArrays, sortOrder, format, bucketSize, extra) {
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
return new Leaf(ctx) {
private final NumericDoubleValues values = getNumericDoubleValues(ctx);
private double value;
@Override
protected boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
if (values.advanceExact(doc)) {
value = values.doubleValue();
return true;
}
return false;
}
@Override
protected double docValue() throws IOException {
return values.doubleValue();
protected double docValue() {
return value;
}
};
}

View File

@ -104,15 +104,16 @@ public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
}
@Override
public BucketedSort buildBucketedSort(QueryShardContext context) throws IOException {
return new BucketedSort.ForFloats(context.bigArrays(), order, DocValueFormat.RAW) {
public BucketedSort buildBucketedSort(QueryShardContext context, int bucketSize, BucketedSort.ExtraData extra) throws IOException {
return new BucketedSort.ForFloats(context.bigArrays(), order, DocValueFormat.RAW, bucketSize, extra) {
@Override
public boolean needsScores() { return true; }
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new BucketedSort.ForFloats.Leaf() {
return new BucketedSort.ForFloats.Leaf(ctx) {
private Scorable scorer;
private float score;
@Override
public void setScorer(Scorable scorer) {
@ -124,12 +125,13 @@ public class ScoreSortBuilder extends SortBuilder<ScoreSortBuilder> {
assert doc == scorer.docID() : "expected scorer to be on [" + doc + "] but was on [" + scorer.docID() + "]";
/* We will never be called by documents that don't match the
* query and they'll all have a score, thus `true`. */
score = scorer.score();
return true;
}
@Override
protected float docValue() throws IOException {
return scorer.score();
protected float docValue() {
return score;
}
};
}

View File

@ -313,8 +313,8 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
}
@Override
public BucketedSort buildBucketedSort(QueryShardContext context) throws IOException {
return fieldComparatorSource(context).newBucketedSort(context.bigArrays(), order, DocValueFormat.RAW);
public BucketedSort buildBucketedSort(QueryShardContext context, int bucketSize, BucketedSort.ExtraData extra) throws IOException {
return fieldComparatorSource(context).newBucketedSort(context.bigArrays(), order, DocValueFormat.RAW, bucketSize, extra);
}
private IndexFieldData.XFieldComparatorSource fieldComparatorSource(QueryShardContext context) throws IOException {
@ -369,7 +369,8 @@ public class ScriptSortBuilder extends SortBuilder<ScriptSortBuilder> {
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("error building sort for [_script]: "
+ "script sorting only supported on [numeric] scripts but was [" + type + "]");
}

View File

@ -77,7 +77,8 @@ public abstract class SortBuilder<T extends SortBuilder<T>> implements NamedWrit
/**
* Create a {@linkplain BucketedSort} which is useful for sorting inside of aggregations.
*/
public abstract BucketedSort buildBucketedSort(QueryShardContext context) throws IOException;
public abstract BucketedSort buildBucketedSort(QueryShardContext context,
int bucketSize, BucketedSort.ExtraData extra) throws IOException;
/**
* Set the order of sorting.

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.Locale;
/**
@ -45,6 +46,11 @@ public enum SortOrder implements Writeable {
public int reverseMul() {
return 1;
}
@Override
public <T> Comparator<T> wrap(Comparator<T> delegate) {
return delegate;
}
},
/**
* Descending order.
@ -59,6 +65,11 @@ public enum SortOrder implements Writeable {
public int reverseMul() {
return -1;
}
@Override
public <T> Comparator<T> wrap(Comparator<T> delegate) {
return delegate.reversed();
}
};
public static SortOrder readFromStream(StreamInput in) throws IOException {
@ -78,4 +89,9 @@ public enum SortOrder implements Writeable {
* -1 if the sort is reversed from the standard comparators, 1 otherwise.
*/
public abstract int reverseMul();
/**
* Wrap a comparator in one for this direction.
*/
public abstract <T> Comparator<T> wrap(Comparator<T> delegate);
}

View File

@ -66,7 +66,7 @@ public class NoOrdinalsStringFieldDataTests extends PagedBytesStringFieldDataTes
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new UnsupportedOperationException();
}

View File

@ -48,20 +48,20 @@ import org.elasticsearch.common.lucene.search.function.FieldValueFactorFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery.FilterScoreFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery.ScoreMode;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.lucene.search.function.LeafScoreFunction;
import org.elasticsearch.common.lucene.search.function.RandomScoreFunction;
import org.elasticsearch.common.lucene.search.function.ScoreFunction;
import org.elasticsearch.common.lucene.search.function.WeightFactorFunction;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.AtomicNumericFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.sort.BucketedSort;
@ -151,7 +151,7 @@ public class FunctionScoreTests extends ESTestCase {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new UnsupportedOperationException(UNSUPPORTED);
}
@ -248,7 +248,7 @@ public class FunctionScoreTests extends ESTestCase {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new UnsupportedOperationException(UNSUPPORTED);
}

View File

@ -287,7 +287,8 @@ public class SearchAfterBuilderTests extends ESTestCase {
}
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format) {
public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format,
int bucketSize, BucketedSort.ExtraData extra) {
return null;
}
};

View File

@ -22,25 +22,24 @@ package org.elasticsearch.search.sort;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.DocValueFormat;
import java.io.IOException;
public class BucketedSortForDoublesTests extends BucketedSortTestCase<BucketedSort.ForDoubles> {
@Override
public BucketedSort.ForDoubles build(SortOrder sortOrder, DocValueFormat format, double[] values) {
return new BucketedSort.ForDoubles(bigArrays(), sortOrder, format) {
public BucketedSort.ForDoubles build(SortOrder sortOrder, DocValueFormat format, int bucketSize,
BucketedSort.ExtraData extra, double[] values) {
return new BucketedSort.ForDoubles(bigArrays(), sortOrder, format, bucketSize, extra) {
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
public Leaf forLeaf(LeafReaderContext ctx) {
return new Leaf(ctx) {
int index = -1;
@Override
protected boolean advanceExact(int doc) throws IOException {
protected boolean advanceExact(int doc) {
index = doc;
return doc < values.length;
}
@Override
protected double docValue() throws IOException {
protected double docValue() {
return values[index];
}
};
@ -52,4 +51,9 @@ public class BucketedSortForDoublesTests extends BucketedSortTestCase<BucketedSo
protected SortValue expectedSortValue(double v) {
return SortValue.from(v);
}
@Override
protected double randomValue() {
return randomDouble();
}
}

View File

@ -25,28 +25,32 @@ import org.elasticsearch.search.DocValueFormat;
import java.io.IOException;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
public class BucketedSortForFloatsTests extends BucketedSortTestCase<BucketedSort.ForFloats> {
@Override
public BucketedSort.ForFloats build(SortOrder sortOrder, DocValueFormat format, double[] values) {
return new BucketedSort.ForFloats(bigArrays(), sortOrder, format) {
public BucketedSort.ForFloats build(SortOrder sortOrder, DocValueFormat format, int bucketSize,
BucketedSort.ExtraData extra, double[] values) {
return new BucketedSort.ForFloats(bigArrays(), sortOrder, format, bucketSize, extra) {
@Override
public boolean needsScores() {
return false;
}
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
public Leaf forLeaf(LeafReaderContext ctx) {
return new Leaf(ctx) {
int index = -1;
@Override
protected boolean advanceExact(int doc) throws IOException {
protected boolean advanceExact(int doc) {
index = doc;
return doc < values.length;
}
@Override
protected float docValue() throws IOException {
protected float docValue() {
return (float) values[index];
}
@ -57,12 +61,13 @@ public class BucketedSortForFloatsTests extends BucketedSortTestCase<BucketedSor
};
}
private BucketedSort.ForFloats buildForScores(SortOrder sortOrder, DocValueFormat format) {
return new BucketedSort.ForFloats(bigArrays(), sortOrder, format) {
private BucketedSort.ForFloats buildForScores(SortOrder sortOrder, DocValueFormat format, int bucketSize) {
return new BucketedSort.ForFloats(bigArrays(), sortOrder, format, bucketSize, BucketedSort.NOOP_EXTRA_DATA) {
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
public Leaf forLeaf(LeafReaderContext ctx) {
return new Leaf(ctx) {
Scorable scorer;
float score;
@Override
public void setScorer(Scorable scorer) {
@ -71,12 +76,13 @@ public class BucketedSortForFloatsTests extends BucketedSortTestCase<BucketedSor
@Override
protected boolean advanceExact(int doc) throws IOException {
return scorer.docID() == doc;
score = scorer.score();
return true;
}
@Override
protected float docValue() throws IOException {
return scorer.score();
protected float docValue() {
return score;
}
};
}
@ -88,31 +94,37 @@ public class BucketedSortForFloatsTests extends BucketedSortTestCase<BucketedSor
};
}
@Override
protected SortValue expectedSortValue(double v) {
return SortValue.from(v);
/*
* The explicit cast to float is important because it reduces
* the expected precision to the one we can provide. Sneaky. Computers
* are sneaky.
*/
return SortValue.from((float) v);
}
@Override
protected double randomValue() {
return randomFloat();
}
public void testScorer() throws IOException {
try (BucketedSort.ForFloats sort = buildForScores(SortOrder.DESC, DocValueFormat.RAW)) {
try (BucketedSort.ForFloats sort = buildForScores(SortOrder.DESC, DocValueFormat.RAW, 2)) {
assertTrue(sort.needsScores());
BucketedSort.Leaf leaf = sort.forLeaf(null);
MockScorable scorer = new MockScorable();
leaf.setScorer(scorer);
scorer.doc = 1;
scorer.score = 10;
assertFalse(leaf.collectIfCompetitive(0, 0));
assertTrue(leaf.collectIfCompetitive(1, 0));
assertEquals(sort.getValue(0), SortValue.from(10.0));
scorer.doc = 2;
leaf.collect(0, 0);
scorer.score = 1;
assertFalse(leaf.collectIfCompetitive(2, 0));
assertEquals(sort.getValue(0), SortValue.from(10.0));
leaf.collect(0, 0);
scorer.score = 0;
leaf.collect(3, 0);
assertThat(sort.getValues(0), contains(SortValue.from(10.0), SortValue.from(1.0)));
}
}
private class MockScorable extends Scorable {
private int doc;
private float score;
@ -128,4 +140,51 @@ public class BucketedSortForFloatsTests extends BucketedSortTestCase<BucketedSor
}
}
/**
* Check that we can store the largest bucket theoretically possible.
*/
public void testBiggest() throws IOException {
try (BucketedSort sort = new BucketedSort.ForFloats(bigArrays(), SortOrder.DESC, DocValueFormat.RAW,
BucketedSort.ForFloats.MAX_BUCKET_SIZE, BucketedSort.NOOP_EXTRA_DATA) {
@Override
public boolean needsScores() { return false; }
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf(ctx) {
int doc;
@Override
protected boolean advanceExact(int doc) throws IOException {
this.doc = doc;
return true;
}
@Override
protected float docValue() {
return doc;
}
@Override
public void setScorer(Scorable scorer) {}
};
}
}) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
int extra = between(0, 1000);
int max = BucketedSort.ForFloats.MAX_BUCKET_SIZE + extra;
for (int i = 0; i < max; i++) {
leaf.advanceExact(i);
leaf.collect(i, 0);
leaf.collect(i, 1);
}
assertThat(sort.getValue(0), equalTo(SortValue.from((double) extra)));
assertThat(sort.getValue(BucketedSort.ForFloats.MAX_BUCKET_SIZE), equalTo(SortValue.from((double) extra)));
}
}
public void testTooBig() {
int tooBig = BucketedSort.ForFloats.MAX_BUCKET_SIZE + 1;
Exception e = expectThrows(IllegalArgumentException.class, () ->
build(randomFrom(SortOrder.values()), DocValueFormat.RAW, tooBig, BucketedSort.NOOP_EXTRA_DATA, new double[] {}));
assertThat(e.getMessage(), equalTo("bucket size must be less than [2^24] but was [" + tooBig + "]"));
}
}

View File

@ -22,25 +22,24 @@ package org.elasticsearch.search.sort;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.DocValueFormat;
import java.io.IOException;
public class BucketedSortForLongsTests extends BucketedSortTestCase<BucketedSort.ForLongs> {
@Override
public BucketedSort.ForLongs build(SortOrder sortOrder, DocValueFormat format, double[] values) {
return new BucketedSort.ForLongs(bigArrays(), sortOrder, format) {
public BucketedSort.ForLongs build(SortOrder sortOrder, DocValueFormat format, int bucketSize,
BucketedSort.ExtraData extra, double[] values) {
return new BucketedSort.ForLongs(bigArrays(), sortOrder, format, bucketSize, extra) {
@Override
public Leaf forLeaf(LeafReaderContext ctx) throws IOException {
return new Leaf() {
public Leaf forLeaf(LeafReaderContext ctx) {
return new Leaf(ctx) {
int index = -1;
@Override
protected boolean advanceExact(int doc) throws IOException {
protected boolean advanceExact(int doc) {
index = doc;
return doc < values.length;
}
@Override
protected long docValue() throws IOException {
protected long docValue() {
return (long) values[index];
}
};
@ -52,4 +51,10 @@ public class BucketedSortForLongsTests extends BucketedSortTestCase<BucketedSort
protected SortValue expectedSortValue(double v) {
return SortValue.from((long) v);
}
@Override
protected double randomValue() {
// 2L^50 fits in the mantisa of a double which the test sort of needs.
return randomLongBetween(-2L^50, 2L^50);
}
}

View File

@ -19,8 +19,12 @@
package org.elasticsearch.search.sort;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -28,11 +32,17 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.lessThan;
public abstract class BucketedSortTestCase<T extends BucketedSort> extends ESTestCase {
/**
@ -40,110 +50,128 @@ public abstract class BucketedSortTestCase<T extends BucketedSort> extends ESTes
* @param values values to test, always sent as doubles just to have
* numbers to test. subclasses should cast to their favorite types
*/
protected abstract T build(SortOrder sortOrder, DocValueFormat format, double[] values);
protected abstract T build(SortOrder sortOrder, DocValueFormat format, int bucketSize,
BucketedSort.ExtraData extra, double[] values);
/**
* Build the expected sort value for a value.
*/
protected abstract SortValue expectedSortValue(double v);
private T build(SortOrder order, double[] values) {
/**
* A random value for testing, with the appropriate precision for the type we're testing.
*/
protected abstract double randomValue();
protected final T build(SortOrder order, int bucketSize, BucketedSort.ExtraData extra, double[] values) {
DocValueFormat format = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
return build(order, format, values);
return build(order, format, bucketSize, extra, values);
}
private T build(SortOrder order, int bucketSize, double[] values) {
DocValueFormat format = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
return build(order, format, bucketSize, BucketedSort.NOOP_EXTRA_DATA, values);
}
public final void testNeverCalled() {
SortOrder order = randomFrom(SortOrder.values());
DocValueFormat format = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
try (T sort = build(order, format, new double[] {})) {
try (T sort = build(order, format, 1, BucketedSort.NOOP_EXTRA_DATA, new double[] {})) {
assertThat(sort.getOrder(), equalTo(order));
assertThat(sort.getFormat(), equalTo(format));
assertThat(sort.getValue(randomNonNegativeLong()), nullValue());
assertThat(sort.getValues(randomNonNegativeLong()), empty());
assertFalse(sort.needsScores());
}
}
public final void testEmptyLeaf() throws IOException {
try (T sort = build(randomFrom(SortOrder.values()), new double[] {})) {
try (T sort = build(randomFrom(SortOrder.values()), 1, new double[] {})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertFalse(leaf.advanceExact(0));
assertThat(sort.getValue(randomNonNegativeLong()), nullValue());
leaf.collect(0, 0);
assertThat(sort.getValues(randomNonNegativeLong()), empty());
}
}
public final void testSingleDoc() throws IOException {
try (T sort = build(randomFrom(SortOrder.values()), new double[] {1})) {
try (T sort = build(randomFrom(SortOrder.values()), 1, new double[] {1})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(1)));
leaf.collect(0, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(1)));
}
}
public void testNonCompetitive() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {2, 1})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {2, 1})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertFalse(leaf.collectIfCompetitive(1, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(2)));
leaf.collect(0, 0);
leaf.collect(1, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(2)));
}
}
public void testCompetitive() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {1, 2})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {1, 2})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertTrue(leaf.collectIfCompetitive(1, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(2)));
leaf.collect(0, 0);
leaf.collect(1, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(2)));
}
}
public void testNegativeValue() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {-1})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {-1})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(-1)));
leaf.collect(0, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(-1)));
}
}
public void testSomeBuckets() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {2, 3})) {
try (Extra extra = new Extra(bigArrays(), new int[] {100, 200});
T sort = build(SortOrder.DESC, 1, extra, new double[] {2, 3})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertTrue(leaf.collectIfCompetitive(0, 1));
assertTrue(leaf.collectIfCompetitive(0, 2));
assertTrue(leaf.collectIfCompetitive(1, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(3)));
assertThat(sort.getValue(1), equalTo(expectedSortValue(2)));
assertThat(sort.getValue(2), equalTo(expectedSortValue(2)));
assertThat(sort.getValue(3), nullValue());
leaf.collect(0, 0);
leaf.collect(0, 1);
leaf.collect(0, 2);
leaf.collect(1, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(3)));
assertThat(sort.getValues(1), contains(expectedSortValue(2)));
assertThat(sort.getValues(2), contains(expectedSortValue(2)));
assertThat(sort.getValues(3), empty());
assertThat(sort.getValues(0, extra.valueBuilder()), contains(extraValue(200, 3)));
assertThat(sort.getValues(1, extra.valueBuilder()), contains(extraValue(100, 2)));
assertThat(sort.getValues(2, extra.valueBuilder()), contains(extraValue(100, 2)));
assertThat(sort.getValues(3, extra.valueBuilder()), empty());
}
}
public void testBucketGaps() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {2})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {2})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 0));
assertTrue(leaf.collectIfCompetitive(0, 2));
assertThat(sort.getValue(0), equalTo(expectedSortValue(2)));
assertThat(sort.getValue(1), nullValue());
assertThat(sort.getValue(2), equalTo(expectedSortValue(2)));
assertThat(sort.getValue(3), nullValue());
leaf.collect(0, 0);
leaf.collect(0, 2);
assertThat(sort.getValues(0), contains(expectedSortValue(2)));
assertThat(sort.getValues(1), empty());
assertThat(sort.getValues(2), contains(expectedSortValue(2)));
assertThat(sort.getValues(3), empty());
}
}
public void testBucketsOutOfOrder() throws IOException {
try (T sort = build(SortOrder.DESC, new double[] {2})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {2})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
assertTrue(leaf.collectIfCompetitive(0, 1));
assertTrue(leaf.collectIfCompetitive(0, 0));
assertThat(sort.getValue(0), equalTo(expectedSortValue(2.0)));
assertThat(sort.getValue(1), equalTo(expectedSortValue(2.0)));
assertThat(sort.getValue(2), nullValue());
leaf.collect(0, 1);
leaf.collect(0, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(2.0)));
assertThat(sort.getValues(1), contains(expectedSortValue(2.0)));
assertThat(sort.getValues(2), empty());
}
}
public void testManyBuckets() throws IOException {
// Set the bucket values in random order
// Collect the buckets in random order
int[] buckets = new int[10000];
for (int b = 0; b < buckets.length; b++) {
buckets[b] = b;
@ -152,27 +180,217 @@ public abstract class BucketedSortTestCase<T extends BucketedSort> extends ESTes
double[] maxes = new double[buckets.length];
try (T sort = build(SortOrder.DESC, new double[] {2, 3, -1})) {
try (T sort = build(SortOrder.DESC, 1, new double[] {2, 3, -1})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
for (int b : buckets) {
maxes[b] = 2;
assertTrue(leaf.collectIfCompetitive(0, b));
leaf.collect(0, b);
if (randomBoolean()) {
maxes[b] = 3;
assertTrue(leaf.collectIfCompetitive(1, b));
leaf.collect(1, b);
}
if (randomBoolean()) {
assertFalse(leaf.collectIfCompetitive(2, b));
leaf.collect(2, b);
}
}
for (int b = 0; b < buckets.length; b++) {
assertThat(sort.getValue(b), equalTo(expectedSortValue(maxes[b])));
assertThat(sort.getValues(b), contains(expectedSortValue(maxes[b])));
}
assertThat(sort.getValue(buckets.length), nullValue());
assertThat(sort.getValues(buckets.length), empty());
}
}
public void testTwoHitsDesc() throws IOException {
try (Extra extra = new Extra(bigArrays(), new int[] {100, 200, 3000});
T sort = build(SortOrder.DESC, 2, extra, new double[] {1, 2, 3})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
leaf.collect(0, 0);
leaf.collect(1, 0);
leaf.collect(2, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(3), expectedSortValue(2)));
assertThat(sort.getValues(0, extra.valueBuilder()), contains(extraValue(3000, 3), extraValue(200, 2)));
}
}
public void testTwoHitsAsc() throws IOException {
try (T sort = build(SortOrder.ASC, 2, new double[] {1, 2, 3})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
leaf.collect(0, 0);
leaf.collect(1, 0);
leaf.collect(2, 0);
assertThat(sort.getValues(0), contains(expectedSortValue(1), expectedSortValue(2)));
}
}
public void testManyHits() throws IOException {
// Set the values in random order
double[] values = new double[10000];
for (int v = 0; v < values.length; v++) {
values[v] = randomValue();
}
Collections.shuffle(Arrays.asList(values), random());
int bucketSize = between(2, 1000);
SwapCountingExtra counter = new SwapCountingExtra();
try (T sort = build(SortOrder.DESC, bucketSize, counter, values)) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
for (int doc = 0; doc < values.length; doc++) {
leaf.collect(doc, 0);
}
assertThat(sort.getValues(0), contains(Arrays.stream(values).boxed()
.sorted((lhs, rhs) -> rhs.compareTo(lhs))
.limit(bucketSize).map(s -> equalTo(expectedSortValue(s)))
.collect(toList())));
assertThat(sort.getValues(1), empty());
}
// We almost always *way* undershoot this value.
assertThat(counter.count, lessThan((long)(bucketSize + values.length * Math.log(bucketSize) / Math.log(2))));
}
public void testTwoHitsTwoBucket() throws IOException {
try (T sort = build(SortOrder.DESC, 2, new double[] {1, 2, 3, 4})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
leaf.collect(0, 0);
leaf.collect(0, 1);
leaf.collect(1, 0);
leaf.collect(1, 1);
leaf.collect(2, 0);
leaf.collect(2, 1);
leaf.collect(3, 1);
assertThat(sort.getValues(0), contains(expectedSortValue(3), expectedSortValue(2)));
assertThat(sort.getValues(1), contains(expectedSortValue(4), expectedSortValue(3)));
}
}
public void testManyBucketsManyHits() throws IOException {
// Set the values in random order
double[] values = new double[10000];
for (int v = 0; v < values.length; v++) {
values[v] = randomValue();
}
Collections.shuffle(Arrays.asList(values), random());
int buckets = between(2, 100);
int bucketSize = between(2, 100);
try (T sort = build(SortOrder.DESC, bucketSize, values)) {
BitArray[] bucketUsed = new BitArray[buckets];
Arrays.setAll(bucketUsed, i -> new BitArray(values.length, bigArrays()));
BucketedSort.Leaf leaf = sort.forLeaf(null);
for (int doc = 0; doc < values.length; doc++) {
for (int bucket = 0; bucket < buckets; bucket++) {
if (randomBoolean()) {
bucketUsed[bucket].set(doc);
leaf.collect(doc, bucket);
}
}
}
for (int bucket = 0; bucket < buckets; bucket++) {
List<Double> bucketValues = new ArrayList<>(values.length);
for (int doc = 0; doc < values.length; doc++) {
if (bucketUsed[bucket].get(doc)) {
bucketValues.add(values[doc]);
}
}
bucketUsed[bucket].close();
assertThat("Bucket " + bucket, sort.getValues(bucket), contains(bucketValues.stream()
.sorted((lhs, rhs) -> rhs.compareTo(lhs))
.limit(bucketSize).map(s -> equalTo(expectedSortValue(s)))
.collect(toList())));
}
assertThat(sort.getValues(buckets), empty());
}
}
protected BigArrays bigArrays() {
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}
private Extra.Value extraValue(int extra, double sort) {
return new Extra.Value(extra, expectedSortValue(sort));
}
private static class Extra implements BucketedSort.ExtraData, Releasable {
private static class Value implements Comparable<Value> {
private final int extra;
private final SortValue sortValue;
Value(int extra, SortValue sortValue) {
this.extra = extra;
this.sortValue = sortValue;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Value other = (Value) obj;
return extra == other.extra && sortValue.equals(other.sortValue);
}
@Override
public int hashCode() {
return Objects.hash(extra, sortValue);
}
@Override
public int compareTo(Value o) {
return sortValue.compareTo(o.sortValue);
}
@Override
public String toString() {
return "[" + extra + "," + sortValue + "]";
}
}
private final BigArrays bigArrays;
private final int[] docValues;
private IntArray values;
Extra(BigArrays bigArrays, int[] docValues) {
this.bigArrays = bigArrays;
this.docValues = docValues;
values = bigArrays.newIntArray(1, false);
}
public BucketedSort.ResultBuilder<Value> valueBuilder() {
return (i, sv) -> new Value(values.get(i), sv);
}
@Override
public void swap(long lhs, long rhs) {
int tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
return (index, doc) -> {
values = bigArrays.grow(values, index + 1);
values.set(index, docValues[doc]);
};
}
@Override
public void close() {
values.close();
}
}
private class SwapCountingExtra implements BucketedSort.ExtraData {
private long count = 0;
@Override
public void swap(long lhs, long rhs) {
count++;
}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
return (index, doc) -> {};
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.analytics;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.index.mapper.Mapper;
@ -26,6 +27,7 @@ import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregatorFactory;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
@ -101,6 +103,11 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
return modules;
}
@Override
public List<Setting<?>> getSettings() {
return singletonList(TopMetricsAggregatorFactory.MAX_BUCKET_SIZE);
}
@Override
public Map<String, Mapper.TypeParser> getMappers() {
return Collections.singletonMap(HistogramFieldMapper.CONTENT_TYPE, new HistogramFieldMapper.TypeParser());

View File

@ -268,7 +268,7 @@ public class HistogramFieldMapper extends FieldMapper {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode,
Nested nested, SortOrder sortOrder, DocValueFormat format) {
Nested nested, SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("can't sort on the [" + CONTENT_TYPE + "] field");
}
};

View File

@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.analytics.topmetrics;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -17,31 +19,34 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiValue {
private final DocValueFormat sortFormat;
private final SortOrder sortOrder;
private final SortValue sortValue;
private final String metricName;
private final double metricValue;
import static java.util.Collections.emptyList;
public InternalTopMetrics(String name, DocValueFormat sortFormat, @Nullable SortOrder sortOrder, SortValue sortValue, String metricName,
double metricValue, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiValue {
private final SortOrder sortOrder;
private final int size;
private final String metricName;
private final List<TopMetric> topMetrics;
public InternalTopMetrics(String name, @Nullable SortOrder sortOrder, String metricName,
int size, List<TopMetric> topMetrics, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sortFormat = sortFormat;
this.sortOrder = sortOrder;
this.sortValue = sortValue;
this.metricName = metricName;
this.metricValue = metricValue;
/*
* topMetrics.size won't be size when the bucket doesn't have size docs!
*/
this.size = size;
this.topMetrics = topMetrics;
}
static InternalTopMetrics buildEmptyAggregation(String name, String metricField,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
return new InternalTopMetrics(name, DocValueFormat.RAW, SortOrder.ASC, null, metricField, Double.NaN, pipelineAggregators,
metaData);
return new InternalTopMetrics(name, SortOrder.ASC, metricField, 0, emptyList(), pipelineAggregators, metaData);
}
/**
@ -49,20 +54,18 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
*/
public InternalTopMetrics(StreamInput in) throws IOException {
super(in);
sortFormat = in.readNamedWriteable(DocValueFormat.class);
sortOrder = SortOrder.readFromStream(in);
sortValue = in.readOptionalNamedWriteable(SortValue.class);
metricName = in.readString();
metricValue = in.readDouble();
size = in.readVInt();
topMetrics = in.readList(TopMetric::new);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(sortFormat);
sortOrder.writeTo(out);
out.writeOptionalNamedWriteable(sortValue);
out.writeString(metricName);
out.writeDouble(metricValue);
out.writeVInt(size);
out.writeList(topMetrics);
}
@Override
@ -76,7 +79,12 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
return this;
}
if (path.size() == 1 && metricName.contentEquals(path.get(1))) {
return metricValue;
if (topMetrics.isEmpty()) {
// Unmapped.
return null;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValue;
}
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
@ -86,31 +94,143 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
if (false == isMapped()) {
return this;
}
DocValueFormat bestSortFormat = sortFormat;
SortValue bestSortValue = sortValue;
double bestMetricValue = metricValue;
int reverseMul = sortOrder.reverseMul();
List<TopMetric> merged = new ArrayList<>(size);
PriorityQueue<ReduceState> queue = new PriorityQueue<ReduceState>(aggregations.size()) {
@Override
protected boolean lessThan(ReduceState lhs, ReduceState rhs) {
return sortOrder.reverseMul() * lhs.sortValue().compareTo(rhs.sortValue()) < 0;
}
};
for (InternalAggregation agg : aggregations) {
InternalTopMetrics result = (InternalTopMetrics) agg;
if (result.sortValue != null && reverseMul * bestSortValue.compareTo(result.sortValue) > 0) {
bestSortFormat = result.sortFormat;
bestSortValue = result.sortValue;
bestMetricValue = result.metricValue;
if (result.isMapped()) {
queue.add(new ReduceState(result));
}
}
return new InternalTopMetrics(getName(), bestSortFormat, sortOrder, bestSortValue, metricName, bestMetricValue,
pipelineAggregators(), getMetaData());
while (queue.size() > 0 && merged.size() < size) {
merged.add(queue.top().topMetric());
queue.top().index++;
if (queue.top().result.topMetrics.size() <= queue.top().index) {
queue.pop();
} else {
queue.updateTop();
}
}
return new InternalTopMetrics(getName(), sortOrder, metricName, size, merged, pipelineAggregators(), getMetaData());
}
@Override
public boolean isMapped() {
return sortValue != null;
return false == topMetrics.isEmpty();
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray("top");
if (sortValue != null) {
for (TopMetric top : topMetrics) {
top.toXContent(builder, metricName);
}
builder.endArray();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), sortOrder, metricName, size, topMetrics);
}
@Override
public boolean equals(Object obj) {
if (super.equals(obj) == false) return false;
InternalTopMetrics other = (InternalTopMetrics) obj;
return sortOrder.equals(other.sortOrder) &&
metricName.equals(other.metricName) &&
size == other.size &&
topMetrics.equals(other.topMetrics);
}
@Override
public double value(String name) {
if (metricName.equals(name)) {
if (topMetrics.isEmpty()) {
return Double.NaN;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValue;
}
throw new IllegalArgumentException("known metric [" + name + "]");
}
SortOrder getSortOrder() {
return sortOrder;
}
int getSize() {
return size;
}
String getMetricName() {
return metricName;
}
List<TopMetric> getTopMetrics() {
return topMetrics;
}
private class ReduceState {
private final InternalTopMetrics result;
private int index = 0;
ReduceState(InternalTopMetrics result) {
this.result = result;
}
SortValue sortValue() {
return topMetric().sortValue;
}
TopMetric topMetric() {
return result.topMetrics.get(index);
}
}
static class TopMetric implements Writeable, Comparable<TopMetric> {
private final DocValueFormat sortFormat;
private final SortValue sortValue;
private final double metricValue;
TopMetric(DocValueFormat sortFormat, SortValue sortValue, double metricValue) {
this.sortFormat = sortFormat;
this.sortValue = sortValue;
this.metricValue = metricValue;
}
TopMetric(StreamInput in) throws IOException {
sortFormat = in.readNamedWriteable(DocValueFormat.class);
sortValue = in.readNamedWriteable(SortValue.class);
metricValue = in.readDouble();
}
DocValueFormat getSortFormat() {
return sortFormat;
}
SortValue getSortValue() {
return sortValue;
}
double getMetricValue() {
return metricValue;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(sortFormat);
out.writeNamedWriteable(sortValue);
out.writeDouble(metricValue);
}
public XContentBuilder toXContent(XContentBuilder builder, String metricName) throws IOException {
builder.startObject();
{
builder.startArray("sort");
@ -122,57 +242,33 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
}
builder.endObject();
}
builder.endObject();
return builder.endObject();
}
builder.endArray();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), sortFormat, sortOrder, sortValue, metricName, metricValue);
}
@Override
public boolean equals(Object obj) {
if (super.equals(obj) == false) return false;
InternalTopMetrics other = (InternalTopMetrics) obj;
return sortFormat.equals(other.sortFormat) &&
sortOrder.equals(other.sortOrder) &&
Objects.equals(sortValue, other.sortValue) &&
metricName.equals(other.metricName) &&
metricValue == other.metricValue;
}
@Override
public double value(String name) {
if (metricName.equals(name)) {
return metricValue;
@Override
public int compareTo(TopMetric o) {
return sortValue.compareTo(o.sortValue);
}
throw new IllegalArgumentException("known metric [" + name + "]");
}
DocValueFormat getSortFormat() {
return sortFormat;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
TopMetric other = (TopMetric) obj;
return sortFormat.equals(other.sortFormat)
&& sortValue.equals(other.sortValue)
&& metricValue == other.metricValue;
}
SortOrder getSortOrder() {
return sortOrder;
}
@Override
public int hashCode() {
return Objects.hash(sortFormat, sortValue, metricValue);
}
SortValue getSortValue() {
return sortValue;
}
String getFormattedSortValue() {
return sortValue.format(sortFormat);
}
String getMetricName() {
return metricName;
}
double getMetricValue() {
return metricValue;
@Override
public String toString() {
return "TopMetric[" + sortFormat + "," + sortValue + "," + metricValue + "]";
}
}
}

View File

@ -26,39 +26,54 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.builder.SearchSourceBuilder.SIZE_FIELD;
import static org.elasticsearch.search.builder.SearchSourceBuilder.SORT_FIELD;
public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<TopMetricsAggregationBuilder> {
public static final String NAME = "top_metrics";
public static final ParseField METRIC_FIELD = new ParseField("metric");
/**
* Default to returning only a single top metric.
*/
private static final int DEFAULT_SIZE = 1;
public static final ConstructingObjectParser<TopMetricsAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(NAME,
false, (args, name) -> {
@SuppressWarnings("unchecked")
List<SortBuilder<?>> sorts = (List<SortBuilder<?>>) args[0];
MultiValuesSourceFieldConfig metricField = (MultiValuesSourceFieldConfig) args[1];
return new TopMetricsAggregationBuilder(name, sorts, metricField);
int size = args[1] == null ? DEFAULT_SIZE : (Integer) args[1];
if (size < 1) {
throw new IllegalArgumentException("[size] must be more than 0 but was [" + size + "]");
}
MultiValuesSourceFieldConfig metricField = (MultiValuesSourceFieldConfig) args[2];
return new TopMetricsAggregationBuilder(name, sorts, size, metricField);
});
static {
PARSER.declareField(constructorArg(), (p, n) -> SortBuilder.fromXContent(p), SORT_FIELD,
ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
PARSER.declareInt(optionalConstructorArg(), SIZE_FIELD);
ContextParser<Void, MultiValuesSourceFieldConfig.Builder> metricParser = MultiValuesSourceFieldConfig.PARSER.apply(true, false);
PARSER.declareObject(constructorArg(), (p, n) -> metricParser.parse(p, null).build(), METRIC_FIELD);
}
private final List<SortBuilder<?>> sortBuilders;
// TODO MultiValuesSourceFieldConfig has more things than we support and less things than we want to support
private final int size;
private final MultiValuesSourceFieldConfig metricField;
/**
* Ctor for parsing.
* Build a {@code top_metrics} aggregation request.
*/
public TopMetricsAggregationBuilder(String name, List<SortBuilder<?>> sortBuilders, MultiValuesSourceFieldConfig metricField) {
public TopMetricsAggregationBuilder(String name, List<SortBuilder<?>> sortBuilders, int size,
MultiValuesSourceFieldConfig metricField) {
super(name);
if (sortBuilders.size() != 1) {
throw new IllegalArgumentException("[sort] must contain exactly one sort");
}
this.sortBuilders = sortBuilders;
this.size = size;
this.metricField = metricField;
}
@ -69,6 +84,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
Map<String, Object> metaData) {
super(clone, factoriesBuilder, metaData);
this.sortBuilders = clone.sortBuilders;
this.size = clone.size;
this.metricField = clone.metricField;
}
@ -78,14 +94,16 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
public TopMetricsAggregationBuilder(StreamInput in) throws IOException {
super(in);
@SuppressWarnings("unchecked")
List<SortBuilder<?>> sortBuilders = (List<SortBuilder<?>>) (List<?>) in.readNamedWriteableList(SortBuilder.class);
List<SortBuilder<?>> sortBuilders = (List<SortBuilder<?>>) (List<?>) in.readNamedWriteableList(SortBuilder.class);
this.sortBuilders = sortBuilders;
this.size = in.readVInt();
this.metricField = new MultiValuesSourceFieldConfig(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList(sortBuilders);
out.writeVInt(size);
metricField.writeTo(out);
}
@ -97,7 +115,8 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
@Override
protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, Builder subFactoriesBuilder)
throws IOException {
return new TopMetricsAggregatorFactory(name, queryShardContext, parent, subFactoriesBuilder, metaData, sortBuilders, metricField);
return new TopMetricsAggregatorFactory(name, queryShardContext, parent, subFactoriesBuilder, metaData, sortBuilders,
size, metricField);
}
@Override
@ -109,6 +128,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
sort.toXContent(builder, params);
}
builder.endArray();
builder.field(SIZE_FIELD.getPreferredName(), size);
builder.field(METRIC_FIELD.getPreferredName(), metricField);
}
builder.endObject();
@ -124,6 +144,10 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
return sortBuilders;
}
int getSize() {
return size;
}
MultiValuesSourceFieldConfig getMetricField() {
return metricField;
}

View File

@ -9,9 +9,12 @@ package org.elasticsearch.xpack.analytics.topmetrics;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -21,7 +24,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortValue;
import org.elasticsearch.search.sort.SortBuilder;
import java.io.IOException;
import java.util.List;
@ -42,32 +45,46 @@ import java.util.Map;
* some way to pick which document's metrics to use for the sort.
*/
class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
private final BucketedSort sort;
private final int size;
private final String metricName;
private final BucketedSort sort;
private final Values values;
private final ValuesSource.Numeric metricValueSource;
private DoubleArray values;
TopMetricsAggregator(String name, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, BucketedSort sort,
String metricName, ValuesSource.Numeric metricValueSource) throws IOException {
Map<String, Object> metaData, int size, String metricName,
SortBuilder<?> sort, ValuesSource.Numeric metricValueSource) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.sort = sort;
this.size = size;
this.metricName = metricName;
this.metricValueSource = metricValueSource;
if (metricValueSource != null) {
values = context.bigArrays().newDoubleArray(1, false);
values.fill(0, values.size(), Double.NaN);
values = new Values(size, context.bigArrays(), metricValueSource);
this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
} else {
values = null;
this.sort = null;
}
}
@Override
public boolean hasMetric(String name) {
if (size != 1) {
throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]");
}
return metricName.equals(name);
}
@Override
public double metric(String name, long owningBucketOrd) {
return values.get(owningBucketOrd);
assert size == 1;
/*
* Since size is always 1 we know that the index into the values
* array is same same as the bucket ordinal. Also, this will always
* be called after we've collected a bucket, so it won't just fetch
* garbage.
*/
return values.values.get(owningBucketOrd);
}
@Override
@ -84,21 +101,11 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
BucketedSort.Leaf leafSort = sort.forLeaf(ctx);
// TODO allow configuration of value mode
NumericDoubleValues metricValues = MultiValueMode.AVG.select(metricValueSource.doubleValues(ctx));
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (leafSort.collectIfCompetitive(doc, bucket)) {
if (bucket >= values.size()) {
long oldSize = values.size();
values = context.bigArrays().grow(values, bucket + 1);
values.fill(oldSize, values.size(), Double.NaN);
}
double metricValue = metricValues.advanceExact(doc) ? metricValues.doubleValue() : Double.NaN;
values.set(bucket, metricValue);
}
leafSort.collect(doc, bucket);
}
@Override
@ -113,10 +120,9 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
if (metricValueSource == null) {
return buildEmptyAggregation();
}
double metricValue = values.get(bucket);
SortValue sortValue = sort.getValue(bucket);
return new InternalTopMetrics(name, sort.getFormat(), sort.getOrder(), sortValue, metricName, metricValue, pipelineAggregators(),
metaData());
List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, values.resultBuilder(sort.getFormat()));
assert topMetrics.size() <= size;
return new InternalTopMetrics(name, sort.getOrder(), metricName, size, topMetrics, pipelineAggregators(), metaData());
}
@Override
@ -130,4 +136,47 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
// Release the native/big-array resources held by the sort and the collected
// metric values. Releasables.close is null-tolerant, so this is safe even for
// the unmapped case where both fields are null.
public void doClose() {
Releasables.close(sort, values);
}
/**
 * Metric values collected alongside the {@code BucketedSort}. The sort tells
 * us which slots are competitive; this class stores the metric value for each
 * slot and keeps it in step with the sort by implementing {@code swap}.
 */
private static class Values implements BucketedSort.ExtraData, Releasable {
private final BigArrays bigArrays;
private final ValuesSource.Numeric metricValueSource;
// Grown on demand in loader(); indexed by the sort's slot index.
private DoubleArray values;
Values(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
this.bigArrays = bigArrays;
this.metricValueSource = metricValueSource;
// Initial capacity of one bucket's worth of slots; grows as buckets are added.
values = bigArrays.newDoubleArray(size, false);
}
/**
 * Builds the {@code TopMetric} result for a slot, pairing the sort value
 * (formatted with {@code sortFormat}) with the metric value stored here.
 */
BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
return (index, sortValue) ->
new InternalTopMetrics.TopMetric(sortFormat, sortValue, values.get(index));
}
// Called by the sort when it reorders slots so the metric stays attached to
// its sort key.
@Override
public void swap(long lhs, long rhs) {
double tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
}
// Loads the metric value for a competitive doc into the given slot.
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
// TODO allow configuration of value mode
NumericDoubleValues metricValues = MultiValueMode.AVG.select(metricValueSource.doubleValues(ctx));
return (index, doc) -> {
if (index >= values.size()) {
values = bigArrays.grow(values, index + 1);
}
// Docs without a value for the metric field record NaN.
double metricValue = metricValues.advanceExact(doc) ? metricValues.doubleValue() : Double.NaN;
values.set(index, metricValue);
};
}
@Override
public void close() {
values.close();
}
}
}

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.analytics.topmetrics;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
@ -16,7 +18,6 @@ import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortBuilder;
import java.io.IOException;
@ -24,14 +25,24 @@ import java.util.List;
import java.util.Map;
public class TopMetricsAggregatorFactory extends AggregatorFactory {
/**
* Index setting describing the maximum number of top metrics that
* can be collected per bucket. This defaults to a low number because
* there can be a *huge* number of buckets
*/
public static final Setting<Integer> MAX_BUCKET_SIZE =
Setting.intSetting("index.top_metrics_max_size", 10, 1, Property.Dynamic, Property.IndexScope);
private final List<SortBuilder<?>> sortBuilders;
private final int size;
private final MultiValuesSourceFieldConfig metricField;
public TopMetricsAggregatorFactory(String name, QueryShardContext queryShardContext, AggregatorFactory parent,
Builder subFactoriesBuilder, Map<String, Object> metaData, List<SortBuilder<?>> sortBuilders,
MultiValuesSourceFieldConfig metricField) throws IOException {
int size, MultiValuesSourceFieldConfig metricField) throws IOException {
super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
this.sortBuilders = sortBuilders;
this.size = size;
this.metricField = metricField;
}
@ -41,19 +52,23 @@ public class TopMetricsAggregatorFactory extends AggregatorFactory {
ValuesSourceConfig<ValuesSource.Numeric> metricFieldSource = ValuesSourceConfig.resolve(queryShardContext, ValueType.NUMERIC,
metricField.getFieldName(), metricField.getScript(), metricField.getMissing(), metricField.getTimeZone(), null);
ValuesSource.Numeric metricValueSource = metricFieldSource.toValuesSource(queryShardContext);
int maxBucketSize = MAX_BUCKET_SIZE.get(searchContext.getQueryShardContext().getIndexSettings().getSettings());
if (size > maxBucketSize) {
throw new IllegalArgumentException("[top_metrics.size] must not be more than [" + maxBucketSize + "] but was [" + size
+ "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey()
+ "] index level setting.");
}
if (metricValueSource == null) {
return createUnmapped(searchContext, parent, pipelineAggregators, metaData);
}
BucketedSort bucketedSort = sortBuilders.get(0).buildBucketedSort(searchContext.getQueryShardContext());
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, bucketedSort,
metricField.getFieldName(), metricValueSource);
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
sortBuilders.get(0), metricValueSource);
}
private TopMetricsAggregator createUnmapped(SearchContext searchContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, null, metricField.getFieldName(),
null);
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
null, null);
}
}

View File

@ -31,49 +31,70 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
public void testFirstEmpty() {
InternalTopMetrics first = buildEmpty();
InternalTopMetrics reduced = reduce(first, buildFilled(SortValue.from(1), 1.0));
InternalTopMetrics reduced = reduce(first, buildFilled(1, top(SortValue.from(1), 1.0)));
assertThat(reduced, sameInstance(first));
}
public void testMany() {
InternalTopMetrics first = buildFilled(SortValue.from(2.0), randomDouble());
InternalTopMetrics min = buildFilled(SortValue.from(1.0), randomDouble());
InternalTopMetrics max = buildFilled(SortValue.from(7.0), randomDouble());
public void testManyToReduce() {
InternalTopMetrics first = buildFilled(1, top(SortValue.from(2.0), randomDouble()));
InternalTopMetrics min = buildFilled(2, top(SortValue.from(1.0), randomDouble()));
InternalTopMetrics max = buildFilled(3, top(SortValue.from(7.0), randomDouble()));
InternalTopMetrics[] metrics = new InternalTopMetrics[] {
first, max, min, buildEmpty(), buildEmpty(),
};
InternalTopMetrics winner = first.getSortOrder() == SortOrder.ASC ? min : max;
InternalTopMetrics reduced = reduce(metrics);
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getSortValue(), equalTo(winner.getSortValue()));
assertThat(reduced.getSortFormat(), equalTo(winner.getSortFormat()));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getMetricValue(), equalTo(winner.getMetricValue()));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getSize(), equalTo(first.getSize()));
assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
}
// Reducing shards with size > 1 should merge-sort all shards' top metrics and
// keep only the first `size` entries in the first result's sort order (DESC
// here, so the largest sort keys win).
public void testNonZeroSize() {
InternalTopMetrics first = buildFilled(SortOrder.DESC, 3, top(SortValue.from(2.0), 1));
InternalTopMetrics second = buildFilled(2, top(SortValue.from(3.0), 2), top(SortValue.from(1.0), 2));
InternalTopMetrics third = buildFilled(3, top(SortValue.from(8.0), 4), top(SortValue.from(7.0), 5));
InternalTopMetrics[] metrics = new InternalTopMetrics[] {
first, second, third, buildEmpty(), buildEmpty(),
};
InternalTopMetrics reduced = reduce(metrics);
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getSize(), equalTo(first.getSize()));
// Expected top 3 descending: 8.0 and 7.0 from `third`, then 3.0 from `second`.
assertThat(reduced.getTopMetrics(), equalTo(Arrays.asList(
third.getTopMetrics().get(0), third.getTopMetrics().get(1), second.getTopMetrics().get(0))));
}
public void testDifferentTypes() {
InternalTopMetrics doubleMetrics = buildFilled(SortValue.from(100.0), randomDouble());
InternalTopMetrics longMetrics = buildFilled(SortValue.from(7), randomDouble());
InternalTopMetrics doubleMetrics = buildFilled(1, top(SortValue.from(100.0), randomDouble()));
InternalTopMetrics longMetrics = buildFilled(1, top(SortValue.from(7), randomDouble()));
InternalTopMetrics reduced = reduce(doubleMetrics, longMetrics);
// Doubles sort first.
InternalTopMetrics winner = doubleMetrics.getSortOrder() == SortOrder.ASC ? doubleMetrics : longMetrics;
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getSortValue(), equalTo(winner.getSortValue()));
assertThat(reduced.getSortFormat(), equalTo(winner.getSortFormat()));
assertThat(reduced.getSortOrder(), equalTo(doubleMetrics.getSortOrder()));
assertThat(reduced.getMetricValue(), equalTo(winner.getMetricValue()));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getSortOrder(), equalTo(doubleMetrics.getSortOrder()));
assertThat(reduced.getSize(), equalTo(doubleMetrics.getSize()));
assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
}
// An unmapped/empty aggregation result named "test" with metric name "test".
private InternalTopMetrics buildEmpty() {
return InternalTopMetrics.buildEmptyAggregation("test", "test", emptyList(), null);
}
private InternalTopMetrics buildFilled(SortValue sortValue, double metricValue) {
// Convenience overload: picks a random sort order for the instance.
private InternalTopMetrics buildFilled(int size, InternalTopMetrics.TopMetric... metrics) {
return buildFilled(randomFrom(SortOrder.values()), size, metrics);
}
// Builds a populated result named "test" with the given order, size and metrics.
private InternalTopMetrics buildFilled(SortOrder sortOrder, int size, InternalTopMetrics.TopMetric... metrics) {
return new InternalTopMetrics("test", sortOrder, "test", size, Arrays.asList(metrics), emptyList(), null);
}
private InternalTopMetrics.TopMetric top(SortValue sortValue, double metricValue) {
DocValueFormat sortFormat = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN, DocValueFormat.IP);
SortOrder sortOrder = randomFrom(SortOrder.values());
return new InternalTopMetrics("test", sortFormat, sortOrder, sortValue, "test", metricValue, emptyList(), null);
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, metricValue);
}
private InternalTopMetrics reduce(InternalTopMetrics... results) {

View File

@ -25,18 +25,30 @@ import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class InternalTopMetricsTests extends InternalAggregationTestCase<InternalTopMetrics> {
/**
* Sort order to use for randomly generated instances. This is fixed
* for each test method so that randomly generated instances can be
* merged. If it weren't fixed {@link InternalAggregationTestCase#testReduceRandom()}
* would fail because the instances that it attempts to reduce don't
* have their results in the same order.
*/
private SortOrder sortOrder = randomFrom(SortOrder.values());
public void testEmptyIsNotMapped() {
InternalTopMetrics empty = InternalTopMetrics.buildEmptyAggregation(
randomAlphaOfLength(5), randomAlphaOfLength(2), emptyList(), null);
@ -44,13 +56,13 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
}
public void testNonEmptyIsMapped() {
InternalTopMetrics nonEmpty = randomValueOtherThanMany(tm -> tm.getSortValue() == null, this::createTestInstance);
InternalTopMetrics nonEmpty = randomValueOtherThanMany(i -> i.getTopMetrics().isEmpty(), this::createTestInstance);
assertTrue(nonEmpty.isMapped());
}
public void testToXContentDoubleSortValue() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", DocValueFormat.RAW, randomFrom(SortOrder.values()), SortValue.from(1.0),
"test", 1.0, emptyList(), null);
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
Arrays.asList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0)), emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -72,8 +84,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
DocValueFormat sortFormat = new DocValueFormat.DateTime(DateFormatter.forPattern("strict_date_time"), ZoneId.of("UTC"),
DateFieldMapper.Resolution.MILLISECONDS);
SortValue sortValue = SortValue.from(ZonedDateTime.parse("2007-12-03T10:15:30Z").toInstant().toEpochMilli());
InternalTopMetrics tm = new InternalTopMetrics("test", sortFormat, randomFrom(SortOrder.values()), sortValue, "test", 1.0,
emptyList(), null);
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
Arrays.asList(new InternalTopMetrics.TopMetric(sortFormat, sortValue, 1.0)), emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -91,6 +103,37 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
"}"));
}
// With size > 1 the "top" array must render one entry per collected metric,
// in order, each with its own "sort" array and "metrics" object.
public void testToXContentManyValues() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 2,
Arrays.asList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), 2.0)),
emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
" \"top\" : [\n" +
" {\n" +
" \"sort\" : [\n" +
" 1.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"test\" : 1.0\n" +
" }\n" +
" },\n" +
" {\n" +
" \"sort\" : [\n" +
" 2.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"test\" : 2.0\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
"}"));
}
@Override
protected List<NamedXContentRegistry.Entry> getNamedXContents() {
List<NamedXContentRegistry.Entry> result = new ArrayList<>(super.getNamedXContents());
@ -102,45 +145,42 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
@Override
protected InternalTopMetrics createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
DocValueFormat sortFormat = randomNumericDocValueFormat();
SortOrder sortOrder = randomFrom(SortOrder.values());
SortValue sortValue = randomSortValue();
String metricName = randomAlphaOfLength(5);
double metricValue = randomDouble();
return new InternalTopMetrics(name, sortFormat, sortOrder, sortValue, metricName, metricValue, pipelineAggregators, metaData);
int size = between(1, 100);
List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size));
return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics, pipelineAggregators, metaData);
}
@Override
protected InternalTopMetrics mutateInstance(InternalTopMetrics instance) throws IOException {
String name = instance.getName();
DocValueFormat sortFormat = instance.getSortFormat();
SortOrder sortOrder = instance.getSortOrder();
SortValue sortValue = instance.getSortValue();
String metricName = instance.getMetricName();
double metricValue = instance.getMetricValue();
switch (randomInt(5)) {
int size = instance.getSize();
List<InternalTopMetrics.TopMetric> topMetrics = instance.getTopMetrics();
switch (randomInt(4)) {
case 0:
name = randomAlphaOfLength(6);
break;
case 1:
sortFormat = randomValueOtherThan(sortFormat, InternalAggregationTestCase::randomNumericDocValueFormat);
sortOrder = sortOrder == SortOrder.ASC ? SortOrder.DESC : SortOrder.ASC;
Collections.reverse(topMetrics);
break;
case 2:
sortOrder = sortOrder == SortOrder.ASC ? SortOrder.DESC : SortOrder.ASC;
break;
case 3:
sortValue = randomValueOtherThan(sortValue, InternalTopMetricsTests::randomSortValue);
break;
case 4:
metricName = randomAlphaOfLength(6);
break;
case 5:
metricValue = randomValueOtherThan(metricValue, () -> randomDouble());
case 3:
size = randomValueOtherThan(size, () -> between(1, 100));
break;
case 4:
int fixedSize = size;
topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize)));
break;
default:
throw new IllegalArgumentException("bad mutation");
}
return new InternalTopMetrics(name, sortFormat, sortOrder, sortValue, metricName, metricValue, emptyList(), null);
return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics,
instance.pipelineAggregators(), instance.getMetaData());
}
@Override
@ -152,51 +192,44 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
protected void assertFromXContent(InternalTopMetrics aggregation, ParsedAggregation parsedAggregation) throws IOException {
ParsedTopMetrics parsed = (ParsedTopMetrics) parsedAggregation;
assertThat(parsed.getName(), equalTo(aggregation.getName()));
if (false == aggregation.isMapped()) {
assertThat(parsed.getTopMetrics(), hasSize(0));
return;
assertThat(parsed.getTopMetrics(), hasSize(aggregation.getTopMetrics().size()));
for (int i = 0; i < parsed.getTopMetrics().size(); i++) {
ParsedTopMetrics.TopMetrics parsedTop = parsed.getTopMetrics().get(i);
InternalTopMetrics.TopMetric internalTop = aggregation.getTopMetrics().get(i);
Object expectedSort = internalTop.getSortFormat() == DocValueFormat.RAW ?
internalTop.getSortValue().getKey() : internalTop.getSortValue().format(internalTop.getSortFormat());
assertThat(parsedTop.getSort(), equalTo(singletonList(expectedSort)));
assertThat(parsedTop.getMetrics(), equalTo(singletonMap(aggregation.getMetricName(), internalTop.getMetricValue())));
}
assertThat(parsed.getTopMetrics(), hasSize(1));
ParsedTopMetrics.TopMetrics parsedTop = parsed.getTopMetrics().get(0);
Object expectedSort = aggregation.getSortFormat() == DocValueFormat.RAW ?
aggregation.getSortValue().getKey() : aggregation.getFormattedSortValue();
assertThat(parsedTop.getSort(), equalTo(singletonList(expectedSort)));
assertThat(parsedTop.getMetrics(), equalTo(singletonMap(aggregation.getMetricName(), aggregation.getMetricValue())));
}
@Override
protected void assertReduced(InternalTopMetrics reduced, List<InternalTopMetrics> inputs) {
InternalTopMetrics first = inputs.get(0);
Optional<InternalTopMetrics> winner = inputs.stream()
.filter(tm -> tm.isMapped())
.min((lhs, rhs) -> first.getSortOrder().reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()));
List<InternalTopMetrics.TopMetric> metrics = new ArrayList<>();
for (InternalTopMetrics input : inputs) {
metrics.addAll(input.getTopMetrics());
}
Collections.sort(metrics, (lhs, rhs) -> first.getSortOrder().reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()));
List<InternalTopMetrics.TopMetric> winners = metrics.size() > first.getSize() ? metrics.subList(0, first.getSize()) : metrics;
assertThat(reduced.getName(), equalTo(first.getName()));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getMetricName(), equalTo(first.getMetricName()));
if (winner.isPresent()) {
assertThat(reduced.getSortValue(), equalTo(winner.get().getSortValue()));
assertThat(reduced.getSortFormat(), equalTo(winner.get().getSortFormat()));
assertThat(reduced.getMetricValue(), equalTo(winner.get().getMetricValue()));
} else {
// Reduced only unmapped metrics
assertThat(reduced.getSortValue(), equalTo(first.getSortValue()));
assertThat(reduced.getSortFormat(), equalTo(first.getSortFormat()));
assertThat(reduced.getMetricValue(), equalTo(first.getMetricValue()));
}
assertThat(reduced.getTopMetrics(), equalTo(winners));
}
// Generates `length` random top metrics pre-sorted by the test's fixed
// sortOrder, matching the invariant the aggregation maintains.
private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length) {
return IntStream.range(0, length)
.mapToObj(i -> new InternalTopMetrics.TopMetric(randomNumericDocValueFormat(), randomSortValue(), randomDouble()))
.sorted((lhs, rhs) -> sortOrder.reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()))
.collect(toList());
}
private static SortValue randomSortValue() {
switch (between(0, 2)) {
case 0:
return null;
case 1:
if (randomBoolean()) {
return SortValue.from(randomLong());
case 2:
return SortValue.from(randomDouble());
default:
throw new IllegalArgumentException("unsupported random sort");
}
return SortValue.from(randomDouble());
}
@Override

View File

@ -70,7 +70,7 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
new FieldSortBuilder(randomAlphaOfLength(5)).order(randomFrom(SortOrder.values())));
MultiValuesSourceFieldConfig.Builder metricField = new MultiValuesSourceFieldConfig.Builder();
metricField.setFieldName(randomAlphaOfLength(5)).setMissing(1.0);
return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, metricField.build());
return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, between(1, 100), metricField.build());
}
public void testClientBuilder() throws IOException {
@ -97,6 +97,7 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
return new org.elasticsearch.client.analytics.TopMetricsAggregationBuilder(
serverBuilder.getName(),
serverBuilder.getSortBuilders().get(0),
serverBuilder.getSize(),
serverBuilder.getMetricField().getFieldName());
}
}

View File

@ -70,13 +70,13 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notANumber;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -85,10 +85,8 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
public void testNoDocs() throws IOException {
InternalTopMetrics result = collect(simpleBuilder(), new MatchAllDocsQuery(), writer -> {},
doubleFields());
assertThat(result.getSortFormat(), equalTo(DocValueFormat.RAW));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), nullValue());
assertThat(result.getMetricValue(), notANumber());
assertThat(result.getTopMetrics(), equalTo(emptyList()));
}
public void testUnmappedMetric() throws IOException {
@ -97,8 +95,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
numberFieldType(NumberType.DOUBLE, "s"));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), nullValue());
assertThat(result.getMetricValue(), notANumber());
assertThat(result.getTopMetrics(), equalTo(emptyList()));
}
public void testMissingValueForMetric() throws IOException {
@ -107,8 +104,9 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
doubleFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), notANumber());
assertThat(result.getTopMetrics(), hasSize(1));
assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getTopMetrics().get(0).getMetricValue(), notANumber());
}
public void testActualValueForMetric() throws IOException {
@ -117,10 +115,9 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
doubleFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 2.0))));
}
private InternalTopMetrics collectFromDoubles(TopMetricsAggregationBuilder builder) throws IOException {
return collect(builder, new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(doubleField("s", 1.0), doubleField("m", 2.0)));
@ -132,24 +129,27 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
public void testSortByDoubleAscending() throws IOException {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 2.0))));
}
public void testSortByDoubleDescending() throws IOException {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.DESC)));
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(2.0)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(2.0, 3.0))));
}
public void testSortByDoubleCastToLong() throws IOException {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").setNumericType("long")));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1, 2.0))));
}
// size=2 should return both docs, ascending by sort key.
public void testSortByDoubleTwoHits() throws IOException {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC), 2));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(Arrays.asList(top(1.0, 2.0), top(2.0, 3.0))));
}
public void testSortByFloatAscending() throws IOException {
TopMetricsAggregationBuilder builder = simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC));
@ -159,8 +159,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
floatAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 2.0d))));
}
public void testSortByFloatDescending() throws IOException {
@ -171,8 +170,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
floatAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(2.0)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(2.0, 3.0))));
}
public void testSortByLongAscending() throws IOException {
@ -183,8 +181,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
longAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(10)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(10, 2.0))));
}
public void testSortByLongDescending() throws IOException {
@ -195,8 +192,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
longAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(20)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(20, 3.0))));
}
public void testSortByScoreDescending() throws IOException {
@ -207,8 +203,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
textAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(2.0)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(2.0, 2.0))));
}
public void testSortByScoreAscending() throws IOException {
@ -219,8 +214,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
textAndDoubleField());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 3.0))));
}
public void testSortByScriptDescending() throws IOException {
@ -231,8 +225,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
doubleFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(2.0)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(2.0, 2.0))));
}
public void testSortByScriptAscending() throws IOException {
@ -243,8 +236,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
doubleFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 3.0))));
}
public void testSortByStringScriptFails() throws IOException {
@ -271,16 +263,22 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
TopMetricsAggregationBuilder builder = simpleBuilder(new GeoDistanceSortBuilder("s", 35.7796, 78.6382).order(SortOrder.DESC));
InternalTopMetrics result = collectFromNewYorkAndLA(builder);
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.2054632268631617E7)));
assertThat(result.getMetricValue(), equalTo(3.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.2054632268631617E7, 3.0))));
}
public void testSortByGeoDistanceAscending() throws IOException {
TopMetricsAggregationBuilder builder = simpleBuilder(new GeoDistanceSortBuilder("s", 35.7796, 78.6382).order(SortOrder.ASC));
InternalTopMetrics result = collectFromNewYorkAndLA(builder);
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getSortValue(), equalTo(SortValue.from(1.1062351376961706E7)));
assertThat(result.getMetricValue(), equalTo(2.0d));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.1062351376961706E7, 2.0))));
}
// size=2 with a geo-distance sort should return both docs, farthest first
// (DESC), with the computed distances as sort keys.
public void testSortByGeoDistanceTwoHits() throws IOException {
TopMetricsAggregationBuilder builder = simpleBuilder(new GeoDistanceSortBuilder("s", 35.7796, 78.6382).order(SortOrder.DESC), 2);
InternalTopMetrics result = collectFromNewYorkAndLA(builder);
assertThat(result.getSize(), equalTo(2));
assertThat(result.getSortOrder(), equalTo(SortOrder.DESC));
assertThat(result.getTopMetrics(), equalTo(Arrays.asList(top(1.2054632268631617E7, 3.0), top(1.1062351376961706E7, 2.0))));
}
public void testInsideTerms() throws IOException {
@ -296,14 +294,12 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
assertThat(bucket1.getKey(), equalTo(1.0));
InternalTopMetrics top1 = bucket1.getAggregations().get("test");
assertThat(top1.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(top1.getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(top1.getMetricValue(), equalTo(2.0d));
assertThat(top1.getTopMetrics(), equalTo(singletonList(top(1.0, 2.0))));
Terms.Bucket bucket2 = result.getBuckets().get(1);
assertThat(bucket2.getKey(), equalTo(2.0));
InternalTopMetrics top2 = bucket2.getAggregations().get("test");
assertThat(top2.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(top2.getSortValue(), equalTo(SortValue.from(4.0)));
assertThat(top2.getMetricValue(), equalTo(9.0d));
assertThat(top2.getTopMetrics(), equalTo(singletonList(top(4.0, 9.0))));
}
public void testTonsOfBucketsTriggersBreaker() throws IOException {
@ -358,11 +354,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
*/
int bucketThatBreaks = 922;
for (int b = 0; b < bucketThatBreaks; b++) {
try {
leaf.collect(0, b);
} catch (Exception e) {
throw new RuntimeException("ADFADFS " + b, e);
}
leaf.collect(0, b);
}
CircuitBreakingException e = expectThrows(CircuitBreakingException.class, () -> leaf.collect(0, bucketThatBreaks));
assertThat(e.getMessage(), equalTo("test error"));
@ -372,15 +364,19 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
}
}
// NOTE(review): this overload appears to be diff residue from before the "size"
// parameter was added to TopMetricsAggregationBuilder — a simpleBuilder(SortBuilder<?>)
// with an identical signature that delegates to simpleBuilder(sort, 1) also appears
// later in this file. Two methods with the same signature cannot coexist; confirm
// this pre-change version was meant to be deleted.
private TopMetricsAggregationBuilder simpleBuilder(SortBuilder<?> sort) {
return new TopMetricsAggregationBuilder("test", singletonList(sort),
new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build());
}
/**
 * Build a {@code top_metrics} aggregation sorted on the default "s" field,
 * keeping only the single top entry.
 */
private TopMetricsAggregationBuilder simpleBuilder() {
    // All of these tests index a sortable "s" field, so it is the natural default.
    FieldSortBuilder defaultSort = new FieldSortBuilder("s");
    return simpleBuilder(defaultSort);
}
/**
 * Build a {@code top_metrics} aggregation on the given sort that keeps only
 * the single top entry, mirroring the aggregation's default size.
 */
private TopMetricsAggregationBuilder simpleBuilder(SortBuilder<?> sort) {
    return simpleBuilder(sort, 1);
}
/**
 * Build a {@code top_metrics} aggregation named "test" on the "m" metric
 * field with the given sort and number of top entries to keep.
 */
private TopMetricsAggregationBuilder simpleBuilder(SortBuilder<?> sort, int size) {
    MultiValuesSourceFieldConfig metricField =
        new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build();
    return new TopMetricsAggregationBuilder("test", singletonList(sort), size, metricField);
}
/**
* Build a query that matches all documents but adds 1 to the score of
* all docs that contain "foo". We use this instead of a term query
@ -456,7 +452,6 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
/**
 * Run the aggregation and assert an invariant shared by every test: the
 * result's metric name matches the metric field configured on the builder.
 */
private InternalTopMetrics collect(TopMetricsAggregationBuilder builder, Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex, MappedFieldType... fields) throws IOException {
InternalTopMetrics result = (InternalTopMetrics) collect((AggregationBuilder) builder, query, buildIndex, fields);
// NOTE(review): this getSortFormat() assertion looks like diff residue — the sort
// format appears to move onto each InternalTopMetrics.TopMetric in this change
// (see the top(...) helpers passing DocValueFormat.RAW). Confirm it should be removed.
assertThat(result.getSortFormat(), equalTo(DocValueFormat.RAW));
assertThat(result.getMetricName(), equalTo(builder.getMetricField().getFieldName()));
return result;
}
@ -475,6 +470,14 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
}
}
/**
 * Shorthand for the expected {@link InternalTopMetrics.TopMetric} built from a
 * {@code long} sort key and its metric value, using the raw doc value format.
 */
private InternalTopMetrics.TopMetric top(long sortValue, double metricValue) {
    SortValue sort = SortValue.from(sortValue);
    return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, sort, metricValue);
}
/**
 * Shorthand for the expected {@link InternalTopMetrics.TopMetric} built from a
 * {@code double} sort key and its metric value, using the raw doc value format.
 */
private InternalTopMetrics.TopMetric top(double sortValue, double metricValue) {
    SortValue sort = SortValue.from(sortValue);
    return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, sort, metricValue);
}
/**
* Builds a simple script that reads the "s" field.
*/

View File

@ -392,7 +392,7 @@ public final class FlatObjectFieldMapper extends DynamicKeyFieldMapper {
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}

View File

@ -1,5 +1,9 @@
---
"sort by long field":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
bulk:
index: test
@ -56,6 +60,10 @@
---
"sort by double field":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -106,6 +114,10 @@
---
"sort by scaled float field":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -156,6 +168,10 @@
---
"sort by keyword field fails":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -187,6 +203,10 @@
---
"sort by score":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -224,6 +244,10 @@
---
"sort by numeric script":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
bulk:
index: test
@ -251,6 +275,10 @@
---
"sort by string script fails":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -286,6 +314,10 @@
---
"sort by geo_distance":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -321,8 +353,32 @@
- match: { aggregations.pop.top.0.metrics.population: 8623000 }
- match: { aggregations.pop.top.0.sort: [681335.0456554737] }
- do:
search:
size: 0
body:
aggs:
pop:
top_metrics:
size: 3
metric:
field: population
sort:
_geo_distance:
location: "35.7796, -78.6382"
- match: { aggregations.pop.top.0.metrics.population: 8623000 }
- match: { aggregations.pop.top.0.sort: [681335.0456554737] }
- match: { aggregations.pop.top.1.metrics.population: 2716000 }
- match: { aggregations.pop.top.1.sort: [1031665.3103809588] }
- match: { aggregations.pop.top.2.metrics.population: 4000000 }
- match: { aggregations.pop.top.2.sort: [3591714.92471555] }
---
"inside terms":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
@ -392,3 +448,68 @@
- match: { aggregations.ip.buckets.1.key: 192.168.0.1 }
- match: { aggregations.ip.buckets.1.tm.top.0.metrics.v: 2 }
- match: { aggregations.ip.buckets.1.tm.top.0.sort: ['2020-01-01T02:01:01.000Z'] }
---
"size is index setting":
- skip:
version: " - 7.7.0"
reason: added in 7.7.0
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1 # The failure message isn't predictable with more than one shard
- do:
bulk:
index: test
refresh: true
body:
- '{"index": {}}'
- '{"s": 1, "v": 3.1415}'
- '{"index": {}}'
- '{"s": 2, "v": 1.0}'
- '{"index": {}}'
- '{"s": 3, "v": 2.71828}'
- do:
catch: bad_request
search:
size: 0
body:
aggs:
tm:
top_metrics:
size: 100
metric:
field: v
sort:
s: desc
- match: { error.root_cause.0.reason: "error building sort for field [s.keyword] of type [keyword] in index [test]: only supported on numeric fields" }
- do:
indices.put_settings:
index: test
body:
top_metrics_max_size: 100
- do:
search:
size: 0
body:
aggs:
tm:
top_metrics:
size: 100
metric:
field: v
sort:
s: desc
- match: { aggregations.tm.top.0.metrics.v: 2.718280076980591 }
- match: { aggregations.tm.top.0.sort: [3] }
- match: { aggregations.tm.top.1.metrics.v: 1.0 }
- match: { aggregations.tm.top.1.sort: [2] }
- match: { aggregations.tm.top.2.metrics.v: 3.1414999961853027 }
- match: { aggregations.tm.top.2.sort: [1] }

View File

@ -41,7 +41,7 @@ public class VectorDVIndexFieldData extends DocValuesIndexFieldData implements I
@Override
public BucketedSort newBucketedSort(BigArrays bigArrays, Object missingValue, MultiValueMode sortMode, Nested nested,
SortOrder sortOrder, DocValueFormat format) {
SortOrder sortOrder, DocValueFormat format, int bucketSize, BucketedSort.ExtraData extra) {
throw new IllegalArgumentException("only supported on numeric fields");
}