[Transform] add support for filter aggregation (#52483)

add support for filter aggregations, refactor code for sub-aggregation support in mapping
deduction

fixes #52151
This commit is contained in:
Hendrik Muhs 2020-02-21 13:06:18 +01:00
parent 7fe2843a9e
commit 288ccae23b
8 changed files with 1074 additions and 589 deletions

View File

@ -1173,6 +1173,94 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertEquals(5, actual.longValue());
}
/**
 * Creates and runs a pivot transform that groups reviews by {@code user_id} and uses two
 * {@code filter} aggregations ({@code stars >= 4}): {@code top_ratings} without sub-aggregations
 * and {@code top_ratings_detail} with {@code cardinality}, {@code max} and {@code min}
 * sub-aggregations. Afterwards the destination documents of two reviewers are checked.
 */
public void testPivotWithFilter() throws Exception {
String transformId = "filter_pivot";
String transformIndex = "filter_pivot_reviews";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
// transform configuration: source/dest index, 1s frequency, terms group_by and the two filter aggregations
String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
// filter without sub-aggregations
+ " \"top_ratings\": {"
+ " \"filter\": {"
+ " \"range\": {"
+ " \"stars\": {"
+ " \"gte\": 4 "
+ " } } } },"
// same filter, this time with nested metric aggregations
+ " \"top_ratings_detail\": {"
+ " \"filter\": {"
+ " \"range\": {"
+ " \"stars\": {"
+ " \"gte\": 4"
+ " } } },"
+ " \"aggregations\": {"
+ " \"unique_count\": {"
+ " \"cardinality\": {"
+ " \"field\": \"business_id\""
+ " } },"
+ " \"max\": {"
+ " \"max\": {"
+ " \"field\": \"stars\""
+ " } },"
+ " \"min\": {"
+ " \"min\": {"
+ " \"field\": \"stars\""
+ " } }"
+ " } } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));
// get and check some users
// user_4: exactly one destination document is expected
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
// the filter without sub-aggregations is read back as a plain number
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
assertEquals(29, actual.longValue());
// the filter with sub-aggregations is read back as an object keyed by sub-aggregation name
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
assertEquals(4, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
assertEquals(5, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
0
);
assertEquals(4, actual.longValue());
// user_2: same checks with that reviewer's expected values
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_2");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
assertEquals(19, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
assertEquals(4, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
assertEquals(5, actual.longValue());
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
0
);
assertEquals(3, actual.longValue());
}
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.geo.parsers.ShapeParser;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
@ -50,6 +51,7 @@ public final class AggregationResultUtils {
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@ -94,9 +96,8 @@ public final class AggregationResultUtils {
// present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
// does not calculate a value, but instead manipulates other results.
if (aggResult != null) {
final String fieldType = fieldTypeMap.get(aggName);
AggValueExtractor extractor = getExtractor(aggResult);
updateDocument(document, aggName, extractor.value(aggResult, fieldType));
updateDocument(document, aggName, extractor.value(aggResult, fieldTypeMap, ""));
}
}
@ -117,6 +118,8 @@ public final class AggregationResultUtils {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
} else if (aggregation instanceof Percentiles) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else if (aggregation instanceof SingleBucketAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
@ -175,17 +178,19 @@ public final class AggregationResultUtils {
}
/**
 * Turns one aggregation result into the value that is written into the output document
 * under the aggregation's name.
 */
interface AggValueExtractor {
// variant that receives the field type already resolved by the caller
Object value(Aggregation aggregation, String fieldType);
// variant that resolves the field type itself: fieldTypeMap is keyed by the (dot separated) field name,
// lookupFieldPrefix is the path of the enclosing aggregations and is empty for a top-level aggregation
Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix);
}
static class SingleValueAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
SingleValue aggregation = (SingleValue) agg;
// If the double is invalid, this indicates sparse data
if (Numbers.isValidDouble(aggregation.value()) == false) {
return null;
}
String fieldType = fieldTypeMap.get(lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName());
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
@ -198,7 +203,7 @@ public final class AggregationResultUtils {
static class PercentilesAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
Percentiles aggregation = (Percentiles) agg;
HashMap<String, Double> percentiles = new HashMap<>();
@ -211,9 +216,34 @@ public final class AggregationResultUtils {
}
}
/**
 * Extractor for single bucket aggregations (for example {@code filter}).
 *
 * A bucket without sub-aggregations is reduced to its document count. Otherwise the result is a
 * map from sub-aggregation name to the value produced by that sub-aggregation's own extractor.
 */
static class SingleBucketAggExtractor implements AggValueExtractor {
    @Override
    public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
        final SingleBucketAggregation bucket = (SingleBucketAggregation) agg;

        // no children: the only information the bucket carries is how many documents matched
        if (bucket.getAggregations().iterator().hasNext() == false) {
            return bucket.getDocCount();
        }

        // children resolve their field types below this aggregation's dot separated path
        final String childPrefix;
        if (lookupFieldPrefix.isEmpty()) {
            childPrefix = agg.getName();
        } else {
            childPrefix = lookupFieldPrefix + "." + agg.getName();
        }

        final HashMap<String, Object> valuesByName = new HashMap<>();
        for (Aggregation child : bucket.getAggregations()) {
            final Object childValue = getExtractor(child).value(child, fieldTypeMap, childPrefix);
            valuesByName.put(child.getName(), childValue);
        }
        return valuesByName;
    }
}
static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
ScriptedMetric aggregation = (ScriptedMetric) agg;
return aggregation.aggregation();
}
@ -221,7 +251,7 @@ public final class AggregationResultUtils {
static class GeoCentroidAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
GeoCentroid aggregation = (GeoCentroid) agg;
// the count is `0` iff there is no contained centroid
return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
@ -230,7 +260,7 @@ public final class AggregationResultUtils {
static class GeoBoundsAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
GeoBounds aggregation = (GeoBounds) agg;
if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
return null;

View File

@ -6,15 +6,19 @@
package org.elasticsearch.xpack.transform.transforms.pivot;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -52,7 +56,6 @@ public final class Aggregations {
"date_range",
"diversified_sampler",
"extended_stats", // https://github.com/elastic/elasticsearch/issues/51925
"filter", // https://github.com/elastic/elasticsearch/issues/52151
"filters",
"geo_distance",
"geohash_grid",
@ -102,7 +105,8 @@ public final class Aggregations {
WEIGHTED_AVG("weighted_avg", DYNAMIC),
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", DOUBLE);
PERCENTILES("percentiles", DOUBLE),
FILTER("filter", LONG);
private final String aggregationType;
private final String targetMapping;
@ -146,28 +150,68 @@ public final class Aggregations {
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
if (agg.getTargetMapping().equals(SOURCE)) {
if (sourceType == null) {
// this should never happen and would mean a bug in the calling code, the error is logged in {@link
// org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil#resolveMappings()}
return null;
}
// scaled float requires an additional parameter "scaling_factor", which we do not know, therefore we fallback to float
if (sourceType.equals(SCALED_FLOAT)) {
return FLOAT;
}
return sourceType;
}
return agg.getTargetMapping();
}
/**
 * Collects the source fields an aggregation reads and the output fields it produces.
 *
 * In the returned tuple, {@code v1} maps an aggregation name to the source field it aggregates on and
 * {@code v2} maps an output field name to the aggregation type ({@code agg.getType()}). For aggregations
 * with sub-aggregations every key is prefixed with the parent aggregation name, separated by a dot.
 *
 * @param agg the aggregation to inspect
 * @return tuple of (input fields, output types)
 */
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
// percentiles: one output field per requested percentile, no input field is reported
if (agg instanceof PercentilesAggregationBuilder) {
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
// note: eclipse does not like p -> agg.getType()
// the merge function (p1, p2) -> p1 ignores duplicates
return Arrays.stream(percentilesAgg.percentiles())
return new Tuple<>(
Collections.emptyMap(),
Arrays.stream(percentilesAgg.percentiles())
.mapToObj(OutputFieldNameConverter::fromDouble)
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
);
}
// catch all
return Collections.singletonMap(agg.getName(), agg.getType());
// aggregations on a single field: the field is the input, the aggregation type is the output
// NOTE(review): this branch returns before sub-aggregations are inspected, so sub-aggregations of a
// values source aggregation are not collected here - confirm this is intended
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
return new Tuple<>(
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
Collections.singletonMap(agg.getName(), agg.getType())
);
}
// does the agg have sub aggregations?
if (agg.getSubAggregations().size() > 0) {
HashMap<String, String> outputTypes = new HashMap<>();
HashMap<String, String> inputTypes = new HashMap<>();
for (AggregationBuilder subAgg : agg.getSubAggregations()) {
// recurse, then re-key the results of the sub-aggregation below this aggregation's name
Tuple<Map<String, String>, Map<String, String>> subAggregationTypes = getAggregationInputAndOutputTypes(subAgg);
for (Entry<String, String> subAggOutputType : subAggregationTypes.v2().entrySet()) {
outputTypes.put(String.join(".", agg.getName(), subAggOutputType.getKey()), subAggOutputType.getValue());
}
for (Entry<String, String> subAggInputType : subAggregationTypes.v1().entrySet()) {
inputTypes.put(String.join(".", agg.getName(), subAggInputType.getKey()), subAggInputType.getValue());
}
}
// the parent itself is not added: only the entries of its sub-aggregations are returned
return new Tuple<>(inputTypes, outputTypes);
}
// catch all in case no special handling required
return new Tuple<>(Collections.emptyMap(), Collections.singletonMap(agg.getName(), agg.getType()));
}
}

View File

@ -14,12 +14,10 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
@ -76,17 +74,9 @@ public final class SchemaUtil {
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
} else {
// execution should not reach this point
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
return;
}
Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg);
aggregationSourceFieldNames.putAll(inputAndOutputTypes.v1());
aggregationTypes.putAll(inputAndOutputTypes.v2());
}
// For pipeline aggs, since they are referencing other aggregations in the payload, they do not have any

