Add parsing for InternalScriptedMetric aggregation (#24738)
This commit is contained in:
parent
ce7326eb88
commit
9fc9db26fd
|
@ -412,7 +412,8 @@ public final class ObjectParser<Value, Context> extends AbstractObjectParser<Val
|
||||||
OBJECT_OR_STRING(START_OBJECT, VALUE_STRING),
|
OBJECT_OR_STRING(START_OBJECT, VALUE_STRING),
|
||||||
OBJECT_ARRAY_BOOLEAN_OR_STRING(START_OBJECT, START_ARRAY, VALUE_BOOLEAN, VALUE_STRING),
|
OBJECT_ARRAY_BOOLEAN_OR_STRING(START_OBJECT, START_ARRAY, VALUE_BOOLEAN, VALUE_STRING),
|
||||||
OBJECT_ARRAY_OR_STRING(START_OBJECT, START_ARRAY, VALUE_STRING),
|
OBJECT_ARRAY_OR_STRING(START_OBJECT, START_ARRAY, VALUE_STRING),
|
||||||
VALUE(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING);
|
VALUE(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING),
|
||||||
|
VALUE_OBJECT_ARRAY(VALUE_BOOLEAN, VALUE_NULL, VALUE_EMBEDDED_OBJECT, VALUE_NUMBER, VALUE_STRING, START_OBJECT, START_ARRAY);
|
||||||
|
|
||||||
private final EnumSet<XContentParser.Token> tokens;
|
private final EnumSet<XContentParser.Token> tokens;
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||||
return builder.field("value", aggregation());
|
return builder.field(CommonFields.VALUE.getPreferredName(), aggregation());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.search.aggregations.metrics.scripted;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
|
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||||
|
import org.elasticsearch.search.aggregations.ParsedAggregation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ParsedScriptedMetric extends ParsedAggregation implements ScriptedMetric {
|
||||||
|
private List<Object> aggregation;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return ScriptedMetricAggregationBuilder.NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object aggregation() {
|
||||||
|
assert aggregation.size() == 1; // see InternalScriptedMetric#aggregations() for why we can assume this
|
||||||
|
return aggregation.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
return builder.field(CommonFields.VALUE.getPreferredName(), aggregation());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final ObjectParser<ParsedScriptedMetric, Void> PARSER = new ObjectParser<>(ParsedScriptedMetric.class.getSimpleName(), true,
|
||||||
|
ParsedScriptedMetric::new);
|
||||||
|
|
||||||
|
static {
|
||||||
|
declareAggregationFields(PARSER);
|
||||||
|
PARSER.declareField((agg, value) -> agg.aggregation = Collections.singletonList(value),
|
||||||
|
ParsedScriptedMetric::parseValue, CommonFields.VALUE, ValueType.VALUE_OBJECT_ARRAY);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Object parseValue(XContentParser parser) throws IOException {
|
||||||
|
Token token = parser.currentToken();
|
||||||
|
Object value = null;
|
||||||
|
if (token == XContentParser.Token.VALUE_NULL) {
|
||||||
|
value = null;
|
||||||
|
} else if (token.isValue()) {
|
||||||
|
if (token == XContentParser.Token.VALUE_STRING) {
|
||||||
|
//binary values will be parsed back and returned as base64 strings when reading from json and yaml
|
||||||
|
value = parser.text();
|
||||||
|
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||||
|
value = parser.numberValue();
|
||||||
|
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
|
||||||
|
value = parser.booleanValue();
|
||||||
|
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
|
||||||
|
//binary values will be parsed back and returned as BytesArray when reading from cbor and smile
|
||||||
|
value = new BytesArray(parser.binaryValue());
|
||||||
|
}
|
||||||
|
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||||
|
value = parser.map();
|
||||||
|
} else if (token == XContentParser.Token.START_ARRAY) {
|
||||||
|
value = parser.list();
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ParsedScriptedMetric fromXContent(XContentParser parser, final String name) {
|
||||||
|
ParsedScriptedMetric aggregation = PARSER.apply(parser, null);
|
||||||
|
aggregation.setName(name);
|
||||||
|
return aggregation;
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.InternalHDR
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.InternalHDRPercentilesTests;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.hdr.InternalHDRPercentilesTests;
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesRanksTests;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesRanksTests;
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesTests;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentilesTests;
|
||||||
|
import org.elasticsearch.search.aggregations.metrics.scripted.InternalScriptedMetricTests;
|
||||||
import org.elasticsearch.search.aggregations.metrics.sum.InternalSumTests;
|
import org.elasticsearch.search.aggregations.metrics.sum.InternalSumTests;
|
||||||
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCountTests;
|
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCountTests;
|
||||||
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests;
|
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests;
|
||||||
|
@ -130,6 +131,7 @@ public class AggregationsTests extends ESTestCase {
|
||||||
aggsTests.add(new InternalAdjacencyMatrixTests());
|
aggsTests.add(new InternalAdjacencyMatrixTests());
|
||||||
aggsTests.add(new SignificantLongTermsTests());
|
aggsTests.add(new SignificantLongTermsTests());
|
||||||
aggsTests.add(new SignificantStringTermsTests());
|
aggsTests.add(new SignificantStringTermsTests());
|
||||||
|
aggsTests.add(new InternalScriptedMetricTests());
|
||||||
return Collections.unmodifiableList(aggsTests);
|
return Collections.unmodifiableList(aggsTests);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.search.aggregations.metrics.scripted;
|
package org.elasticsearch.search.aggregations.metrics.scripted;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.common.geo.GeoPoint;
|
||||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
@ -30,20 +31,46 @@ import org.elasticsearch.script.ScriptEngineRegistry;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.script.ScriptSettings;
|
import org.elasticsearch.script.ScriptSettings;
|
||||||
import org.elasticsearch.script.ScriptType;
|
import org.elasticsearch.script.ScriptType;
|
||||||
|
import org.elasticsearch.search.aggregations.ParsedAggregation;
|
||||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||||
import org.elasticsearch.test.InternalAggregationTestCase;
|
import org.elasticsearch.test.InternalAggregationTestCase;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class InternalScriptedMetricTests extends InternalAggregationTestCase<InternalScriptedMetric> {
|
public class InternalScriptedMetricTests extends InternalAggregationTestCase<InternalScriptedMetric> {
|
||||||
|
|
||||||
private static final String REDUCE_SCRIPT_NAME = "reduceScript";
|
private static final String REDUCE_SCRIPT_NAME = "reduceScript";
|
||||||
// randomized only once so that any random test instance has the same value
|
private boolean hasReduceScript;
|
||||||
private boolean hasReduceScript = randomBoolean();
|
private Supplier<Object>[] valueTypes;
|
||||||
|
private final Supplier<Object>[] leafValueSuppliers = new Supplier[] { () -> randomInt(), () -> randomLong(), () -> randomDouble(),
|
||||||
|
() -> randomFloat(), () -> randomBoolean(), () -> randomAlphaOfLength(5), () -> new GeoPoint(randomDouble(), randomDouble()),
|
||||||
|
() -> null };
|
||||||
|
private final Supplier<Object>[] nestedValueSuppliers = new Supplier[] { () -> new HashMap<String, Object>(),
|
||||||
|
() -> new ArrayList<>() };
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
hasReduceScript = randomBoolean();
|
||||||
|
// we want the same value types (also for nested lists, maps) for all random aggregations
|
||||||
|
int levels = randomIntBetween(1, 3);
|
||||||
|
valueTypes = new Supplier[levels];
|
||||||
|
for (int i = 0; i < levels; i++) {
|
||||||
|
if (i < levels - 1) {
|
||||||
|
valueTypes[i] = randomFrom(nestedValueSuppliers);
|
||||||
|
} else {
|
||||||
|
// the last one needs to be a leaf value, not map or list
|
||||||
|
valueTypes[i] = randomFrom(leafValueSuppliers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected InternalScriptedMetric createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
|
protected InternalScriptedMetric createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
|
||||||
|
@ -56,7 +83,27 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
|
||||||
if (hasReduceScript) {
|
if (hasReduceScript) {
|
||||||
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, params);
|
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, params);
|
||||||
}
|
}
|
||||||
return new InternalScriptedMetric(name, randomAlphaOfLength(5), reduceScript, pipelineAggregators, metaData);
|
Object randomValue = randomValue(valueTypes, 0);
|
||||||
|
return new InternalScriptedMetric(name, randomValue, reduceScript, pipelineAggregators, metaData);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static Object randomValue(Supplier<Object>[] valueTypes, int level) {
|
||||||
|
Object value = valueTypes[level].get();
|
||||||
|
if (value instanceof Map) {
|
||||||
|
int elements = randomIntBetween(1, 5);
|
||||||
|
Map<String, Object> map = (Map<String, Object>) value;
|
||||||
|
for (int i = 0; i < elements; i++) {
|
||||||
|
map.put(randomAlphaOfLength(5), randomValue(valueTypes, level + 1));
|
||||||
|
}
|
||||||
|
} else if (value instanceof List) {
|
||||||
|
int elements = randomIntBetween(1,5);
|
||||||
|
List<Object> list = (List<Object>) value;
|
||||||
|
for (int i = 0; i < elements; i++) {
|
||||||
|
list.add(randomValue(valueTypes, level + 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -105,4 +152,52 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
|
||||||
return InternalScriptedMetric::new;
|
return InternalScriptedMetric::new;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertFromXContent(InternalScriptedMetric aggregation, ParsedAggregation parsedAggregation) {
|
||||||
|
assertTrue(parsedAggregation instanceof ParsedScriptedMetric);
|
||||||
|
ParsedScriptedMetric parsed = (ParsedScriptedMetric) parsedAggregation;
|
||||||
|
|
||||||
|
assertValues(aggregation.aggregation(), parsed.aggregation());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertValues(Object expected, Object actual) {
|
||||||
|
if (expected instanceof Long) {
|
||||||
|
// longs that fit into the integer range are parsed back as integer
|
||||||
|
if (actual instanceof Integer) {
|
||||||
|
assertEquals(((Long) expected).intValue(), actual);
|
||||||
|
} else {
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
} else if (expected instanceof Float) {
|
||||||
|
// based on the xContent type, floats are sometimes parsed back as doubles
|
||||||
|
if (actual instanceof Double) {
|
||||||
|
assertEquals(expected, ((Double) actual).floatValue());
|
||||||
|
} else {
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
} else if (expected instanceof GeoPoint) {
|
||||||
|
assertTrue(actual instanceof Map);
|
||||||
|
GeoPoint point = (GeoPoint) expected;
|
||||||
|
Map<String, Object> pointMap = (Map<String, Object>) actual;
|
||||||
|
assertEquals(point.getLat(), pointMap.get("lat"));
|
||||||
|
assertEquals(point.getLon(), pointMap.get("lon"));
|
||||||
|
} else if (expected instanceof Map) {
|
||||||
|
Map<String, Object> expectedMap = (Map<String, Object>) expected;
|
||||||
|
Map<String, Object> actualMap = (Map<String, Object>) actual;
|
||||||
|
assertEquals(expectedMap.size(), actualMap.size());
|
||||||
|
for (String key : expectedMap.keySet()) {
|
||||||
|
assertValues(expectedMap.get(key), actualMap.get(key));
|
||||||
|
}
|
||||||
|
} else if (expected instanceof List) {
|
||||||
|
List<Object> expectedList = (List<Object>) expected;
|
||||||
|
List<Object> actualList = (List<Object>) actual;
|
||||||
|
assertEquals(expectedList.size(), actualList.size());
|
||||||
|
Iterator<Object> actualIterator = actualList.iterator();
|
||||||
|
for (Object element : expectedList) {
|
||||||
|
assertValues(element, actualIterator.next());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,6 +96,8 @@ import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.Interna
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentiles;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.InternalTDigestPercentiles;
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentileRanks;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentileRanks;
|
||||||
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentiles;
|
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.ParsedTDigestPercentiles;
|
||||||
|
import org.elasticsearch.search.aggregations.metrics.scripted.ParsedScriptedMetric;
|
||||||
|
import org.elasticsearch.search.aggregations.metrics.scripted.ScriptedMetricAggregationBuilder;
|
||||||
import org.elasticsearch.search.aggregations.metrics.stats.ParsedStats;
|
import org.elasticsearch.search.aggregations.metrics.stats.ParsedStats;
|
||||||
import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
|
import org.elasticsearch.search.aggregations.metrics.stats.StatsAggregationBuilder;
|
||||||
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsAggregationBuilder;
|
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsAggregationBuilder;
|
||||||
|
@ -182,6 +184,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
|
||||||
map.put(AdjacencyMatrixAggregationBuilder.NAME, (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c));
|
map.put(AdjacencyMatrixAggregationBuilder.NAME, (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c));
|
||||||
map.put(SignificantLongTerms.NAME, (p, c) -> ParsedSignificantLongTerms.fromXContent(p, (String) c));
|
map.put(SignificantLongTerms.NAME, (p, c) -> ParsedSignificantLongTerms.fromXContent(p, (String) c));
|
||||||
map.put(SignificantStringTerms.NAME, (p, c) -> ParsedSignificantStringTerms.fromXContent(p, (String) c));
|
map.put(SignificantStringTerms.NAME, (p, c) -> ParsedSignificantStringTerms.fromXContent(p, (String) c));
|
||||||
|
map.put(ScriptedMetricAggregationBuilder.NAME, (p, c) -> ParsedScriptedMetric.fromXContent(p, (String) c));
|
||||||
|
|
||||||
namedXContents = map.entrySet().stream()
|
namedXContents = map.entrySet().stream()
|
||||||
.map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
|
.map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
|
||||||
|
|
Loading…
Reference in New Issue