Preserve metric types in top_metrics (backport of #53288) (#53440)

This changes the `top_metrics` aggregation to return metrics in their
original type. Since it only supports numerics, that means that dates,
longs, and doubles will come back as stored, with their appropriate
formatter applied.
This commit is contained in:
Nik Everett 2020-03-12 17:17:09 -04:00 committed by GitHub
parent 97621e7f65
commit 9dcd64c110
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 826 additions and 133 deletions

View File

@ -24,12 +24,10 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -88,9 +86,9 @@ public class ParsedTopMetrics extends ParsedAggregation {
private static final ParseField METRICS_FIELD = new ParseField("metrics");
private final List<Object> sort;
private final Map<String, Double> metrics;
private final Map<String, Object> metrics;
private TopMetrics(List<Object> sort, Map<String, Double> metrics) {
private TopMetrics(List<Object> sort, Map<String, Object> metrics) {
this.sort = sort;
this.metrics = metrics;
}
@ -105,7 +103,7 @@ public class ParsedTopMetrics extends ParsedAggregation {
/**
* The top metric values returned by the aggregation.
*/
public Map<String, Double> getMetrics() {
public Map<String, Object> getMetrics() {
return metrics;
}
@ -114,13 +112,13 @@ public class ParsedTopMetrics extends ParsedAggregation {
@SuppressWarnings("unchecked")
List<Object> sort = (List<Object>) args[0];
@SuppressWarnings("unchecked")
Map<String, Double> metrics = (Map<String, Double>) args[1];
Map<String, Object> metrics = (Map<String, Object>) args[1];
return new TopMetrics(sort, metrics);
});
static {
PARSER.declareFieldArray(constructorArg(), (p, c) -> XContentParserUtils.parseFieldsValue(p),
SORT_FIELD, ObjectParser.ValueType.VALUE_ARRAY);
PARSER.declareObject(constructorArg(), (p, c) -> p.map(HashMap::new, XContentParser::doubleValue), METRICS_FIELD);
PARSER.declareObject(constructorArg(), (p, c) -> p.map(), METRICS_FIELD);
}
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
@ -61,8 +62,8 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(stats.getDistribution(), hasEntry(equalTo("t"), closeTo(.09, .005)));
}
public void testTopMetricsSizeOne() throws IOException {
indexTopMetricsData();
public void testTopMetricsDoubleMetric() throws IOException {
indexTopMetricsDoubleTestData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v"));
@ -74,8 +75,34 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3.0)));
}
public void testTopMetricsLongMetric() throws IOException {
indexTopMetricsLongTestData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v"));
SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
ParsedTopMetrics top = response.getAggregations().get("test");
assertThat(top.getTopMetrics(), hasSize(1));
ParsedTopMetrics.TopMetrics metric = top.getTopMetrics().get(0);
assertThat(metric.getSort(), equalTo(singletonList(2)));
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3)));
}
public void testTopMetricsDateMetric() throws IOException {
indexTopMetricsDateTestData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v"));
SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
ParsedTopMetrics top = response.getAggregations().get("test");
assertThat(top.getTopMetrics(), hasSize(1));
ParsedTopMetrics.TopMetrics metric = top.getTopMetrics().get(0);
assertThat(metric.getSort(), equalTo(singletonList(2)));
assertThat(metric.getMetrics(), equalTo(singletonMap("v", "2020-01-02T01:01:00.000Z")));
}
public void testTopMetricsManyMetrics() throws IOException {
indexTopMetricsData();
indexTopMetricsDoubleTestData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v", "m"));
@ -89,7 +116,7 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
}
public void testTopMetricsSizeTwo() throws IOException {
indexTopMetricsData();
indexTopMetricsDoubleTestData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 2, "v"));
@ -104,10 +131,28 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 2.0)));
}
private void indexTopMetricsData() throws IOException {
private void indexTopMetricsDoubleTestData() throws IOException {
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2.0, "m", 12.0));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3.0, "m", 13.0));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
}
private void indexTopMetricsLongTestData() throws IOException {
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
}
private void indexTopMetricsDateTestData() throws IOException {
CreateIndexRequest create = new CreateIndexRequest("test");
create.mapping("{\"properties\": {\"v\": {\"type\": \"date\"}}}", XContentType.JSON);
highLevelClient().indices().create(create, RequestOptions.DEFAULT);
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", "2020-01-01T01:01:00Z"));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", "2020-01-02T01:01:00Z"));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
}
}

View File

