[Transform] add support for percentile aggs (#51808)

Make transforms ready for multi-value aggregations and add support for the percentiles aggregation.

fixes #51663
This commit is contained in:
Hendrik Muhs 2020-02-04 12:01:30 +01:00
parent 5d5f3ce256
commit c2b08bb72f
7 changed files with 223 additions and 51 deletions

View File

@ -1109,6 +1109,70 @@ public class TransformPivotRestIT extends TransformRestTestCase {
deleteIndex(indexName);
}
// Integration test for a pivot transform that uses a multi-value `percentiles` aggregation
// next to a single-value `avg` aggregation (support added for #51663).
public void testPivotWithPercentile() throws Exception {
String transformId = "percentile_pivot";
String transformIndex = "percentile_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
// Pivot config: group by user_id (as "reviewer"), aggregate the average rating and the
// 5 / 50 / 90 / 99.9 percentiles of the "stars" field into an output object named "p".
String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"p\": {"
+ " \"percentiles\" : {"
+ " \"field\": \"stars\", "
+ " \"percents\": [5, 50, 90, 99.9]"
+ " }"
+ " } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// run the transform and wait until the destination index has been written
startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
// percentile keys are flattened output field names: 99.9 becomes "99_9" because a '.'
// would be interpreted as a nested object path
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
assertEquals(1, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
assertEquals(5, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
assertEquals(5, actual.longValue());
// spot-check a second user to make sure percentiles differ per bucket
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
assertEquals(1, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
assertEquals(4, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
assertEquals(5, actual.longValue());
}
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {

View File

@ -20,11 +20,14 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.transform.transforms.IDGenerator;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
import java.util.Arrays;
import java.util.Collection;
@ -46,6 +49,7 @@ public final class AggregationResultUtils {
tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor());
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@ -59,12 +63,14 @@ public final class AggregationResultUtils {
* @param stats stats collector
* @return a map containing the results of the aggregation in a consumable way
*/
public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggs,
Map<String, String> fieldTypeMap,
TransformIndexerStats stats) {
public static Stream<Map<String, Object>> extractCompositeAggregationResults(
CompositeAggregation agg,
GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggs,
Map<String, String> fieldTypeMap,
TransformIndexerStats stats
) {
return agg.getBuckets().stream().map(bucket -> {
stats.incrementNumDocuments(bucket.getDocCount());
Map<String, Object> document = new HashMap<>();
@ -82,7 +88,7 @@ public final class AggregationResultUtils {
List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());
aggNames.addAll(pipelineAggs.stream().map(PipelineAggregationBuilder::getName).collect(Collectors.toList()));
for (String aggName: aggNames) {
for (String aggName : aggNames) {
Aggregation aggResult = bucket.getAggregations().get(aggName);
// This indicates not that the value contained in the `aggResult` is null, but that the `aggResult` is not
// present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
@ -109,16 +115,19 @@ public final class AggregationResultUtils {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
} else if (aggregation instanceof GeoBounds) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
} else if (aggregation instanceof Percentiles) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]",
throw new AggregationExtractionException(
"unsupported aggregation [{}] with name [{}]",
aggregation.getType(),
aggregation.getName());
aggregation.getName()
);
}
}
@SuppressWarnings("unchecked")
static void updateDocument(Map<String, Object> document, String fieldName, Object value) {
String[] fieldTokens = fieldName.split("\\.");
@ -132,23 +141,23 @@ public final class AggregationResultUtils {
if (i == fieldTokens.length - 1) {
if (internalMap.containsKey(token)) {
if (internalMap.get(token) instanceof Map) {
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
fieldName);
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
} else {
throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
throw new AggregationExtractionException(
"duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
fieldName,
internalMap.get(token),
value);
value
);
}
}
internalMap.put(token, value);
} else {
if (internalMap.containsKey(token)) {
if (internalMap.get(token) instanceof Map) {
internalMap = (Map<String, Object>)internalMap.get(token);
internalMap = (Map<String, Object>) internalMap.get(token);
} else {
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
fieldName);
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
}
} else {
Map<String, Object> newMap = new HashMap<>();
@ -172,15 +181,14 @@ public final class AggregationResultUtils {
static class SingleValueAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
SingleValue aggregation = (SingleValue)agg;
SingleValue aggregation = (SingleValue) agg;
// If the double is invalid, this indicates sparse data
if (Numbers.isValidDouble(aggregation.value()) == false) {
return null;
}
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) ||
aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))){
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
return aggregation.value();
} else {
return aggregation.getValueAsString();
@ -188,10 +196,25 @@ public final class AggregationResultUtils {
}
}
static class PercentilesAggExtractor implements AggValueExtractor {

    /**
     * Flattens a multi-value percentiles aggregation into a map of
     * field-name-safe percent string to percentile value, e.g. 99.9 -> "99_9".
     */
    @Override
    public Object value(Aggregation agg, String fieldType) {
        Percentiles percentilesAgg = (Percentiles) agg;
        Map<String, Double> valueByPercent = new HashMap<>();
        percentilesAgg.forEach(
            percentile -> valueByPercent.put(OutputFieldNameConverter.fromDouble(percentile.getPercent()), percentile.getValue())
        );
        return valueByPercent;
    }
}
static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
ScriptedMetric aggregation = (ScriptedMetric)agg;
ScriptedMetric aggregation = (ScriptedMetric) agg;
return aggregation.aggregation();
}
}
@ -199,7 +222,7 @@ public final class AggregationResultUtils {
static class GeoCentroidAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
GeoCentroid aggregation = (GeoCentroid)agg;
GeoCentroid aggregation = (GeoCentroid) agg;
// the count is `0` iff there is no contained centroid
return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
}
@ -208,7 +231,7 @@ public final class AggregationResultUtils {
static class GeoBoundsAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
GeoBounds aggregation = (GeoBounds)agg;
GeoBounds aggregation = (GeoBounds) agg;
if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
return null;
}
@ -216,30 +239,41 @@ public final class AggregationResultUtils {
// If the two geo_points are equal, it is a point
if (aggregation.topLeft().equals(aggregation.bottomRight())) {
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PointBuilder.TYPE.shapeName());
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat()));
// If only the lat or the lon of the two geo_points are equal, then we know it should be a line
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat())
);
// If only the lat or the lon of the two geo_points are equal, then we know it should be a line
} else if (Double.compare(aggregation.topLeft().getLat(), aggregation.bottomRight().getLat()) == 0
|| Double.compare(aggregation.topLeft().getLon(), aggregation.bottomRight().getLon()) == 0) {
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(
new Double[]{aggregation.topLeft().getLon(), aggregation.topLeft().getLat()},
new Double[]{aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat()}));
} else {
// neither points are equal, we have a polygon that is a square
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
final GeoPoint tl = aggregation.topLeft();
final GeoPoint br = aggregation.bottomRight();
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Collections.singletonList(Arrays.asList(
new Double[]{tl.getLon(), tl.getLat()},
new Double[]{br.getLon(), tl.getLat()},
new Double[]{br.getLon(), br.getLat()},
new Double[]{tl.getLon(), br.getLat()},
new Double[]{tl.getLon(), tl.getLat()})));
}
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(
new Double[] { aggregation.topLeft().getLon(), aggregation.topLeft().getLat() },
new Double[] { aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat() }
)
);
} else {
// neither points are equal, we have a polygon that is a square
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
final GeoPoint tl = aggregation.topLeft();
final GeoPoint br = aggregation.bottomRight();
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Collections.singletonList(
Arrays.asList(
new Double[] { tl.getLon(), tl.getLat() },
new Double[] { br.getLon(), tl.getLat() },
new Double[] { br.getLon(), br.getLat() },
new Double[] { tl.getLon(), br.getLat() },
new Double[] { tl.getLon(), tl.getLat() }
)
)
);
}
return geoShape;
}
}
}