View File

@ -0,0 +1,287 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.transforms.pivot;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AggregationSchemaAndResultTests extends ESTestCase {
private Client client;
/**
 * Installs a fresh {@link MyMockClient} named after the running test, closing any client
 * that is still assigned from before.
 */
@Before
public void setupClient() {
    final Client leftover = client;
    if (leftover != null) {
        leftover.close();
    }
    final String testName = getTestName();
    client = new MyMockClient(testName);
}
/** Closes the client held in the {@code client} field after each test. */
@After
public void tearDownClient() {
client.close();
}
/**
 * Client stub that answers field capabilities requests with synthetic data and hands every
 * other request to {@link NoOpClient}.
 *
 * The type reported for a field is derived from the field name: everything before the first
 * {@code _} is used as the type (e.g. {@code double_field} is of type {@code double}); names
 * without {@code _} are reported as {@code long}.
 */
private class MyMockClient extends NoOpClient {
    MyMockClient(String testName) {
        super(testName);
    }

    @SuppressWarnings("unchecked")
    @Override
    protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        // anything that is not a field caps request is handled by the no-op parent
        if ((request instanceof FieldCapabilitiesRequest) == false) {
            super.doExecute(action, request, listener);
            return;
        }

        final Map<String, Map<String, FieldCapabilities>> capabilitiesByField = new HashMap<>();
        for (String fieldName : ((FieldCapabilitiesRequest) request).fields()) {
            // expect a field name like "double_field" where type is a prefix
            final String[] typeAndRemainder = Strings.split(fieldName, "_");
            final String fieldType;
            if (typeAndRemainder == null) {
                fieldType = "long";
            } else {
                fieldType = typeAndRemainder[0];
            }
            final FieldCapabilities capabilities = new FieldCapabilities(
                fieldName,
                fieldType,
                true,
                true,
                null,
                null,
                null,
                Collections.emptyMap()
            );
            capabilitiesByField.put(fieldName, Collections.singletonMap(fieldType, capabilities));
        }

        // FieldCapabilitiesResponse is package private, that's why a mock is used
        final FieldCapabilitiesResponse fieldCapsResponse = mock(FieldCapabilitiesResponse.class);
        when(fieldCapsResponse.get()).thenReturn(capabilitiesByField);
        for (Map.Entry<String, Map<String, FieldCapabilities>> entry : capabilitiesByField.entrySet()) {
            when(fieldCapsResponse.getField(entry.getKey())).thenReturn(entry.getValue());
        }

        listener.onResponse((Response) fieldCapsResponse);
    }
}
/**
 * Deduces the mappings for a set of metric aggregations and checks that the result extractors
 * work together with the deduced mappings.
 */
