Support multiple metrics in `top_metrics` agg (backport of #52965) (#53163)

This adds support for returning multiple metrics to the `top_metrics`
agg. It looks like:
```
POST /test/_search?filter_path=aggregations
{
  "aggs": {
    "tm": {
      "top_metrics": {
        "metrics": [
          {"field": "v"},
          {"field": "m"}
        ],
        "sort": {"s": "desc"}
      }
    }
  }
}
```
This commit is contained in:
Nik Everett 2020-03-05 08:12:01 -05:00 committed by GitHub
parent 01504df876
commit 28df7ae5ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 479 additions and 201 deletions

View File

@ -32,6 +32,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
@ -49,20 +51,20 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
private final SortBuilder<?> sort;
private final int size;
private final String metric;
private final List<String> metrics;
/**
* Build the request.
* @param name the name of the metric
* @param sort the sort key used to select the top metrics
* @param size number of results to return per bucket
* @param metric the name of the field to select
* @param metrics the names of the fields to select
*/
public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, int size, String metric) {
public TopMetricsAggregationBuilder(String name, SortBuilder<?> sort, int size, String... metrics) {
super(name);
this.sort = sort;
this.size = size;
this.metric = metric;
this.metrics = Arrays.asList(metrics);
}
@Override
@ -78,7 +80,11 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
sort.toXContent(builder, params);
builder.endArray();
builder.field("size", size);
builder.startObject("metric").field("field", metric).endObject();
builder.startArray("metrics");
for (String metric: metrics) {
builder.startObject().field("field", metric).endObject();
}
builder.endArray();
}
return builder.endObject();
}

View File

@ -74,6 +74,20 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
assertThat(metric.getMetrics(), equalTo(singletonMap("v", 3.0)));
}
public void testTopMetricsManyMetrics() throws IOException {
indexTopMetricsData();
SearchRequest search = new SearchRequest("test");
search.source().aggregation(new TopMetricsAggregationBuilder(
"test", new FieldSortBuilder("s").order(SortOrder.DESC), 1, "v", "m"));
SearchResponse response = highLevelClient().search(search, RequestOptions.DEFAULT);
ParsedTopMetrics top = response.getAggregations().get("test");
assertThat(top.getTopMetrics(), hasSize(1));
ParsedTopMetrics.TopMetrics metric = top.getTopMetrics().get(0);
assertThat(metric.getSort(), equalTo(singletonList(2)));
assertThat(metric.getMetrics(), hasEntry("v", 3.0));
assertThat(metric.getMetrics(), hasEntry("m", 13.0));
}
public void testTopMetricsSizeTwo() throws IOException {
indexTopMetricsData();
SearchRequest search = new SearchRequest("test");
@ -92,8 +106,8 @@ public class AnalyticsAggsIT extends ESRestHighLevelClientTestCase {
private void indexTopMetricsData() throws IOException {
BulkRequest bulk = new BulkRequest("test").setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 1, "v", 2.0, "m", 12.0));
bulk.add(new IndexRequest().source(XContentType.JSON, "s", 2, "v", 3.0, "m", 13.0));
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);
}
}

View File