@ -70,23 +70,27 @@ the same sort values then this aggregation could return either document's fields
==== `metrics`
`metrics` selects the fields to of the "top" document to return. Like most other
aggregations, `top_metrics` casts these values cast to `double` precision
floating point numbers. So they have to be numeric. Dates *work*, but they
come back as a `double` precision floating point containing milliseconds since
epoch. `keyword` fields aren't allowed.
`metrics` selects the fields to of the "top" document to return.
You can return multiple metrics by providing a list:
[source,console,id=search-aggregations-metrics-top-metrics-list-of-metrics]
----
PUT /test
{
"mappings": {
"properties": {
"d": {"type": "date"}
}
}
}
POST /test/_bulk?refresh
{"index": {}}
{"s": 1, "v": 3.1415, "m": 1.9}
{"s": 1, "v": 3.1415, "m": 1, "d": "2020-01-01T00:12:12Z"}
{"index": {}}
{"s": 2, "v": 1.0, "m": 6.7}
{"s": 2, "v": 1.0, "m": 6, "d": "2020-01-02T00:12:12Z"}
{"index": {}}
{"s": 3, "v": 2.71828, "m": -12.2}
{"s": 3, "v": 2.71828, "m": -12, "d": "2019-12-31T00:12:12Z"}
POST /test/_search?filter_path=aggregations
{
"aggs": {
@ -94,7 +98,8 @@ POST /test/_search?filter_path=aggregations
"top_metrics": {
"metrics": [
{"field": "v"},
{"field": "m"}
{"field": "m"},
{"field": "d"}
],
"sort": {"s": "desc"}
}
@ -114,7 +119,8 @@ Which returns:
"sort": [3],
"metrics": {
"v": 2.718280076980591,
"m": -12.199999809265137
"m": -12,
"d": "2019-12-31T00:12:12.000Z"
}
} ]
}
@ -123,7 +129,6 @@ Which returns:
----
// TESTRESPONSE
==== `size`
`top_metrics` can return the top few document's worth of metrics using the size parameter:
@ -246,14 +251,14 @@ Which returns:
"key": "192.168.0.1",
"doc_count": 2,
"tm": {
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 2.0 } } ]
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 2 } } ]
}
},
{
"key": "192.168.0.2",
"doc_count": 1,
"tm": {
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 3.0 } } ]
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 3 } } ]
}
}
],
@ -303,14 +308,14 @@ Which returns:
"key": "192.168.0.2",
"doc_count": 1,
"tm": {
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 3.0 } } ]
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 3 } } ]
}
},
{
"key": "192.168.0.1",
"doc_count": 2,
"tm": {
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 2.0 } } ]
"top": [ {"sort": ["2020-01-01T02:01:01.000Z"], "metrics": {"v": 2 } } ]
}
}
],

View File