public void testBasic() throws InterruptedException {
    final AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();

    // aggregations with exactly one output field
    aggregations.addAggregator(AggregationBuilders.avg("avg_rating").field("long_stars"));
    aggregations.addAggregator(AggregationBuilders.max("max_rating").field("long_stars"));
    aggregations.addAggregator(AggregationBuilders.count("count_rating").field("keyword_group"));
    aggregations.addAggregator(AggregationBuilders.min("min_something").field("float_something"));

    // percentiles: one output per percentile plus one for the parent object
    aggregations.addAggregator(AggregationBuilders.percentiles("p_rating").field("long_stars").percentiles(1, 5, 10, 50, 99.9));

    // scripted metric: no output because it is dynamic
    aggregations.addAggregator(AggregationBuilders.scriptedMetric("collapsed_ratings"));

    final AggregationConfig aggConfig = new AggregationConfig(Collections.emptyMap(), aggregations);
    final GroupConfig groups = GroupConfigTests.randomGroupConfig();
    final PivotConfig pivot = new PivotConfig(groups, aggConfig, null);

    final Consumer<ActionListener<Map<String, String>>> deduceMappings = listener -> SchemaUtil.deduceMappings(
        client,
        pivot,
        new String[] { "source-index" },
        listener
    );

    final Consumer<Map<String, String>> verifyMappings = mappings -> {
        // 4 single value outputs + 5 percentiles + the percentiles parent object
        assertEquals(groups.getGroups().size() + 10, mappings.size());
        assertEquals("long", mappings.get("max_rating"));
        assertEquals("double", mappings.get("avg_rating"));
        assertEquals("long", mappings.get("count_rating"));
        assertEquals("float", mappings.get("min_something"));
        assertEquals("object", mappings.get("p_rating"));
        assertEquals("double", mappings.get("p_rating.1"));
        assertEquals("double", mappings.get("p_rating.5"));
        assertEquals("double", mappings.get("p_rating.10"));
        assertEquals("double", mappings.get("p_rating.99_9"));

        // single value extraction using the deduced mappings
        final Aggregation avgResult = AggregationResultUtilsTests.createSingleMetricAgg("avg_rating", 33.3, "33.3");
        final Object avgValue = AggregationResultUtils.getExtractor(avgResult).value(avgResult, mappings, "");
        assertThat(avgValue, equalTo(33.3));

        // percentiles extraction: keys are the converted percentile names
        final Aggregation percentilesResult = AggregationResultUtilsTests.createPercentilesAgg(
            "p_agg",
            Arrays.asList(new Percentile(1, 0), new Percentile(50, 1.2), new Percentile(99, 2.4), new Percentile(99.5, 4.3))
        );
        final Object percentilesValue = AggregationResultUtils.getExtractor(percentilesResult).value(percentilesResult, mappings, "");
        assertThat(percentilesValue, equalTo(asMap("1", 0.0, "50", 1.2, "99", 2.4, "99_5", 4.3)));
    };

    assertAsync(deduceMappings, verifyMappings);
}
/**
 * Deduces the mappings for filter aggregations nested up to three levels deep and checks that
 * the single bucket extractor produces matching nested result maps.
 */