View File

@ -6,7 +6,14 @@
package org.elasticsearch.xpack.transform.transforms.pivot;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -17,6 +24,7 @@ public final class Aggregations {
private static final String DYNAMIC = "_dynamic";
// the field mapping should be determined explicitly from the source field mapping if possible.
private static final String SOURCE = "_source";
private Aggregations() {}
/**
@ -40,7 +48,8 @@ public final class Aggregations {
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
WEIGHTED_AVG("weighted_avg", DYNAMIC),
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC);
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", "double");
private final String aggregationType;
private final String targetMapping;
@ -59,8 +68,9 @@ public final class Aggregations {
}
}
private static Set<String> aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name)
.collect(Collectors.toSet());
private static Set<String> aggregationSupported = Stream.of(AggregationType.values())
.map(AggregationType::name)
.collect(Collectors.toSet());
public static boolean isSupportedByTransform(String aggregationType) {
return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT));
@ -74,4 +84,19 @@ public final class Aggregations {
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping();
}
/**
 * Resolves the output field name(s) and aggregation type(s) produced by the given aggregation builder.
 *
 * Multi-value aggregations (currently: percentiles) expand into one entry per configured percent,
 * using {@link OutputFieldNameConverter#fromDouble} to build nested output field names like "p.99_9".
 * All other aggregations map their own name 1:1 to their type.
 *
 * @param agg the aggregation builder to inspect
 * @return map of output field name to aggregation type
 */
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
if (agg instanceof PercentilesAggregationBuilder) {
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
// note: eclipse does not like p -> agg.getType()
// the merge function (p1, p2) -> p1 ignores duplicates
return Arrays.stream(percentilesAgg.percentiles())
.mapToObj(OutputFieldNameConverter::fromDouble)
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
}
// catch all: single output value, the aggregation's own name maps to its type
return Collections.singletonMap(agg.getName(), agg.getType());
}
}