@ -117,6 +117,11 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
@Override
public abstract String toString();
/**
* Return this {@linkplain SortValue} as a boxed {@linkplain Number}.
*/
public abstract Number numberValue();
private static class DoubleSortValue extends SortValue {
public static final String NAME = "double";
@ -179,6 +184,11 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
public String toString() {
return Double.toString(key);
}
@Override
public Number numberValue() {
return key;
}
}
private static class LongSortValue extends SortValue {
@ -243,5 +253,10 @@ public abstract class SortValue implements NamedWriteable, Comparable<SortValue>
public String toString() {
return Long.toString(key);
}
@Override
public Number numberValue() {
return key;
}
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -20,7 +21,6 @@ import org.elasticsearch.search.sort.SortValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -94,7 +94,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
return null;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValues[index];
return topMetrics.get(0).metricValues.get(index).numberValue();
}
@Override
@ -161,13 +161,14 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
public double value(String name) {
int index = metricNames.indexOf(name);
if (index < 0) {
throw new IllegalArgumentException("unknown metric [" + name + "]");
throw new IllegalArgumentException("unknown metric [" + name + "]");
}
if (topMetrics.isEmpty()) {
return Double.NaN;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValues[index];
// TODO it'd probably be nicer to have "compareTo" instead of assuming a double.
return topMetrics.get(0).metricValues.get(index).numberValue().doubleValue();
}
SortOrder getSortOrder() {
@ -206,9 +207,9 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
static class TopMetric implements Writeable, Comparable<TopMetric> {
private final DocValueFormat sortFormat;
private final SortValue sortValue;
private final double[] metricValues;
private final List<MetricValue> metricValues;
TopMetric(DocValueFormat sortFormat, SortValue sortValue, double[] metricValues) {
TopMetric(DocValueFormat sortFormat, SortValue sortValue, List<MetricValue> metricValues) {
this.sortFormat = sortFormat;
this.sortValue = sortValue;
this.metricValues = metricValues;
@ -217,14 +218,14 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
TopMetric(StreamInput in) throws IOException {
sortFormat = in.readNamedWriteable(DocValueFormat.class);
sortValue = in.readNamedWriteable(SortValue.class);
metricValues = in.readDoubleArray();
metricValues = in.readList(s -> s.readOptionalWriteable(MetricValue::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(sortFormat);
out.writeNamedWriteable(sortValue);
out.writeDoubleArray(metricValues);
out.writeCollection(metricValues, StreamOutput::writeOptionalWriteable);
}
DocValueFormat getSortFormat() {
@ -235,7 +236,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
return sortValue;
}
double[] getMetricValues() {
List<MetricValue> getMetricValues() {
return metricValues;
}
@ -246,9 +247,13 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
sortValue.toXContent(builder, sortFormat);
builder.endArray();
builder.startObject(METRIC_FIELD.getPreferredName());
{
for (int i = 0; i < metricValues.length; i++) {
builder.field(metricNames.get(i), Double.isNaN(metricValues[i]) ? null : metricValues[i]);
for (int i = 0; i < metricValues.size(); i++) {
MetricValue value = metricValues.get(i);
builder.field(metricNames.get(i));
if (value == null) {
builder.nullValue();
} else {
value.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
}
builder.endObject();
@ -269,17 +274,79 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
TopMetric other = (TopMetric) obj;
return sortFormat.equals(other.sortFormat)
&& sortValue.equals(other.sortValue)
&& Arrays.equals(metricValues, other.metricValues);
&& metricValues.equals(other.metricValues);
}
@Override
public int hashCode() {
return Objects.hash(sortFormat, sortValue, Arrays.hashCode(metricValues));
return Objects.hash(sortFormat, sortValue, metricValues);
}
@Override
public String toString() {
return "TopMetric[" + sortFormat + "," + sortValue + "," + Arrays.toString(metricValues) + "]";
return "TopMetric[" + sortFormat + "," + sortValue + "," + metricValues + "]";
}
}
static class MetricValue implements Writeable, ToXContent {
private final DocValueFormat format;
/**
* It is odd to have a "SortValue" be part of a MetricValue but it is
* a very convenient way to send a type-aware thing across the
* wire though. So here we are.
*/
private final SortValue value;
MetricValue(DocValueFormat format, SortValue value) {
this.format = format;
this.value = value;
}
DocValueFormat getFormat() {
return format;
}
SortValue getValue() {
return value;
}
MetricValue(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
value = in.readNamedWriteable(SortValue.class);
}
Number numberValue() {
return value.numberValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeNamedWriteable(value);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return value.toXContent(builder, format);
}
@Override
public String toString() {
return format + "," + value;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
MetricValue other = (MetricValue) obj;
return format.equals(other.format) && value.equals(other.value);
}
@Override
public int hashCode() {
return Objects.hash(format, value);
}
}
}

View File

@ -7,14 +7,16 @@
package org.elasticsearch.xpack.analytics.topmetrics;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
@ -26,11 +28,17 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortValue;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;
/**
* Collects the {@code top_metrics} aggregation, which functions like a memory
* efficient but limited version of the {@code top_hits} aggregation. Amortized,
@ -52,17 +60,16 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
TopMetricsAggregator(String name, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, int size,
SortBuilder<?> sort, List<String> metricNames, List<ValuesSource.Numeric> metricValuesSources) throws IOException {
SortBuilder<?> sort, List<MetricSource> metricSources) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.size = size;
assert metricNames.size() == metricValuesSources.size();
metrics = new Metrics(size, context.getQueryShardContext(), metricNames, metricValuesSources);
metrics = new Metrics(size, context.getQueryShardContext().bigArrays(), metricSources);
/*
* If we're only collecting a single value then only provided *that*
* value to the sort so that swaps and loads are just a little faster
* in that *very* common case.
*/
BucketedSort.ExtraData values = metricValuesSources.size() == 1 ? metrics.values[0] : metrics;
BucketedSort.ExtraData values = metricSources.size() == 1 ? metrics.values[0] : metrics;
this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
}
@ -71,7 +78,12 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
if (size != 1) {
throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]");
}
return metrics.names.contains(name);
for (MetricValues values : metrics.values) {
if (values.name().equals(name)) {
return true;
}
}
return false;
}
@Override
@ -115,12 +127,12 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
public InternalAggregation buildAggregation(long bucket) throws IOException {
List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, metrics.resultBuilder(sort.getFormat()));
assert topMetrics.size() <= size;
return new InternalTopMetrics(name, sort.getOrder(), metrics.names, size, topMetrics, pipelineAggregators(), metaData());
return new InternalTopMetrics(name, sort.getOrder(), metrics.names(), size, topMetrics, pipelineAggregators(), metaData());
}
@Override
public InternalTopMetrics buildEmptyAggregation() {
return InternalTopMetrics.buildEmptyAggregation(name, metrics.names, pipelineAggregators(), metaData());
return InternalTopMetrics.buildEmptyAggregation(name, metrics.names(), pipelineAggregators(), metaData());
}
@Override
@ -128,23 +140,51 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
Releasables.close(sort, metrics);
}
private static class Metrics implements BucketedSort.ExtraData, Releasable {
private final List<String> names;
/**
* Information about each metric that this {@link Aggregator} uses to
* load and format metric values.
*/
static class MetricSource {
private final String name;
private final DocValueFormat format;
private final ValuesSource.Numeric valuesSource;
MetricSource(String name, DocValueFormat format, ValuesSource.Numeric valuesSource) {
this.name = name;
this.format = format;
this.valuesSource = valuesSource;
}
String getName() {
return name;
}
DocValueFormat getFormat() {
return format;
}
}
static class Metrics implements BucketedSort.ExtraData, Releasable {
private final MetricValues[] values;
Metrics(int size, QueryShardContext ctx, List<String> names, List<ValuesSource.Numeric> valuesSources) {
this.names = names;
values = new MetricValues[valuesSources.size()];
Metrics(int size, BigArrays bigArrays, List<MetricSource> sources) {
values = new MetricValues[sources.size()];
int i = 0;
for (ValuesSource.Numeric valuesSource : valuesSources) {
if (valuesSource == null) {
values[i++] = new MissingMetricValues();
continue;
}
values[i++] = new CollectMetricValues(size, ctx.bigArrays(), valuesSource);
for (MetricSource source : sources) {
values[i++] = valuesFor(size, bigArrays, source);
}
}
private static MetricValues valuesFor(int size, BigArrays bigArrays, MetricSource source) {
if (source.valuesSource == null) {
return new AlwaysNullMetricValues(source);
}
if (source.valuesSource.isFloatingPoint()) {
return new DoubleMetricValues(size, bigArrays, source);
}
return new LongMetricValues(size, bigArrays, source);
}
boolean needsScores() {
for (int i = 0; i < values.length; i++) {
if (values[i].needsScores()) {
@ -155,23 +195,28 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
}
double metric(String name, long index) {
int valueIndex = names.indexOf(name);
if (valueIndex < 0) {
throw new IllegalArgumentException("[" + name + "] not found");
for (MetricValues value : values) {
if (value.name().equals(name)) {
return value.doubleValue(index);
}
}
return values[valueIndex].value(index);
throw new IllegalArgumentException("[" + name + "] not found");
}
BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
return (index, sortValue) -> {
double[] result = new double[values.length];
List<InternalTopMetrics.MetricValue> result = new ArrayList<>(values.length);
for (int i = 0; i < values.length; i++) {
result[i] = values[i].value(index);
result.add(values[i].metricValue(index));
}
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, result);
};
}
List<String> names() {
return Arrays.stream(values).map(MetricValues::name).collect(toList());
}
@Override
public void swap(long lhs, long rhs) {
for (int i = 0; i < values.length; i++) {
@ -198,30 +243,60 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
}
}
private interface MetricValues extends BucketedSort.ExtraData, Releasable {
boolean needsScores();
double value(long index);
}
private static class CollectMetricValues implements MetricValues {
private final BigArrays bigArrays;
private final ValuesSource.Numeric metricValueSource;
private abstract static class MetricValues implements BucketedSort.ExtraData, Releasable {
protected final MetricSource source;
MetricValues(MetricSource source) {
this.source = source;
}
final String name() {
return source.name;
}
abstract boolean needsScores();
abstract double doubleValue(long index);
abstract InternalTopMetrics.MetricValue metricValue(long index);
}
private abstract static class CollectingMetricValues extends MetricValues {
protected final BigArrays bigArrays;
CollectingMetricValues(BigArrays bigArrays, MetricSource source) {
super(source);
this.bigArrays = bigArrays;
}
@Override
public final boolean needsScores() {
return source.valuesSource.needsScores();
}
}
/**
* Loads metrics for floating point numbers.
*/
static class DoubleMetricValues extends CollectingMetricValues {
private DoubleArray values;
CollectMetricValues(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
this.bigArrays = bigArrays;
this.metricValueSource = metricValueSource;
DoubleMetricValues(int size, BigArrays bigArrays, MetricSource source) {
super(bigArrays, source);
values = bigArrays.newDoubleArray(size, false);
}
@Override
public boolean needsScores() {
return metricValueSource.needsScores();
public double doubleValue(long index) {
return values.get(index);
}
@Override
public double value(long index) {
return values.get(index);
public MetricValue metricValue(long index) {
double value = values.get(index);
if (Double.isNaN(value)) {
// Use NaN as a sentinel for "missing"
return null;
}
return new MetricValue(source.format, SortValue.from(value));
}
@Override
@ -234,12 +309,13 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
// TODO allow configuration of value mode
NumericDoubleValues metricValues = MultiValueMode.AVG.select(metricValueSource.doubleValues(ctx));
NumericDoubleValues metricValues = MultiValueMode.AVG.select(source.valuesSource.doubleValues(ctx));
return (index, doc) -> {
if (index >= values.size()) {
values = bigArrays.grow(values, index + 1);
}
double metricValue = metricValues.advanceExact(doc) ? metricValues.doubleValue() : Double.NaN;
// Use NaN as a sentinel for "missing"
double metricValue = metricValues.advanceExact(doc) ? metricValues.doubleValue() : Double.NaN;
values.set(index, metricValue);
};
}
@ -249,12 +325,93 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
values.close();
}
}
private static class MissingMetricValues implements MetricValues {
/**
* Loads metrics for whole numbers.
*/
static class LongMetricValues extends CollectingMetricValues {
/**
* Tracks "missing" values in a {@link BitArray}. Unlike
* {@link DoubleMetricValues}, we there isn't a sentinel value
* that we can steel from the longs to represent missing that
* won't lead to more trouble than it is worth. So we track
* "missing" values explicitly.
*/
private final MissingHelper empty;
private LongArray values;
LongMetricValues(int size, BigArrays bigArrays, MetricSource source) {
super(bigArrays, source);
empty = new MissingHelper(bigArrays);
values = bigArrays.newLongArray(size, false);
}
@Override
public double value(long index) {
public double doubleValue(long index) {
if (empty.isEmpty(index)) {
return Double.NaN;
}
return values.get(index);
}
@Override
public MetricValue metricValue(long index) {
if (empty.isEmpty(index)) {
return null;
}
return new MetricValue(source.format, SortValue.from(values.get(index)));
}
@Override
public void swap(long lhs, long rhs) {
long tmp = values.get(lhs);
values.set(lhs, values.get(rhs));
values.set(rhs, tmp);
empty.swap(lhs, rhs);
}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
// TODO allow configuration of value mode
NumericDocValues metricValues = MultiValueMode.AVG.select(source.valuesSource.longValues(ctx));
return (index, doc) -> {
if (false == metricValues.advanceExact(doc)) {
empty.markMissing(index);
return;
}
if (index >= values.size()) {
values = bigArrays.grow(values, index + 1);
}
values.set(index, metricValues.longValue());
empty.markNotMissing(index);
};
}
@Override
public void close() {
Releasables.close(values, empty);
}
}
/**
* {@linkplain MetricValues} implementation for unmapped fields
* that always returns {@code null} or {@code NaN}.
*/
static class AlwaysNullMetricValues extends MetricValues {
AlwaysNullMetricValues(MetricSource source) {
super(source);
}
@Override
public double doubleValue(long index) {
return Double.NaN;
}
@Override
public MetricValue metricValue(long index) {
return null;
}
@Override
public boolean needsScores() {
return false;
@ -268,8 +425,76 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
return (index, doc) -> {};
}
@Override
public void close() {}
}
/**
* Helps {@link LongMetricValues} track "empty" slots. It attempts to have
* very low CPU overhead and no memory overhead when there *aren't* empty
* values.
*/
private static class MissingHelper implements Releasable {
private final BigArrays bigArrays;
private BitArray tracker;
MissingHelper(BigArrays bigArrays) {
this.bigArrays = bigArrays;
}
void markMissing(long index) {
int i = asInt(index);
if (tracker == null) {
tracker = new BitArray(i, bigArrays);
}
tracker.set(i);
}
void markNotMissing(long index) {
if (tracker == null) {
return;
}
tracker.clear(asInt(index));
}
void swap(long lhs, long rhs) {
if (tracker == null) {
return;
}
int l = asInt(lhs);
int r = asInt(rhs);
boolean backup = tracker.get(l);
if (tracker.get(r)) {
tracker.set(l);
} else {
tracker.clear(l);
}
if (backup) {
tracker.set(r);
} else {
tracker.clear(r);
}
}
boolean isEmpty(long index) {
if (tracker == null) {
return false;
}
return tracker.get(asInt(index));
}
@Override
public void close() {
if (tracker != null) {
tracker.close();
}
}
private int asInt(long index) {
if (index > Integer.MAX_VALUE) {
throw new IllegalArgumentException("top_metrics can't track more than " + Integer.MAX_VALUE + " values.");
}
return (int) index;
}
}
}

View File

@ -57,14 +57,14 @@ public class TopMetricsAggregatorFactory extends AggregatorFactory {
+ "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey()
+ "] index level setting.");
}
List<String> metricNames = metricFields.stream().map(MultiValuesSourceFieldConfig::getFieldName).collect(toList());
List<ValuesSource.Numeric> metricValuesSources = metricFields.stream().map(config -> {
List<TopMetricsAggregator.MetricSource> metricSources = metricFields.stream().map(config -> {
ValuesSourceConfig<ValuesSource.Numeric> resolved = ValuesSourceConfig.resolve(
searchContext.getQueryShardContext(), ValueType.NUMERIC,
config.getFieldName(), config.getScript(), config.getMissing(), config.getTimeZone(), null);
return resolved.toValuesSource(searchContext.getQueryShardContext());
return new TopMetricsAggregator.MetricSource(config.getFieldName(), resolved.format(),
resolved.toValuesSource(searchContext.getQueryShardContext()));
}).collect(toList());
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size,
sortBuilders.get(0), metricNames, metricValuesSources);
sortBuilders.get(0), metricSources);
}
}

View File

@ -95,7 +95,9 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
private InternalTopMetrics.TopMetric top(SortValue sortValue, double metricValue) {
DocValueFormat sortFormat = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN, DocValueFormat.IP);
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {metricValue});
DocValueFormat metricFormat = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN, DocValueFormat.IP);
InternalTopMetrics.MetricValue realMetricValue = new InternalTopMetrics.MetricValue(metricFormat, SortValue.from(metricValue));
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, singletonList(realMetricValue));
}
private InternalTopMetrics reduce(InternalTopMetrics... results) {

View File

@ -32,13 +32,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
public class InternalTopMetricsTests extends InternalAggregationTestCase<InternalTopMetrics> {
@ -49,7 +51,11 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
* would fail because the instances that it attempts to reduce don't
* have their results in the same order.
*/
private SortOrder sortOrder = randomFrom(SortOrder.values());
private final SortOrder sortOrder = randomFrom(SortOrder.values());
private final InternalTopMetrics.MetricValue metricOneDouble =
new InternalTopMetrics.MetricValue(DocValueFormat.RAW, SortValue.from(1.0));
private final InternalTopMetrics.MetricValue metricOneLong =
new InternalTopMetrics.MetricValue(DocValueFormat.RAW, SortValue.from(1));
public void testEmptyIsNotMapped() {
InternalTopMetrics empty = InternalTopMetrics.buildEmptyAggregation(
@ -63,9 +69,9 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
}
public void testToXContentDoubleSortValue() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
singletonList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0})),
emptyList(), null);
List<InternalTopMetrics.TopMetric> top = singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), singletonList(metricOneDouble)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -83,12 +89,11 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
"}"));
}
public void testToXConentDateSortValue() throws IOException {
DocValueFormat sortFormat = new DocValueFormat.DateTime(DateFormatter.forPattern("strict_date_time"), ZoneId.of("UTC"),
DateFieldMapper.Resolution.MILLISECONDS);
public void testToXContentDateSortValue() throws IOException {
SortValue sortValue = SortValue.from(ZonedDateTime.parse("2007-12-03T10:15:30Z").toInstant().toEpochMilli());
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
singletonList(new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {1.0})), emptyList(), null);
List<InternalTopMetrics.TopMetric> top = singletonList(new InternalTopMetrics.TopMetric(
strictDateTime(), sortValue, singletonList(metricOneDouble)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -106,10 +111,54 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
"}"));
}
public void testToXContentLongMetricValue() throws IOException {
List<InternalTopMetrics.TopMetric> top = singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), singletonList(metricOneLong)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
" \"top\" : [\n" +
" {\n" +
" \"sort\" : [\n" +
" 1.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"test\" : 1\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
"}"));
}
public void testToXContentDateMetricValue() throws IOException {
InternalTopMetrics.MetricValue metricValue = new InternalTopMetrics.MetricValue(
strictDateTime(), SortValue.from(ZonedDateTime.parse("2007-12-03T10:15:30Z").toInstant().toEpochMilli()));
List<InternalTopMetrics.TopMetric> top = singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), singletonList(metricValue)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
" \"top\" : [\n" +
" {\n" +
" \"sort\" : [\n" +
" 1.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"test\" : \"2007-12-03T10:15:30.000Z\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
"}"));
}
public void testToXContentManyMetrics() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, Arrays.asList("foo", "bar", "baz"), 1,
singletonList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0, 2.0, 3.0})),
emptyList(), null);
List<InternalTopMetrics.TopMetric> top = singletonList(new InternalTopMetrics.TopMetric(
DocValueFormat.RAW, SortValue.from(1.0), Arrays.asList(metricOneDouble, metricOneLong, metricOneDouble)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, Arrays.asList("foo", "bar", "baz"), 1, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -120,8 +169,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
" ],\n" +
" \"metrics\" : {\n" +
" \"foo\" : 1.0,\n" +
" \"bar\" : 2.0,\n" +
" \"baz\" : 3.0\n" +
" \"bar\" : 1,\n" +
" \"baz\" : 1.0\n" +
" }\n" +
" }\n" +
" ]\n" +
@ -130,11 +179,10 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
}
public void testToXContentManyTopMetrics() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 2,
Arrays.asList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0}),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), new double[] {2.0})),
emptyList(), null);
List<InternalTopMetrics.TopMetric> top = Arrays.asList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), singletonList(metricOneDouble)),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), singletonList(metricOneLong)));
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 2, top, emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -152,7 +200,7 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
" 2.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"test\" : 2.0\n" +
" \"test\" : 1\n" +
" }\n" +
" }\n" +
" ]\n" +
@ -171,10 +219,15 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
@Override
protected InternalTopMetrics createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
return createTestInstance(name, pipelineAggregators, metaData, InternalAggregationTestCase::randomNumericDocValueFormat);
}
private InternalTopMetrics createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, Supplier<DocValueFormat> randomDocValueFormat) {
int metricCount = between(1, 5);
List<String> metricNames = randomMetricNames(metricCount);
int size = between(1, 100);
List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size), metricCount);
List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(randomDocValueFormat, between(0, size), metricCount);
return new InternalTopMetrics(name, sortOrder, metricNames, size, topMetrics, pipelineAggregators, metaData);
}
@ -203,7 +256,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
case 4:
int fixedSize = size;
int fixedMetricsSize = metricNames.size();
topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize), fixedMetricsSize));
topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(
InternalAggregationTestCase::randomNumericDocValueFormat, between(1, fixedSize), fixedMetricsSize));
break;
default:
throw new IllegalArgumentException("bad mutation");
@ -217,6 +271,18 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
return InternalTopMetrics::new;
}
/**
* An extra test for parsing dates from xcontent because we can't random
* into {@link DocValueFormat.DateTime} because it doesn't
* implement {@link Object#equals(Object)}.
*/
public void testFromXContentDates() throws IOException {
InternalTopMetrics aggregation = createTestInstance(
randomAlphaOfLength(3), emptyList(), emptyMap(), InternalTopMetricsTests::strictDateTime);
ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), randomBoolean());
assertFromXContent(aggregation, parsedAggregation);
}
@Override
protected void assertFromXContent(InternalTopMetrics aggregation, ParsedAggregation parsedAggregation) throws IOException {
ParsedTopMetrics parsed = (ParsedTopMetrics) parsedAggregation;
@ -230,8 +296,14 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
assertThat(parsedTop.getSort(), equalTo(singletonList(expectedSort)));
assertThat(parsedTop.getMetrics().keySet(), hasSize(aggregation.getMetricNames().size()));
for (int m = 0; m < aggregation.getMetricNames().size(); m++) {
assertThat(parsedTop.getMetrics(),
hasEntry(aggregation.getMetricNames().get(m), internalTop.getMetricValues()[m]));
String name = aggregation.getMetricNames().get(m);
InternalTopMetrics.MetricValue value = internalTop.getMetricValues().get(m);
assertThat(parsedTop.getMetrics(), hasKey(name));
if (value.getFormat() == DocValueFormat.RAW) {
assertThat(parsedTop.getMetrics().get(name), equalTo(value.numberValue()));
} else {
assertThat(parsedTop.getMetrics().get(name), equalTo(value.getValue().format(value.getFormat())));
}
}
}
}
@ -251,10 +323,11 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
assertThat(reduced.getTopMetrics(), equalTo(winners));
}
private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length, int metricCount) {
private List<InternalTopMetrics.TopMetric> randomTopMetrics(
Supplier<DocValueFormat> randomDocValueFormat, int length, int metricCount) {
return IntStream.range(0, length)
.mapToObj(i -> new InternalTopMetrics.TopMetric(
randomNumericDocValueFormat(), randomSortValue(), randomMetricValues(metricCount)
randomDocValueFormat.get(), randomSortValue(), randomMetricValues(randomDocValueFormat, metricCount)
))
.sorted((lhs, rhs) -> sortOrder.reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()))
.collect(toList());
@ -268,10 +341,18 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
return new ArrayList<>(names);
}
private double[] randomMetricValues(int metricCount) {
return IntStream.range(0, metricCount).mapToDouble(i -> randomDouble()).toArray();
private List<InternalTopMetrics.MetricValue> randomMetricValues(Supplier<DocValueFormat> randomDocValueFormat, int metricCount) {
return IntStream.range(0, metricCount)
.mapToObj(i -> new InternalTopMetrics.MetricValue(randomDocValueFormat.get(), randomSortValue()))
.collect(toList());
}
private static DocValueFormat strictDateTime() {
return new DocValueFormat.DateTime(
DateFormatter.forPattern("strict_date_time"), ZoneId.of("UTC"), DateFieldMapper.Resolution.MILLISECONDS);
}
private static SortValue randomSortValue() {
if (randomBoolean()) {
return SortValue.from(randomLong());

View File

@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.topmetrics;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.TopMetric;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.MetricSource;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregator.Metrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notANumber;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TopMetricsAggregatorMetricsTests extends ESTestCase {
public void testUnmapped() throws IOException {
withMetric(null, (m, source) -> {
// Load from doc is a noop
m.loader(null).loadFromDoc(0, randomInt());
assertNullMetric(m, source, randomInt());
});
}
public void testEmptyLong() throws IOException {
SortedNumericDocValues values = mock(SortedNumericDocValues.class);
when(values.advanceExact(0)).thenReturn(false);
withMetric(valuesSource(values), (m, source) -> {
m.loader(null).loadFromDoc(0, 0);
assertNullMetric(m, source, 0);
});
}
public void testEmptyDouble() throws IOException {
SortedNumericDoubleValues values = mock(SortedNumericDoubleValues.class);
when(values.advanceExact(0)).thenReturn(false);
withMetric(valuesSource(values), (m, source) -> {
m.loader(null).loadFromDoc(0, 0);
assertNullMetric(m, source, 0);
});
}
public void testLoadLong() throws IOException {
long value = randomLong();
SortedNumericDocValues values = mock(SortedNumericDocValues.class);
when(values.advanceExact(0)).thenReturn(true);
when(values.docValueCount()).thenReturn(1);
when(values.nextValue()).thenReturn(value);
withMetric(valuesSource(values), (m, source) -> {
m.loader(null).loadFromDoc(0, 0);
assertMetricValue(m, 0, source, SortValue.from(value));
});
}
public void testLoadDouble() throws IOException {
double value = randomDouble();
SortedNumericDoubleValues values = mock(SortedNumericDoubleValues.class);
when(values.advanceExact(0)).thenReturn(true);
when(values.docValueCount()).thenReturn(1);
when(values.nextValue()).thenReturn(value);
withMetric(valuesSource(values), (m, source) -> {
m.loader(null).loadFromDoc(0, 0);
assertMetricValue(m, 0, source, SortValue.from(value));
});
}
public void testLoadAndSwapLong() throws IOException {
long firstValue = randomLong();
long secondValue = randomLong();
SortedNumericDocValues values = mock(SortedNumericDocValues.class);
when(values.advanceExact(0)).thenReturn(true);
when(values.advanceExact(1)).thenReturn(true);
when(values.docValueCount()).thenReturn(1);
when(values.nextValue()).thenReturn(firstValue, secondValue);
withMetric(valuesSource(values), (m, source) -> {
assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue));
});
}
public void testLoadAndSwapDouble() throws IOException {
double firstValue = randomDouble();
double secondValue = randomDouble();
SortedNumericDoubleValues values = mock(SortedNumericDoubleValues.class);
when(values.advanceExact(0)).thenReturn(true);
when(values.advanceExact(1)).thenReturn(true);
when(values.docValueCount()).thenReturn(1);
when(values.nextValue()).thenReturn(firstValue, secondValue);
withMetric(valuesSource(values), (m, source) -> {
assertLoadTwoAndSwap(m, source, SortValue.from(firstValue), SortValue.from(secondValue));
});
}
public void testManyValues() throws IOException {
long[] values = IntStream.range(0, between(2, 100)).mapToLong(i -> randomLong()).toArray();
List<ValuesSource.Numeric> valuesSources = Arrays.stream(values)
.mapToObj(v -> {
try {
SortedNumericDocValues docValues = mock(SortedNumericDocValues.class);
when(docValues.advanceExact(0)).thenReturn(true);
when(docValues.docValueCount()).thenReturn(1);
when(docValues.nextValue()).thenReturn(v);
return valuesSource(docValues);
} catch (IOException e) {
throw new AssertionError(e);
}
})
.collect(toList());
withMetrics(valuesSources, (m, sources) -> {
m.loader(null).loadFromDoc(0, 0);
TopMetric metric = m.resultBuilder(DocValueFormat.RAW).build(0, SortValue.from(1));
assertThat(metric.getMetricValues(), hasSize(values.length));
for (int i = 0; i < values.length; i++) {
MetricSource source = sources.get(i);
assertThat(m.metric(source.getName(), 0), equalTo((double) values[i]));
assertThat(metric.getMetricValues(),
hasItem(new MetricValue(source.getFormat(), SortValue.from(values[i]))));
}
});
}
private ValuesSource.Numeric valuesSource(SortedNumericDocValues values) throws IOException {
ValuesSource.Numeric source = mock(ValuesSource.Numeric.class);
when(source.isFloatingPoint()).thenReturn(false);
when(source.longValues(null)).thenReturn(values);
return source;
}
private ValuesSource.Numeric valuesSource(SortedNumericDoubleValues values) throws IOException {
ValuesSource.Numeric source = mock(ValuesSource.Numeric.class);
when(source.isFloatingPoint()).thenReturn(true);
when(source.doubleValues(null)).thenReturn(values);
return source;
}
private void withMetric(ValuesSource.Numeric valuesSource,
CheckedBiConsumer<Metrics, MetricSource, IOException> consumer) throws IOException {
withMetrics(singletonList(valuesSource), (m, sources) -> consumer.accept(m, sources.get(0)));
}
private void withMetrics(List<ValuesSource.Numeric> valuesSources,
CheckedBiConsumer<Metrics, List<MetricSource>, IOException> consumer) throws IOException {
Set<String> names = new HashSet<>();
List<MetricSource> sources = new ArrayList<>(valuesSources.size());
for (ValuesSource.Numeric valuesSource : valuesSources) {
String name = randomValueOtherThanMany(names::contains, () -> randomAlphaOfLength(5));
names.add(name);
sources.add(new MetricSource(name, randomDocValueFormat(), valuesSource));
}
try (Metrics m = new Metrics(1, BigArrays.NON_RECYCLING_INSTANCE, sources)) {
consumer.accept(m, sources);
}
}
private void assertNullMetric(Metrics m, MetricSource source, long index) {
DocValueFormat sortFormat = randomDocValueFormat();
assertThat(m.metric(source.getName(), index), notANumber());
TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1));
assertThat(metric.getSortFormat(), sameInstance(sortFormat));
assertThat(metric.getMetricValues(), equalTo(singletonList(null)));
}
private void assertMetricValue(Metrics m, long index, MetricSource source, SortValue value) {
DocValueFormat sortFormat = randomDocValueFormat();
assertThat(m.metric(source.getName(), index), equalTo(value.numberValue().doubleValue()));
TopMetric metric = m.resultBuilder(sortFormat).build(index, SortValue.from(1));
assertThat(metric.getSortValue(), equalTo(SortValue.from(1)));
assertThat(metric.getSortFormat(), sameInstance(sortFormat));
assertThat(metric.getMetricValues(), equalTo(singletonList(new MetricValue(source.getFormat(), value))));
}
private void assertLoadTwoAndSwap(Metrics m, MetricSource source, SortValue firstValue, SortValue secondValue) throws IOException {
m.loader(null).loadFromDoc(0, 0);
m.loader(null).loadFromDoc(1, 1);
assertMetricValue(m, 0, source, firstValue);
assertMetricValue(m, 1, source, secondValue);
m.swap(0, 1);
assertMetricValue(m, 0, source, secondValue);
assertMetricValue(m, 1, source, firstValue);
m.loader(null).loadFromDoc(2, 2); // 2 is empty
assertNullMetric(m, source, 2);
m.swap(0, 2);
assertNullMetric(m, source, 0);
assertMetricValue(m, 2, source, secondValue);
}
private DocValueFormat randomDocValueFormat() {
return randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN);
}
}