public void testNested() throws InterruptedException {
    final AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();

    // filter_1: filter without sub-aggregation
    aggregations.addAggregator(AggregationBuilders.filter("filter_1", new TermQueryBuilder("favorite_drink", "slurm")));

    // filter_2: filter with one metric
    aggregations.addAggregator(
        AggregationBuilders.filter("filter_2", new TermQueryBuilder("species", "amphibiosan"))
            .subAggregation(AggregationBuilders.max("max_drinks_2").field("long_drinks"))
    );

    // filter_3: filter -> filter -> metric
    aggregations.addAggregator(
        AggregationBuilders.filter("filter_3", new TermQueryBuilder("spaceship", "nimbus"))
            .subAggregation(
                AggregationBuilders.filter("filter_3_1", new TermQueryBuilder("species", "amphibiosan"))
                    .subAggregation(AggregationBuilders.max("max_drinks_3").field("float_drinks"))
            )
    );

    // filter_4: filter -> filter -> two sibling filters, each with one metric
    aggregations.addAggregator(
        AggregationBuilders.filter("filter_4", new TermQueryBuilder("organization", "doop"))
            .subAggregation(
                AggregationBuilders.filter("filter_4_1", new TermQueryBuilder("spaceship", "nimbus"))
                    .subAggregation(
                        AggregationBuilders.filter("filter_4_1_1", new TermQueryBuilder("species", "amphibiosan"))
                            .subAggregation(AggregationBuilders.max("max_drinks_4").field("float_drinks"))
                    )
                    .subAggregation(
                        AggregationBuilders.filter("filter_4_1_2", new TermQueryBuilder("species", "mutant"))
                            .subAggregation(AggregationBuilders.max("min_drinks_4").field("double_drinks"))
                    )
            )
    );

    final AggregationConfig aggConfig = new AggregationConfig(Collections.emptyMap(), aggregations);
    final GroupConfig groups = GroupConfigTests.randomGroupConfig();
    final PivotConfig pivot = new PivotConfig(groups, aggConfig, null);

    final Consumer<ActionListener<Map<String, String>>> deduceMappings = listener -> SchemaUtil.deduceMappings(
        client,
        pivot,
        new String[] { "source-index" },
        listener
    );

    final Consumer<Map<String, String>> verifyMappings = mappings -> {
        assertEquals(groups.getGroups().size() + 12, mappings.size());

        // a filter is a "long" when it is a leaf and an "object" when it has children
        assertEquals("long", mappings.get("filter_1"));
        assertEquals("object", mappings.get("filter_2"));
        assertEquals("long", mappings.get("filter_2.max_drinks_2"));
        assertEquals("object", mappings.get("filter_3"));
        assertEquals("object", mappings.get("filter_3.filter_3_1"));
        assertEquals("float", mappings.get("filter_3.filter_3_1.max_drinks_3"));
        assertEquals("object", mappings.get("filter_4"));
        assertEquals("object", mappings.get("filter_4.filter_4_1"));
        assertEquals("object", mappings.get("filter_4.filter_4_1.filter_4_1_1"));
        assertEquals("float", mappings.get("filter_4.filter_4_1.filter_4_1_1.max_drinks_4"));
        assertEquals("object", mappings.get("filter_4.filter_4_1.filter_4_1_2"));
        assertEquals("double", mappings.get("filter_4.filter_4_1.filter_4_1_2.min_drinks_4"));

        // leaf filter: the extracted value is the document count
        final Aggregation leafFilter = AggregationResultUtilsTests.createSingleBucketAgg("filter_1", 36363);
        final Object leafValue = AggregationResultUtils.getExtractor(leafFilter).value(leafFilter, mappings, "");
        assertThat(leafValue, equalTo(36363L));

        // one level of nesting
        final Aggregation oneLevel = AggregationResultUtilsTests.createSingleBucketAgg(
            "filter_2",
            23144,
            AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_2", 45.0, "forty_five")
        );
        final Object oneLevelValue = AggregationResultUtils.getExtractor(oneLevel).value(oneLevel, mappings, "");
        assertThat(oneLevelValue, equalTo(asMap("max_drinks_2", 45.0)));

        // two levels of nesting
        final Aggregation twoLevels = AggregationResultUtilsTests.createSingleBucketAgg(
            "filter_3",
            62426,
            AggregationResultUtilsTests.createSingleBucketAgg(
                "filter_3_1",
                33365,
                AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_3", 35.0, "thirty_five")
            )
        );
        final Object twoLevelsValue = AggregationResultUtils.getExtractor(twoLevels).value(twoLevels, mappings, "");
        assertThat(twoLevelsValue, equalTo(asMap("filter_3_1", asMap("max_drinks_3", 35.0))));

        // three levels of nesting with two sibling buckets at the lowest level
        final Aggregation threeLevels = AggregationResultUtilsTests.createSingleBucketAgg(
            "filter_4",
            62426,
            AggregationResultUtilsTests.createSingleBucketAgg(
                "filter_4_1",
                33365,
                AggregationResultUtilsTests.createSingleBucketAgg(
                    "filter_4_1_1",
                    12543,
                    AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_4", 1.0, "a small one")
                ),
                AggregationResultUtilsTests.createSingleBucketAgg(
                    "filter_4_1_2",
                    526,
                    AggregationResultUtilsTests.createSingleMetricAgg("min_drinks_4", 7395.0, "a lot")
                )
            )
        );
        final Object threeLevelsValue = AggregationResultUtils.getExtractor(threeLevels).value(threeLevels, mappings, "");
        final Map<String, Object> expectedLowestLevel = asMap(
            "filter_4_1_1",
            asMap("max_drinks_4", 1.0),
            "filter_4_1_2",
            asMap("min_drinks_4", 7395.0)
        );
        assertThat(threeLevelsValue, equalTo(asMap("filter_4_1", expectedLowestLevel)));
    };

    assertAsync(deduceMappings, verifyMappings);
}
/**
 * Runs an asynchronous call and applies further checks to its response.
 *
 * The call fails the test when the listener receives an exception, when the response arrives
 * more than once, or when no callback happened within 20 seconds.
 *
 * @param function     invoked with the listener that has to be notified
 * @param furtherTests checks to run on the response
 */
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
    final AtomicBoolean alreadyNotified = new AtomicBoolean(false);
    final CountDownLatch completed = new CountDownLatch(1);

    final ActionListener<T> checkingListener = ActionListener.wrap(response -> {
        assertTrue("listener called more than once", alreadyNotified.compareAndSet(false, true));
        furtherTests.accept(response);
    }, failure -> fail("got unexpected exception: " + failure));

    // the latch is passed to LatchedActionListener together with the checking listener
    function.accept(new LatchedActionListener<>(checkingListener, completed));

    final boolean finishedInTime = completed.await(20, TimeUnit.SECONDS);
    assertTrue("timed out after 20s", finishedInTime);
}
/**
 * Builds a map from alternating key/value arguments: {@code asMap("a", 1, "b", 2)} yields
 * {@code {a=1, b=2}}. Keys must be strings and the number of arguments must be even.
 *
 * @param fields alternating keys and values
 * @return a new mutable map holding the given pairs
 */