@ -22,7 +22,7 @@ POST /test/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"s": "desc"}
}
}
@ -68,11 +68,61 @@ request. So,
NOTE: This aggregation doesn't support any sort of "tie breaking". If two documents have
the same sort values then this aggregation could return either document's fields.
==== `metric`
==== `metrics`
`metrics` selects the fields to of the "top" document to return. Like most other
aggregations, `top_metrics` casts these values cast to `double` precision
floating point numbers. So they have to be numeric. Dates *work*, but they
come back as a `double` precision floating point containing milliseconds since
epoch. `keyword` fields aren't allowed.
You can return multiple metrics by providing a list:
[source,console,id=search-aggregations-metrics-top-metrics-list-of-metrics]
----
POST /test/_bulk?refresh
{"index": {}}
{"s": 1, "v": 3.1415, "m": 1.9}
{"index": {}}
{"s": 2, "v": 1.0, "m": 6.7}
{"index": {}}
{"s": 3, "v": 2.71828, "m": -12.2}
POST /test/_search?filter_path=aggregations
{
"aggs": {
"tm": {
"top_metrics": {
"metrics": [
{"field": "v"},
{"field": "m"}
],
"sort": {"s": "desc"}
}
}
}
}
----
Which returns:
[source,js]
----
{
"aggregations": {
"tm": {
"top": [ {
"sort": [3],
"metrics": {
"v": 2.718280076980591,
"m": -12.199999809265137
}
} ]
}
}
}
----
// TESTRESPONSE
At this point `metric` supports only `{"field": "field_name"}` and all metrics
are returned as double precision floating point numbers. Expect more to
come here.
==== `size`
@ -92,7 +142,7 @@ POST /test/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"s": "desc"},
"size": 2
}
@ -174,7 +224,7 @@ POST /node/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"date": "desc"}
}
}
@ -230,7 +280,7 @@ POST /node/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"date": "desc"}
}
}
@ -292,7 +342,7 @@ POST /test*/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"s": "asc"}
}
}
@ -325,7 +375,7 @@ POST /test*/_search?filter_path=aggregations
"aggs": {
"tm": {
"top_metrics": {
"metric": {"field": "v"},
"metrics": {"field": "v"},
"sort": {"s": {"order": "asc", "numeric_type": "double"}}
}
}

View File

@ -51,6 +51,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DateFieldMapper;
@ -180,8 +181,9 @@ public class MinAggregatorTests extends AggregatorTestCase {
@Override
protected QueryShardContext queryShardContextMock(IndexSearcher searcher, MapperService mapperService,
IndexSettings indexSettings, CircuitBreakerService circuitBreakerService) {
this.queryShardContext = super.queryShardContextMock(searcher, mapperService, indexSettings, circuitBreakerService);
IndexSettings indexSettings, CircuitBreakerService circuitBreakerService,
BigArrays bigArrays) {
this.queryShardContext = super.queryShardContextMock(searcher, mapperService, indexSettings, circuitBreakerService, bigArrays);
return queryShardContext;
}

View File

@ -421,11 +421,12 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
protected QueryShardContext queryShardContextMock(IndexSearcher searcher,
MapperService mapperService,
IndexSettings indexSettings,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays) {
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, SCRIPTS, Collections.emptyMap());
Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
ScriptService scriptService = new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
return new QueryShardContext(0, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, null, null, mapperService, null, scriptService,
return new QueryShardContext(0, indexSettings, bigArrays, null, null, mapperService, null, scriptService,
xContentRegistry(), writableRegistry(), null, null, System::currentTimeMillis, null, null, () -> true);
}
}

View File

@ -243,7 +243,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
when(searchContext.lookup()).thenReturn(searchLookup);
QueryShardContext queryShardContext =
queryShardContextMock(contextIndexSearcher, mapperService, indexSettings, circuitBreakerService);
queryShardContextMock(contextIndexSearcher, mapperService, indexSettings, circuitBreakerService, bigArrays);
when(searchContext.getQueryShardContext()).thenReturn(queryShardContext);
when(queryShardContext.getObjectMapper(anyString())).thenAnswer(invocation -> {
String fieldName = (String) invocation.getArguments()[0];
@ -293,9 +293,10 @@ public abstract class AggregatorTestCase extends ESTestCase {
protected QueryShardContext queryShardContextMock(IndexSearcher searcher,
MapperService mapperService,
IndexSettings indexSettings,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays) {
return new QueryShardContext(0, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, null,
return new QueryShardContext(0, indexSettings, bigArrays, null,
getIndexFieldDataLookup(mapperService, circuitBreakerService),
mapperService, null, getMockScriptService(), xContentRegistry(),
writableRegistry(), null, searcher, System::currentTimeMillis, null, null, () -> true);

View File

@ -20,23 +20,27 @@ import org.elasticsearch.search.sort.SortValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyList;
import static org.elasticsearch.search.builder.SearchSourceBuilder.SORT_FIELD;
import static org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder.METRIC_FIELD;
public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiValue {
private final SortOrder sortOrder;
private final int size;
private final String metricName;
private final List<String> metricNames;
private final List<TopMetric> topMetrics;
public InternalTopMetrics(String name, @Nullable SortOrder sortOrder, String metricName,
public InternalTopMetrics(String name, @Nullable SortOrder sortOrder, List<String> metricNames,
int size, List<TopMetric> topMetrics, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sortOrder = sortOrder;
this.metricName = metricName;
this.metricNames = metricNames;
/*
* topMetrics.size won't be size when the bucket doesn't have size docs!
*/
@ -44,9 +48,9 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
this.topMetrics = topMetrics;
}
static InternalTopMetrics buildEmptyAggregation(String name, String metricField,
static InternalTopMetrics buildEmptyAggregation(String name, List<String> metricNames,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
return new InternalTopMetrics(name, SortOrder.ASC, metricField, 0, emptyList(), pipelineAggregators, metaData);
return new InternalTopMetrics(name, SortOrder.ASC, metricNames, 0, emptyList(), pipelineAggregators, metaData);
}
/**
@ -55,7 +59,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
public InternalTopMetrics(StreamInput in) throws IOException {
super(in);
sortOrder = SortOrder.readFromStream(in);
metricName = in.readString();
metricNames = in.readStringList();
size = in.readVInt();
topMetrics = in.readList(TopMetric::new);
}
@ -63,7 +67,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
sortOrder.writeTo(out);
out.writeString(metricName);
out.writeStringCollection(metricNames);
out.writeVInt(size);
out.writeList(topMetrics);
}
@ -78,15 +82,19 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
if (path.isEmpty()) {
return this;
}
if (path.size() == 1 && metricName.contentEquals(path.get(1))) {
if (topMetrics.isEmpty()) {
// Unmapped.
return null;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValue;
if (path.size() != 1) {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
int index = metricNames.indexOf(path.get(0));
if (index < 0) {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
if (topMetrics.isEmpty()) {
// Unmapped.
return null;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValues[index];
}
@Override
@ -116,7 +124,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
queue.updateTop();
}
}
return new InternalTopMetrics(getName(), sortOrder, metricName, size, merged, pipelineAggregators(), getMetaData());
return new InternalTopMetrics(getName(), sortOrder, metricNames, size, merged, pipelineAggregators(), getMetaData());
}
@Override
@ -128,7 +136,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray("top");
for (TopMetric top : topMetrics) {
top.toXContent(builder, metricName);
top.toXContent(builder, metricNames);
}
builder.endArray();
return builder;
@ -136,7 +144,7 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), sortOrder, metricName, size, topMetrics);
return Objects.hash(super.hashCode(), sortOrder, metricNames, size, topMetrics);
}
@Override
@ -144,21 +152,22 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
if (super.equals(obj) == false) return false;
InternalTopMetrics other = (InternalTopMetrics) obj;
return sortOrder.equals(other.sortOrder) &&
metricName.equals(other.metricName) &&
metricNames.equals(other.metricNames) &&
size == other.size &&
topMetrics.equals(other.topMetrics);
}
@Override
public double value(String name) {
if (metricName.equals(name)) {
if (topMetrics.isEmpty()) {
return Double.NaN;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValue;
int index = metricNames.indexOf(name);
if (index < 0) {
throw new IllegalArgumentException("unknown metric [" + name + "]");
}
throw new IllegalArgumentException("known metric [" + name + "]");
if (topMetrics.isEmpty()) {
return Double.NaN;
}
assert topMetrics.size() == 1 : "property paths should only resolve against top metrics with size == 1.";
return topMetrics.get(0).metricValues[index];
}
SortOrder getSortOrder() {
@ -169,8 +178,8 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
return size;
}
String getMetricName() {
return metricName;
List<String> getMetricNames() {
return metricNames;
}
List<TopMetric> getTopMetrics() {
@ -197,18 +206,25 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
static class TopMetric implements Writeable, Comparable<TopMetric> {
private final DocValueFormat sortFormat;
private final SortValue sortValue;
private final double metricValue;
private final double[] metricValues;
TopMetric(DocValueFormat sortFormat, SortValue sortValue, double metricValue) {
TopMetric(DocValueFormat sortFormat, SortValue sortValue, double[] metricValues) {
this.sortFormat = sortFormat;
this.sortValue = sortValue;
this.metricValue = metricValue;
this.metricValues = metricValues;
}
TopMetric(StreamInput in) throws IOException {
sortFormat = in.readNamedWriteable(DocValueFormat.class);
sortValue = in.readNamedWriteable(SortValue.class);
metricValue = in.readDouble();
metricValues = in.readDoubleArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(sortFormat);
out.writeNamedWriteable(sortValue);
out.writeDoubleArray(metricValues);
}
DocValueFormat getSortFormat() {
@ -219,26 +235,21 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
return sortValue;
}
double getMetricValue() {
return metricValue;
double[] getMetricValues() {
return metricValues;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(sortFormat);
out.writeNamedWriteable(sortValue);
out.writeDouble(metricValue);
}
public XContentBuilder toXContent(XContentBuilder builder, String metricName) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, List<String> metricNames) throws IOException {
builder.startObject();
{
builder.startArray("sort");
builder.startArray(SORT_FIELD.getPreferredName());
sortValue.toXContent(builder, sortFormat);
builder.endArray();
builder.startObject("metrics");
builder.startObject(METRIC_FIELD.getPreferredName());
{
builder.field(metricName, Double.isNaN(metricValue) ? null : metricValue);
for (int i = 0; i < metricValues.length; i++) {
builder.field(metricNames.get(i), Double.isNaN(metricValues[i]) ? null : metricValues[i]);
}
}
builder.endObject();
}
@ -258,17 +269,17 @@ public class InternalTopMetrics extends InternalNumericMetricsAggregation.MultiV
TopMetric other = (TopMetric) obj;
return sortFormat.equals(other.sortFormat)
&& sortValue.equals(other.sortValue)
&& metricValue == other.metricValue;
&& Arrays.equals(metricValues, other.metricValues);
}
@Override
public int hashCode() {
return Objects.hash(sortFormat, sortValue, metricValue);
return Objects.hash(sortFormat, sortValue, Arrays.hashCode(metricValues));
}
@Override
public String toString() {
return "TopMetric[" + sortFormat + "," + sortValue + "," + metricValue + "]";
return "TopMetric[" + sortFormat + "," + sortValue + "," + Arrays.toString(metricValues) + "]";
}
}
}

View File

@ -32,7 +32,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.SORT_FIELD;
public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<TopMetricsAggregationBuilder> {
public static final String NAME = "top_metrics";
public static final ParseField METRIC_FIELD = new ParseField("metric");
public static final ParseField METRIC_FIELD = new ParseField("metrics");
/**
* Default to returning only a single top metric.
@ -47,34 +47,35 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
if (size < 1) {
throw new IllegalArgumentException("[size] must be more than 0 but was [" + size + "]");
}
MultiValuesSourceFieldConfig metricField = (MultiValuesSourceFieldConfig) args[2];
return new TopMetricsAggregationBuilder(name, sorts, size, metricField);
@SuppressWarnings("unchecked")
List<MultiValuesSourceFieldConfig> metricFields = (List<MultiValuesSourceFieldConfig>) args[2];
return new TopMetricsAggregationBuilder(name, sorts, size, metricFields);
});
static {
PARSER.declareField(constructorArg(), (p, n) -> SortBuilder.fromXContent(p), SORT_FIELD,
ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
PARSER.declareInt(optionalConstructorArg(), SIZE_FIELD);
ContextParser<Void, MultiValuesSourceFieldConfig.Builder> metricParser = MultiValuesSourceFieldConfig.PARSER.apply(true, false);
PARSER.declareObject(constructorArg(), (p, n) -> metricParser.parse(p, null).build(), METRIC_FIELD);
PARSER.declareObjectArray(constructorArg(), (p, n) -> metricParser.parse(p, null).build(), METRIC_FIELD);
}
private final List<SortBuilder<?>> sortBuilders;
// TODO MultiValuesSourceFieldConfig has more things than we support and less things than we want to support
private final int size;
private final MultiValuesSourceFieldConfig metricField;
private final List<MultiValuesSourceFieldConfig> metricFields;
// TODO replace with ValuesSourceConfig once the value source refactor has landed
/**
* Build a {@code top_metrics} aggregation request.
*/
public TopMetricsAggregationBuilder(String name, List<SortBuilder<?>> sortBuilders, int size,
MultiValuesSourceFieldConfig metricField) {
List<MultiValuesSourceFieldConfig> metricFields) {
super(name);
if (sortBuilders.size() != 1) {
throw new IllegalArgumentException("[sort] must contain exactly one sort");
}
this.sortBuilders = sortBuilders;
this.size = size;
this.metricField = metricField;
this.metricFields = metricFields;
}
/**
@ -85,7 +86,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
super(clone, factoriesBuilder, metaData);
this.sortBuilders = clone.sortBuilders;
this.size = clone.size;
this.metricField = clone.metricField;
this.metricFields = clone.metricFields;
}
/**
@ -97,14 +98,14 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
List<SortBuilder<?>> sortBuilders = (List<SortBuilder<?>>) (List<?>) in.readNamedWriteableList(SortBuilder.class);
this.sortBuilders = sortBuilders;
this.size = in.readVInt();
this.metricField = new MultiValuesSourceFieldConfig(in);
this.metricFields = in.readList(MultiValuesSourceFieldConfig::new);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList(sortBuilders);
out.writeVInt(size);
metricField.writeTo(out);
out.writeList(metricFields);
}
@Override
@ -116,7 +117,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, Builder subFactoriesBuilder)
throws IOException {
return new TopMetricsAggregatorFactory(name, queryShardContext, parent, subFactoriesBuilder, metaData, sortBuilders,
size, metricField);
size, metricFields);
}
@Override
@ -129,7 +130,11 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
}
builder.endArray();
builder.field(SIZE_FIELD.getPreferredName(), size);
builder.field(METRIC_FIELD.getPreferredName(), metricField);
builder.startArray(METRIC_FIELD.getPreferredName());
for (MultiValuesSourceFieldConfig metricField: metricFields) {
metricField.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
return builder;
@ -148,7 +153,7 @@ public class TopMetricsAggregationBuilder extends AbstractAggregationBuilder<Top
return size;
}
MultiValuesSourceFieldConfig getMetricField() {
return metricField;
List<MultiValuesSourceFieldConfig> getMetricFields() {
return metricFields;
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
@ -46,25 +47,23 @@ import java.util.Map;
*/
class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
private final int size;
private final String metricName;
private final BucketedSort sort;
private final Values values;
private final ValuesSource.Numeric metricValueSource;
private final Metrics metrics;
TopMetricsAggregator(String name, SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, int size, String metricName,
SortBuilder<?> sort, ValuesSource.Numeric metricValueSource) throws IOException {
Map<String, Object> metaData, int size,
SortBuilder<?> sort, List<String> metricNames, List<ValuesSource.Numeric> metricValuesSources) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.size = size;
this.metricName = metricName;
this.metricValueSource = metricValueSource;
if (metricValueSource != null) {
values = new Values(size, context.bigArrays(), metricValueSource);
this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
} else {
values = null;
this.sort = null;
}
assert metricNames.size() == metricValuesSources.size();
metrics = new Metrics(size, context.getQueryShardContext(), metricNames, metricValuesSources);
/*
* If we're only collecting a single value then only provided *that*
* value to the sort so that swaps and loads are just a little faster
* in that *very* common case.
*/
BucketedSort.ExtraData values = metricValuesSources.size() == 1 ? metrics.values[0] : metrics;
this.sort = sort.buildBucketedSort(context.getQueryShardContext(), size, values);
}
@Override
@ -72,7 +71,7 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
if (size != 1) {
throw new IllegalArgumentException("[top_metrics] can only the be target if [size] is [1] but was [" + size + "]");
}
return metricName.equals(name);
return metrics.names.contains(name);
}
@Override
@ -84,12 +83,12 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
* be called after we've collected a bucket, so it won't just fetch
* garbage.
*/
return values.values.get(owningBucketOrd);
return metrics.metric(name, owningBucketOrd);
}
@Override
public ScoreMode scoreMode() {
boolean needs = (sort != null && sort.needsScores()) || (metricValueSource != null && metricValueSource.needsScores());
boolean needs = sort.needsScores() || metrics.needsScores();
return needs ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}
@ -97,9 +96,6 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
assert sub == LeafBucketCollector.NO_OP_COLLECTOR : "Expected noop but was " + sub.toString();
if (metricValueSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
BucketedSort.Leaf leafSort = sort.forLeaf(ctx);
return new LeafBucketCollector() {
@ -117,41 +113,115 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (metricValueSource == null) {
return buildEmptyAggregation();
}
List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, values.resultBuilder(sort.getFormat()));
List<InternalTopMetrics.TopMetric> topMetrics = sort.getValues(bucket, metrics.resultBuilder(sort.getFormat()));
assert topMetrics.size() <= size;
return new InternalTopMetrics(name, sort.getOrder(), metricName, size, topMetrics, pipelineAggregators(), metaData());
return new InternalTopMetrics(name, sort.getOrder(), metrics.names, size, topMetrics, pipelineAggregators(), metaData());
}
@Override
public InternalTopMetrics buildEmptyAggregation() {
// The sort format and sort order aren't used in reduction so we pass the simplest thing.
return InternalTopMetrics.buildEmptyAggregation(name, metricName, pipelineAggregators(),
metaData());
return InternalTopMetrics.buildEmptyAggregation(name, metrics.names, pipelineAggregators(), metaData());
}
@Override
public void doClose() {
Releasables.close(sort, values);
Releasables.close(sort, metrics);
}
private static class Values implements BucketedSort.ExtraData, Releasable {
private static class Metrics implements BucketedSort.ExtraData, Releasable {
private final List<String> names;
private final MetricValues[] values;
Metrics(int size, QueryShardContext ctx, List<String> names, List<ValuesSource.Numeric> valuesSources) {
this.names = names;
values = new MetricValues[valuesSources.size()];
int i = 0;
for (ValuesSource.Numeric valuesSource : valuesSources) {
if (valuesSource == null) {
values[i++] = new MissingMetricValues();
continue;
}
values[i++] = new CollectMetricValues(size, ctx.bigArrays(), valuesSource);
}
}
boolean needsScores() {
for (int i = 0; i < values.length; i++) {
if (values[i].needsScores()) {
return true;
}
}
return false;
}
double metric(String name, long index) {
int valueIndex = names.indexOf(name);
if (valueIndex < 0) {
throw new IllegalArgumentException("[" + name + "] not found");
}
return values[valueIndex].value(index);
}
BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
return (index, sortValue) -> {
double[] result = new double[values.length];
for (int i = 0; i < values.length; i++) {
result[i] = values[i].value(index);
}
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, result);
};
}
@Override
public void swap(long lhs, long rhs) {
for (int i = 0; i < values.length; i++) {
values[i].swap(lhs, rhs);
}
}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
Loader[] loaders = new Loader[values.length];
for (int i = 0; i < values.length; i++) {
loaders[i] = values[i].loader(ctx);
}
return (index, doc) -> {
for (int i = 0; i < loaders.length; i++) {
loaders[i].loadFromDoc(index, doc);
}
};
}
@Override
public void close() {
Releasables.close(values);
}
}
private interface MetricValues extends BucketedSort.ExtraData, Releasable {
boolean needsScores();
double value(long index);
}
private static class CollectMetricValues implements MetricValues {
private final BigArrays bigArrays;
private final ValuesSource.Numeric metricValueSource;
private DoubleArray values;
Values(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
CollectMetricValues(int size, BigArrays bigArrays, ValuesSource.Numeric metricValueSource) {
this.bigArrays = bigArrays;
this.metricValueSource = metricValueSource;
values = bigArrays.newDoubleArray(size, false);
}
BucketedSort.ResultBuilder<InternalTopMetrics.TopMetric> resultBuilder(DocValueFormat sortFormat) {
return (index, sortValue) ->
new InternalTopMetrics.TopMetric(sortFormat, sortValue, values.get(index));
@Override
public boolean needsScores() {
return metricValueSource.needsScores();
}
@Override
public double value(long index) {
return values.get(index);
}
@Override
@ -179,4 +249,27 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
values.close();
}
}
private static class MissingMetricValues implements MetricValues {
@Override
public double value(long index) {
return Double.NaN;
}
@Override
public boolean needsScores() {
return false;
}
@Override
public void swap(long lhs, long rhs) {}
@Override
public Loader loader(LeafReaderContext ctx) throws IOException {
return (index, doc) -> {};
}
@Override
public void close() {
}
}
}

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;
public class TopMetricsAggregatorFactory extends AggregatorFactory {
/**
* Index setting describing the maximum number of top metrics that
@ -35,40 +37,34 @@ public class TopMetricsAggregatorFactory extends AggregatorFactory {
private final List<SortBuilder<?>> sortBuilders;
private final int size;
private final MultiValuesSourceFieldConfig metricField;
private final List<MultiValuesSourceFieldConfig> metricFields;
public TopMetricsAggregatorFactory(String name, QueryShardContext queryShardContext, AggregatorFactory parent,
Builder subFactoriesBuilder, Map<String, Object> metaData, List<SortBuilder<?>> sortBuilders,
int size, MultiValuesSourceFieldConfig metricField) throws IOException {
int size, List<MultiValuesSourceFieldConfig> metricFields) throws IOException {
super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
this.sortBuilders = sortBuilders;
this.size = size;
this.metricField = metricField;
this.metricFields = metricFields;
}
@Override
protected TopMetricsAggregator createInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
ValuesSourceConfig<ValuesSource.Numeric> metricFieldSource = ValuesSourceConfig.resolve(queryShardContext, ValueType.NUMERIC,
metricField.getFieldName(), metricField.getScript(), metricField.getMissing(), metricField.getTimeZone(), null);
ValuesSource.Numeric metricValueSource = metricFieldSource.toValuesSource(queryShardContext);
int maxBucketSize = MAX_BUCKET_SIZE.get(searchContext.getQueryShardContext().getIndexSettings().getSettings());
if (size > maxBucketSize) {
throw new IllegalArgumentException("[top_metrics.size] must not be more than [" + maxBucketSize + "] but was [" + size
+ "]. This limit can be set by changing the [" + MAX_BUCKET_SIZE.getKey()
+ "] index level setting.");
}
if (metricValueSource == null) {
return createUnmapped(searchContext, parent, pipelineAggregators, metaData);
}
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
sortBuilders.get(0), metricValueSource);
List<String> metricNames = metricFields.stream().map(MultiValuesSourceFieldConfig::getFieldName).collect(toList());
List<ValuesSource.Numeric> metricValuesSources = metricFields.stream().map(config -> {
ValuesSourceConfig<ValuesSource.Numeric> resolved = ValuesSourceConfig.resolve(
searchContext.getQueryShardContext(), ValueType.NUMERIC,
config.getFieldName(), config.getScript(), config.getMissing(), config.getTimeZone(), null);
return resolved.toValuesSource(searchContext.getQueryShardContext());
}).collect(toList());
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size,
sortBuilders.get(0), metricNames, metricValuesSources);
}
private TopMetricsAggregator createUnmapped(SearchContext searchContext, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new TopMetricsAggregator(name, searchContext, parent, pipelineAggregators, metaData, size, metricField.getFieldName(),
null, null);
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
@ -45,7 +46,7 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
InternalTopMetrics winner = first.getSortOrder() == SortOrder.ASC ? min : max;
InternalTopMetrics reduced = reduce(metrics);
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getSize(), equalTo(first.getSize()));
assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
@ -60,7 +61,7 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
};
InternalTopMetrics reduced = reduce(metrics);
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getSize(), equalTo(first.getSize()));
assertThat(reduced.getTopMetrics(), equalTo(Arrays.asList(
@ -74,14 +75,14 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
// Doubles sort first.
InternalTopMetrics winner = doubleMetrics.getSortOrder() == SortOrder.ASC ? doubleMetrics : longMetrics;
assertThat(reduced.getName(), equalTo("test"));
assertThat(reduced.getMetricName(), equalTo("test"));
assertThat(reduced.getMetricNames(), equalTo(singletonList("test")));
assertThat(reduced.getSortOrder(), equalTo(doubleMetrics.getSortOrder()));
assertThat(reduced.getSize(), equalTo(doubleMetrics.getSize()));
assertThat(reduced.getTopMetrics(), equalTo(winner.getTopMetrics()));
}
private InternalTopMetrics buildEmpty() {
return InternalTopMetrics.buildEmptyAggregation("test", "test", emptyList(), null);
return InternalTopMetrics.buildEmptyAggregation("test", singletonList("test"), emptyList(), null);
}
private InternalTopMetrics buildFilled(int size, InternalTopMetrics.TopMetric... metrics) {
@ -89,12 +90,12 @@ public class InternalTopMetricsReduceTests extends ESTestCase {
}
private InternalTopMetrics buildFilled(SortOrder sortOrder, int size, InternalTopMetrics.TopMetric... metrics) {
return new InternalTopMetrics("test", sortOrder, "test", size, Arrays.asList(metrics), emptyList(), null);
return new InternalTopMetrics("test", sortOrder, singletonList("test"), size, Arrays.asList(metrics), emptyList(), null);
}
private InternalTopMetrics.TopMetric top(SortValue sortValue, double metricValue) {
DocValueFormat sortFormat = randomFrom(DocValueFormat.RAW, DocValueFormat.BINARY, DocValueFormat.BOOLEAN, DocValueFormat.IP);
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, metricValue);
return new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {metricValue});
}
private InternalTopMetrics reduce(InternalTopMetrics... results) {

View File

@ -27,16 +27,18 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
public class InternalTopMetricsTests extends InternalAggregationTestCase<InternalTopMetrics> {
@ -51,7 +53,7 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
public void testEmptyIsNotMapped() {
InternalTopMetrics empty = InternalTopMetrics.buildEmptyAggregation(
randomAlphaOfLength(5), randomAlphaOfLength(2), emptyList(), null);
randomAlphaOfLength(5), randomMetricNames(between(1, 5)), emptyList(), null);
assertFalse(empty.isMapped());
}
@ -61,8 +63,9 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
}
public void testToXContentDoubleSortValue() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
Arrays.asList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0)), emptyList(), null);
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
singletonList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0})),
emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -84,8 +87,8 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
DocValueFormat sortFormat = new DocValueFormat.DateTime(DateFormatter.forPattern("strict_date_time"), ZoneId.of("UTC"),
DateFieldMapper.Resolution.MILLISECONDS);
SortValue sortValue = SortValue.from(ZonedDateTime.parse("2007-12-03T10:15:30Z").toInstant().toEpochMilli());
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 1,
Arrays.asList(new InternalTopMetrics.TopMetric(sortFormat, sortValue, 1.0)), emptyList(), null);
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 1,
singletonList(new InternalTopMetrics.TopMetric(sortFormat, sortValue, new double[] {1.0})), emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
@ -103,11 +106,34 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
"}"));
}
public void testToXContentManyValues() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, "test", 2,
public void testToXContentManyMetrics() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, Arrays.asList("foo", "bar", "baz"), 1,
singletonList(new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0, 2.0, 3.0})),
emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
" \"test\" : {\n" +
" \"top\" : [\n" +
" {\n" +
" \"sort\" : [\n" +
" 1.0\n" +
" ],\n" +
" \"metrics\" : {\n" +
" \"foo\" : 1.0,\n" +
" \"bar\" : 2.0,\n" +
" \"baz\" : 3.0\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
"}"));
}
public void testToXContentManyTopMetrics() throws IOException {
InternalTopMetrics tm = new InternalTopMetrics("test", sortOrder, singletonList("test"), 2,
Arrays.asList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 1.0),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), 2.0)),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {1.0}),
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(2.0), new double[] {2.0})),
emptyList(), null);
assertThat(Strings.toString(tm, true, true), equalTo(
"{\n" +
@ -145,17 +171,18 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
@Override
protected InternalTopMetrics createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
String metricName = randomAlphaOfLength(5);
int metricCount = between(1, 5);
List<String> metricNames = randomMetricNames(metricCount);
int size = between(1, 100);
List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size));
return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics, pipelineAggregators, metaData);
List<InternalTopMetrics.TopMetric> topMetrics = randomTopMetrics(between(0, size), metricCount);
return new InternalTopMetrics(name, sortOrder, metricNames, size, topMetrics, pipelineAggregators, metaData);
}
@Override
protected InternalTopMetrics mutateInstance(InternalTopMetrics instance) throws IOException {
String name = instance.getName();
SortOrder sortOrder = instance.getSortOrder();
String metricName = instance.getMetricName();
List<String> metricNames = instance.getMetricNames();
int size = instance.getSize();
List<InternalTopMetrics.TopMetric> topMetrics = instance.getTopMetrics();
switch (randomInt(4)) {
@ -167,19 +194,21 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
Collections.reverse(topMetrics);
break;
case 2:
metricName = randomAlphaOfLength(6);
metricNames = new ArrayList<>(metricNames);
metricNames.set(randomInt(metricNames.size() - 1), randomAlphaOfLength(6));
break;
case 3:
size = randomValueOtherThan(size, () -> between(1, 100));
break;
case 4:
int fixedSize = size;
topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize)));
int fixedMetricsSize = metricNames.size();
topMetrics = randomValueOtherThan(topMetrics, () -> randomTopMetrics(between(1, fixedSize), fixedMetricsSize));
break;
default:
throw new IllegalArgumentException("bad mutation");
}
return new InternalTopMetrics(name, sortOrder, metricName, size, topMetrics,
return new InternalTopMetrics(name, sortOrder, metricNames, size, topMetrics,
instance.pipelineAggregators(), instance.getMetaData());
}
@ -199,7 +228,11 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
Object expectedSort = internalTop.getSortFormat() == DocValueFormat.RAW ?
internalTop.getSortValue().getKey() : internalTop.getSortValue().format(internalTop.getSortFormat());
assertThat(parsedTop.getSort(), equalTo(singletonList(expectedSort)));
assertThat(parsedTop.getMetrics(), equalTo(singletonMap(aggregation.getMetricName(), internalTop.getMetricValue())));
assertThat(parsedTop.getMetrics().keySet(), hasSize(aggregation.getMetricNames().size()));
for (int m = 0; m < aggregation.getMetricNames().size(); m++) {
assertThat(parsedTop.getMetrics(),
hasEntry(aggregation.getMetricNames().get(m), internalTop.getMetricValues()[m]));
}
}
}
@ -214,17 +247,31 @@ public class InternalTopMetricsTests extends InternalAggregationTestCase<Interna
List<InternalTopMetrics.TopMetric> winners = metrics.size() > first.getSize() ? metrics.subList(0, first.getSize()) : metrics;
assertThat(reduced.getName(), equalTo(first.getName()));
assertThat(reduced.getSortOrder(), equalTo(first.getSortOrder()));
assertThat(reduced.getMetricName(), equalTo(first.getMetricName()));
assertThat(reduced.getMetricNames(), equalTo(first.getMetricNames()));
assertThat(reduced.getTopMetrics(), equalTo(winners));
}
private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length) {
private List<InternalTopMetrics.TopMetric> randomTopMetrics(int length, int metricCount) {
return IntStream.range(0, length)
.mapToObj(i -> new InternalTopMetrics.TopMetric(randomNumericDocValueFormat(), randomSortValue(), randomDouble()))
.mapToObj(i -> new InternalTopMetrics.TopMetric(
randomNumericDocValueFormat(), randomSortValue(), randomMetricValues(metricCount)
))
.sorted((lhs, rhs) -> sortOrder.reverseMul() * lhs.getSortValue().compareTo(rhs.getSortValue()))
.collect(toList());
}
static List<String> randomMetricNames(int metricCount) {
Set<String> names = new HashSet<>(metricCount);
while (names.size() < metricCount) {
names.add(randomAlphaOfLength(5));
}
return new ArrayList<>(names);
}
private double[] randomMetricValues(int metricCount) {
return IntStream.range(0, metricCount).mapToDouble(i -> randomDouble()).toArray();
}
private static SortValue randomSortValue() {
if (randomBoolean()) {
return SortValue.from(randomLong());

View File

@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.List;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@ -68,9 +69,14 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
protected TopMetricsAggregationBuilder createTestInstance() {
List<SortBuilder<?>> sortBuilders = singletonList(
new FieldSortBuilder(randomAlphaOfLength(5)).order(randomFrom(SortOrder.values())));
MultiValuesSourceFieldConfig.Builder metricField = new MultiValuesSourceFieldConfig.Builder();
metricField.setFieldName(randomAlphaOfLength(5)).setMissing(1.0);
return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, between(1, 100), metricField.build());
List<MultiValuesSourceFieldConfig> metricFields = InternalTopMetricsTests.randomMetricNames(between(1, 5)).stream()
.map(name -> {
MultiValuesSourceFieldConfig.Builder metricField = new MultiValuesSourceFieldConfig.Builder();
metricField.setFieldName(randomAlphaOfLength(5)).setMissing(1.0);
return metricField.build();
})
.collect(toList());
return new TopMetricsAggregationBuilder(randomAlphaOfLength(5), sortBuilders, between(1, 100), metricFields);
}
public void testClientBuilder() throws IOException {
@ -98,6 +104,6 @@ public class TopMetricsAggregationBuilderTests extends AbstractSerializingTestCa
serverBuilder.getName(),
serverBuilder.getSortBuilders().get(0),
serverBuilder.getSize(),
serverBuilder.getMetricField().getFieldName());
serverBuilder.getMetricFields().stream().map(MultiValuesSourceFieldConfig::getFieldName).toArray(String[]::new));
}
}

View File

@ -68,12 +68,14 @@ import org.elasticsearch.search.sort.SortValue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notANumber;
@ -95,7 +97,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
},
numberFieldType(NumberType.DOUBLE, "s"));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(emptyList()));
assertThat(result.getTopMetrics(), equalTo(singletonList(top(1.0, Double.NaN))));
}
public void testMissingValueForMetric() throws IOException {
@ -106,7 +108,8 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), hasSize(1));
assertThat(result.getTopMetrics().get(0).getSortValue(), equalTo(SortValue.from(1.0)));
assertThat(result.getTopMetrics().get(0).getMetricValue(), notANumber());
assertThat(result.getTopMetrics().get(0).getMetricValues().length, equalTo(1));
assertThat(result.getTopMetrics().get(0).getMetricValues()[0], notANumber());
}
public void testActualValueForMetric() throws IOException {
@ -130,7 +133,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
InternalTopMetrics result = collectFromDoubles(simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)));
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), 2.0))));
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {2.0}))));
}
public void testSortByDoubleDescending() throws IOException {
@ -349,21 +352,50 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
* breaker. The number of buckets feels fairly arbitrary but
* it comes from:
* budget = 15k = 20k - 5k for the "default weight" of ever agg
* The 922th bucket causes a resize which requests puts the total
* just over 15k.O
* The 646th bucket causes a resize which requests puts the total
* just over 15k. This works out to more like 190 bits per bucket
* when we're fairly sure this should take about 129 bits per
* bucket. The difference is because, for arrays in of this size,
* BigArrays allocates the new array before freeing the old one.
* That causes us to trip when we're about 2/3 of the way to the
* limit. And 2/3 of 190 is 126. Which is pretty much what we
* expect. Sort of.
*/
int bucketThatBreaks = 922;
int bucketThatBreaks = 646;
for (int b = 0; b < bucketThatBreaks; b++) {
leaf.collect(0, b);
try {
leaf.collect(0, b);
} catch (CircuitBreakingException e) {
throw new AssertionError("Unexpected circuit break at [" + b + "]. Expected at [" + bucketThatBreaks + "]", e);
}
}
CircuitBreakingException e = expectThrows(CircuitBreakingException.class, () -> leaf.collect(0, bucketThatBreaks));
assertThat(e.getMessage(), equalTo("test error"));
assertThat(e.getByteLimit(), equalTo(max.getBytes()));
assertThat(e.getBytesWanted(), equalTo(16440L));
assertThat(e.getBytesWanted(), equalTo(5872L));
}
}
}
public void testManyMetrics() throws IOException {
List<SortBuilder<?>> sorts = singletonList(new FieldSortBuilder("s").order(SortOrder.ASC));
TopMetricsAggregationBuilder builder = new TopMetricsAggregationBuilder("test", sorts, 1,
Arrays.asList(
new MultiValuesSourceFieldConfig.Builder().setFieldName("m1").build(),
new MultiValuesSourceFieldConfig.Builder().setFieldName("m2").build(),
new MultiValuesSourceFieldConfig.Builder().setFieldName("m3").build()
));
InternalTopMetrics result = collect(builder, new MatchAllDocsQuery(), writer -> {
writer.addDocument(Arrays.asList(doubleField("s", 1.0),
doubleField("m1", 12.0), doubleField("m2", 22.0), doubleField("m3", 32.0)));
writer.addDocument(Arrays.asList(doubleField("s", 2.0),
doubleField("m1", 13.0), doubleField("m2", 23.0), doubleField("m3", 33.0)));
}, manyMetricsFields());
assertThat(result.getSortOrder(), equalTo(SortOrder.ASC));
assertThat(result.getTopMetrics(), equalTo(singletonList(
new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(1.0), new double[] {12.0, 22.0, 32.0}))));
}
private TopMetricsAggregationBuilder simpleBuilder() {
return simpleBuilder(new FieldSortBuilder("s"));
}
@ -374,7 +406,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
private TopMetricsAggregationBuilder simpleBuilder(SortBuilder<?> sort, int size) {
return new TopMetricsAggregationBuilder("test", singletonList(sort), size,
new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build());
singletonList(new MultiValuesSourceFieldConfig.Builder().setFieldName("m").build()));
}
/**
@ -394,6 +426,16 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
return new MappedFieldType[] {numberFieldType(NumberType.DOUBLE, "s"), numberFieldType(NumberType.DOUBLE, "m")};
}
private MappedFieldType[] manyMetricsFields() {
return new MappedFieldType[] {
numberFieldType(NumberType.DOUBLE, "s"),
numberFieldType(NumberType.DOUBLE, "m1"),
numberFieldType(NumberType.DOUBLE, "m2"),
numberFieldType(NumberType.DOUBLE, "m3"),
};
}
private MappedFieldType[] floatAndDoubleField() {
return new MappedFieldType[] {numberFieldType(NumberType.FLOAT, "s"), numberFieldType(NumberType.DOUBLE, "m")};
}
@ -452,7 +494,10 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
private InternalTopMetrics collect(TopMetricsAggregationBuilder builder, Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex, MappedFieldType... fields) throws IOException {
InternalTopMetrics result = (InternalTopMetrics) collect((AggregationBuilder) builder, query, buildIndex, fields);
assertThat(result.getMetricName(), equalTo(builder.getMetricField().getFieldName()));
List<String> expectedFieldNames = builder.getMetricFields().stream()
.map(MultiValuesSourceFieldConfig::getFieldName)
.collect(toList());
assertThat(result.getMetricNames(), equalTo(expectedFieldNames));
return result;
}
@ -470,12 +515,12 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
}
}
private InternalTopMetrics.TopMetric top(long sortValue, double metricValue) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValue);
private InternalTopMetrics.TopMetric top(long sortValue, double... metricValues) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
}
private InternalTopMetrics.TopMetric top(double sortValue, double metricValue) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValue);
private InternalTopMetrics.TopMetric top(double sortValue, double... metricValues) {
return new InternalTopMetrics.TopMetric(DocValueFormat.RAW, SortValue.from(sortValue), metricValues);
}
/**

View File

@ -23,7 +23,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: desc
@ -36,7 +36,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: asc
@ -49,7 +49,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s:
@ -92,7 +92,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: desc
@ -105,7 +105,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: asc
@ -146,7 +146,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: desc
@ -159,7 +159,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
s: asc
@ -196,7 +196,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort: s.keyword
- match: { error.root_cause.0.reason: "error building sort for field [s.keyword] of type [keyword] in index [test]: only supported on numeric fields" }
@ -236,7 +236,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort: _score
- match: { aggregations.tm.top.0.metrics.v: 3.1414999961853027 }
@ -263,7 +263,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
_script:
@ -303,7 +303,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
_script:
@ -345,7 +345,7 @@
aggs:
pop:
top_metrics:
metric:
metrics:
field: population
sort:
_geo_distance:
@ -361,7 +361,7 @@
pop:
top_metrics:
size: 3
metric:
metrics:
field: population
sort:
_geo_distance:
@ -412,7 +412,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
date: desc
@ -437,7 +437,7 @@
aggs:
tm:
top_metrics:
metric:
metrics:
field: v
sort:
date: desc
@ -483,7 +483,7 @@
tm:
top_metrics:
size: 100
metric:
metrics:
field: v
sort:
s: desc
@ -503,7 +503,7 @@
tm:
top_metrics:
size: 100
metric:
metrics:
field: v
sort:
s: desc