View File

@ -66,7 +66,7 @@ public final class SchemaUtil {
) {
// collects the fieldnames used as source for aggregations
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
// collects the aggregation types by source name
// collects the aggregation types by output field name
Map<String, String> aggregationTypes = new HashMap<>();
// collects the fieldnames and target fieldnames used for grouping
Map<String, String> fieldNamesForGrouping = new HashMap<>();
@ -79,9 +79,9 @@ public final class SchemaUtil {
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
aggregationTypes.put(agg.getName(), agg.getType());
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
} else {
// execution should not reach this point
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));

View File

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.utils;
/**
 * Converts doubles (e.g. percentile values such as {@code 99.9}) into strings that are safe
 * to use as flattened output field names: a '.' would otherwise be parsed as a nested path.
 */
public final class OutputFieldNameConverter {

    // utility class, no instances
    private OutputFieldNameConverter() {}

    /**
     * Renders {@code d} as a field-name-safe string: exact whole numbers are printed
     * without a fractional part, otherwise every '.' is replaced by '_'.
     */
    public static String fromDouble(double d) {
        long truncated = (long) d;
        if (truncated == d) {
            // exact whole number, drop the trailing ".0"
            return Long.toString(truncated);
        }
        return Double.toString(d).replace('.', '_');
    }
}

View File

@ -61,5 +61,9 @@ public class AggregationsTests extends ESTestCase {
// weighted_avg
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", "double"));
// percentile
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null));
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int"));
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.utils;
import org.elasticsearch.test.ESTestCase;
/**
 * Unit tests for {@code OutputFieldNameConverter#fromDouble(double)}, which turns doubles
 * into field-name-safe strings ('.' replaced by '_', trailing ".0" dropped for whole numbers).
 */
public class OutputFieldNameConverterTests extends ESTestCase {
public void testFromDouble() {
// fractional value: the '.' is replaced by '_'
assertEquals("42_42", OutputFieldNameConverter.fromDouble(42.42));
// remove '.0' if possible, i.e. whole numbers are printed without a fractional part
assertEquals("42", OutputFieldNameConverter.fromDouble(42.0));
// digit limit: double precision truncates the long literal before conversion
assertEquals("42_42424242424242", OutputFieldNameConverter.fromDouble(42.4242424242424242424242424242424242));
// scientific notation keeps the '.0' of the mantissa, which becomes "_0"
assertEquals("1_0E-100", OutputFieldNameConverter.fromDouble(1.0E-100));
// scientific with digits
assertEquals("1_12345E-100", OutputFieldNameConverter.fromDouble(1.12345E-100));
// NaN (OutputFieldNameConverter clients should disallow that)
assertEquals("NaN", OutputFieldNameConverter.fromDouble(Double.NaN));
// infinity contains no '.' and passes through unchanged
assertEquals("-Infinity", OutputFieldNameConverter.fromDouble(Double.NEGATIVE_INFINITY));
}
}