static Map<String, Object> asMap(Object... fields) {
    assert fields.length % 2 == 0;
    final Map<String, Object> result = new HashMap<>();
    int index = 0;
    while (index < fields.length) {
        final String key = (String) fields[index];
        final Object value = fields[index + 1];
        result.put(key, value);
        index += 2;
    }
    return result;
}
}

View File

@ -6,16 +6,23 @@
package org.elasticsearch.xpack.transform.transforms.pivot;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class AggregationsTests extends ESTestCase {
@ -77,6 +84,14 @@ public class AggregationsTests extends ESTestCase {
// percentile
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null));
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int"));
// filter
assertEquals("long", Aggregations.resolveTargetMapping("filter", null));
assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));
// corner case: source type null
assertEquals(null, Aggregations.resolveTargetMapping("min", null));
}
public void testAggregationsVsTransforms() {
@ -105,4 +120,101 @@ public class AggregationsTests extends ESTestCase {
);
}
}
/**
 * Percentiles report no input field and one output entry per distinct percentile, each typed
 * with the aggregation type name.
 */
public void testGetAggregationOutputTypesPercentiles() {
    // the second set repeats 5.0 and must still yield 3 outputs
    // note: using the constructor omits validation, in reality this might fail
    final double[][] percentileSets = { { 1.0, 5.0, 10.0 }, { 1.0, 5.0, 5.0, 10.0 } };

    for (double[] percentileSet : percentileSets) {
        final AggregationBuilder percentilesBuilder = new PercentilesAggregationBuilder("percentiles").percentiles(percentileSet);
        final Tuple<Map<String, String>, Map<String, String>> types = Aggregations.getAggregationInputAndOutputTypes(percentilesBuilder);

        assertTrue(types.v1().isEmpty());

        final Map<String, String> outputs = types.v2();
        assertEquals(3, outputs.size());
        for (String percentileName : Arrays.asList("1", "5", "10")) {
            assertEquals("percentiles", outputs.get("percentiles." + percentileName));
        }
    }
}
/**
 * Grows a tree of filter aggregations step by step and checks after every step which input
 * fields and output types are reported, including the dot separated key prefixes.
 */