View File

@ -78,7 +78,6 @@ import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notANumber;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -97,10 +96,12 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
numberFieldType(NumberType.DOUBLE, "s"));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, Double.NaN))));
assertThat(result.getTopMetrics(), hasSize(1));
assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getTopMetrics().get(0).getMetricValues(), equalTo(singletonList(null)));
}
public void testMissingValueForMetric() throws IOException {
public void testMissingValueForDoubleMetric() throws IOException {
InternalTopMetrics result = collect(simpleBuilder(), new MatchAllDocsQuery(), writer -> {
writer.addDocument(singletonList(doubleField("s", 1.0)));
},
@ -108,11 +109,21 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), hasSize(1));
assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getTopMetrics().get(0).getMetricValues().length, equalTo(1));
assertThat(result.getTopMetrics().get(0).getMetricValues()[0], notANumber());
assertThat(result.getTopMetrics().get(0).getMetricValues(), equalTo(singletonList(null)));
}
public void testActualValueForMetric() throws IOException {
public void testMissingValueForLongMetric() throws IOException {
InternalTopMetrics result = collect(simpleBuilder(), new MatchAllDocsQuery(), writer -> {
writer.addDocument(singletonList(longField("s", 1)));
},
longFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), hasSize(1));
assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1)));
assertThat(result.getTopMetrics().get(0).getMetricValues(), equalTo(singletonList(null)));
}
public void testActualValueForDoubleMetric() throws IOException {
InternalTopMetrics result = collect(simpleBuilder(), new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(doubleField("s", 1.0), doubleField("m", 2.0)));
},
@ -121,6 +132,15 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 2.0))));
}
public void testActualValueForLongMetric() throws IOException {
InternalTopMetrics result = collect(simpleBuilder(), new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(longField("s", 1), longField("m", 2)));
},
longFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1, 2))));
}
private InternalTopMetrics collectFromDoubles(TopMetricsAggregationBuilder builder) throws IOException {
return collect(builder, new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(doubleField("s", 1.0), doubleField("m", 2.0)));
@ -132,8 +152,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
public void testSortByDoubleAscending() throws IOException {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {2.0}))));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, 2.0))));
}
public void testSortByDoubleDescending() throws IOException {
@ -387,13 +406,14 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
));
InternalTopMetrics result = collect(builder, new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(doubleField("s", 1.0),
doubleField("m1", 12.0), doubleField("m2", 22.0), doubleField("m3", 32.0)));
doubleField("m1", 12.0), longField("m2", 22), doubleField("m3", 32.0)));
writer.addDocument(Arrays.asList(doubleField("s", 2.0),
doubleField("m1", 13.0), doubleField("m2", 23.0), doubleField("m3", 33.0)));
doubleField("m1", 13.0), longField("m2", 23), doubleField("m3", 33.0)));
}, manyMetricsFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {12.0, 22.0, 32.0}))));
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), metricValues(
SortValue.from(12.0), SortValue.from(22), SortValue.from(32.0))))));
}
private TopMetricsAggregationBuilder simpleBuilder() {
@ -426,11 +446,15 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
return new MappedFieldType[] {numberFieldType(NumberType.DOUBLE, "s"), numberFieldType(NumberType.DOUBLE, "m")};
}
private MappedFieldType[] longFields() {
return new MappedFieldType[] {numberFieldType(NumberType.LONG, "s"), numberFieldType(NumberType.LONG, "m")};
}
private MappedFieldType[] manyMetricsFields() {
return new MappedFieldType[] {
numberFieldType(NumberType.DOUBLE, "s"),
numberFieldType(NumberType.DOUBLE, "m1"),
numberFieldType(NumberType.DOUBLE, "m2"),
numberFieldType(NumberType.LONG, "m2"),
numberFieldType(NumberType.DOUBLE, "m3"),
};
}
@ -516,11 +540,29 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
}
private InternalTopMetrics.TopMetric top(long sortValue, double... metricValues) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues(metricValues));
}
private InternalTopMetrics.TopMetric top(long sortValue, long... metricValues) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues(metricValues));
}
private InternalTopMetrics.TopMetric top(double sortValue, double... metricValues) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues(metricValues));
}
private List<InternalTopMetrics.MetricValue> metricValues(double... metricValues) {
return metricValues(Arrays.stream(metricValues).mapToObj(SortValue::from).toArray(SortValue[]::new));
}
private List<InternalTopMetrics.MetricValue> metricValues(long... metricValues) {
return metricValues(Arrays.stream(metricValues).mapToObj(SortValue::from).toArray(SortValue[]::new));
}
private List<InternalTopMetrics.MetricValue> metricValues(SortValue... metricValues) {
return Arrays.stream(metricValues)
.map(v -> new InternalTopMetrics.MetricValue(DocValueFormat.RAW, v))
.collect(toList());
}
/**