public void testGetAggregationOutputTypesSubAggregations() {
    final AggregationBuilder outerFilter = new FilterAggregationBuilder("filter_1", new TermQueryBuilder("type", "cat"));

    // step 1: a filter on its own is a single output
    Tuple<Map<String, String>, Map<String, String>> types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertTrue(types.v1().isEmpty());
    assertEquals(1, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1"));

    // step 2: with a nested filter only the nested one is reported, keyed by its full path
    final AggregationBuilder innerFilter = new FilterAggregationBuilder("filter_2", new TermQueryBuilder("subtype", "siam"));
    outerFilter.subAggregation(innerFilter);
    types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertTrue(types.v1().isEmpty());
    assertEquals(1, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1.filter_2"));

    // step 3: a metric below the outer filter adds one input and one output
    outerFilter.subAggregation(new MaxAggregationBuilder("max_2").field("max_field"));
    types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertEquals(1, types.v1().size());
    assertEquals("max_field", types.v1().get("filter_1.max_2"));
    assertEquals(2, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1.filter_2"));
    assertEquals("max", types.v2().get("filter_1.max_2"));

    // step 4: a third filter level replaces the entry of its parent
    innerFilter.subAggregation(new FilterAggregationBuilder("filter_3", new TermQueryBuilder("color", "white")));
    types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertEquals(1, types.v1().size());
    assertEquals(2, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1.filter_2.filter_3"));
    assertEquals("max", types.v2().get("filter_1.max_2"));

    // step 5: a metric on the second level
    innerFilter.subAggregation(new MinAggregationBuilder("min_3").field("min_field"));
    types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertEquals(2, types.v1().size());
    assertEquals("max_field", types.v1().get("filter_1.max_2"));
    assertEquals("min_field", types.v1().get("filter_1.filter_2.min_3"));
    assertEquals(3, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1.filter_2.filter_3"));
    assertEquals("max", types.v2().get("filter_1.max_2"));
    assertEquals("min", types.v2().get("filter_1.filter_2.min_3"));

    // step 6: percentiles on the second level add one output per percentile and no input
    innerFilter.subAggregation(new PercentilesAggregationBuilder("percentiles").percentiles(33.3, 44.4, 88.8, 99.5));
    types = Aggregations.getAggregationInputAndOutputTypes(outerFilter);
    assertEquals(2, types.v1().size());
    assertEquals(7, types.v2().size());
    assertEquals("filter", types.v2().get("filter_1.filter_2.filter_3"));
    assertEquals("max", types.v2().get("filter_1.max_2"));
    assertEquals("min", types.v2().get("filter_1.filter_2.min_3"));
    assertEquals("percentiles", types.v2().get("filter_1.filter_2.percentiles.33_3"));
    assertEquals("percentiles", types.v2().get("filter_1.filter_2.percentiles.44_4"));
    assertEquals("percentiles", types.v2().get("filter_1.filter_2.percentiles.88_8"));
    assertEquals("percentiles", types.v2().get("filter_1.filter_2.percentiles.99_5"));
}
}

View File

@ -215,8 +215,8 @@ public class PivotTests extends ESTestCase {
"{\"pivot_scripted_metric\": {\n"
+ "\"scripted_metric\": {\n"
+ " \"init_script\" : \"state.transactions = []\",\n"
+ " \"map_script\" : \"state.transactions.add("
+ "doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)\",\n"
+ " \"map_script\" : "
+ " \"state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)\", \n"
+ " \"combine_script\" : \"double profit = 0; for (t in state.transactions) { profit += t } return profit\",\n"
+ " \"reduce_script\" : \"double profit = 0; for (a in states) { profit += a } return profit\"\n"
+ " }\n"
@ -251,6 +251,12 @@ public class PivotTests extends ESTestCase {
+ "}"
);
}
if (agg.equals(AggregationType.FILTER.getName())) {
return parseAggregations(
"{" + "\"pivot_filter\": {" + " \"filter\": {" + " \"term\": {\"field\": \"value\"}" + " }" + "}" + "}"
);
}
return parseAggregations(
"{\n" + " \"pivot_" + agg + "\": {\n" + " \"" + agg + "\": {\n" + " \"field\": \"values\"\n" + " }\n" + " }" + "}"
);