diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
index f8b1f91e516..c3bb747604a 100644
--- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
+++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
@@ -1173,6 +1173,94 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertEquals(5, actual.longValue());
     }
 
+    public void testPivotWithFilter() throws Exception {
+        String transformId = "filter_pivot";
+        String transformIndex = "filter_pivot_reviews";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},"
+            + " \"frequency\": \"1s\","
+            + " \"pivot\": {"
+            + " \"group_by\": {"
+            + " \"reviewer\": {"
+            + " \"terms\": {"
+            + " \"field\": \"user_id\""
+            + " } } },"
+            + " \"aggregations\": {"
+            + " \"top_ratings\": {"
+            + " \"filter\": {"
+            + " \"range\": {"
+            + " \"stars\": {"
+            + " \"gte\": 4 "
+            + " } } } },"
+            + " \"top_ratings_detail\": {"
+            + " \"filter\": {"
+            + " \"range\": {"
+            + " \"stars\": {"
+            + " \"gte\": 4"
+            + " } } },"
+            + " \"aggregations\": {"
+            + " \"unique_count\": {"
+            + " \"cardinality\": {"
+            + " \"field\": \"business_id\""
+            + " } },"
+            + " \"max\": {"
+            + " \"max\": {"
+            + " \"field\": \"stars\""
+            + " } },"
+            + " \"min\": {"
+            + " \"min\": {"
+            + " \"field\": \"stars\""
+            + " } }"
+            + " } } } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex);
+        assertTrue(indexExists(transformIndex));
+
+        // get and check some users
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+
+        Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
+        assertEquals(29, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
+        assertEquals(4, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
+        assertEquals(5, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
+            0
+        );
+        assertEquals(4, actual.longValue());
+
+        searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_2");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
+        assertEquals(19, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
+        assertEquals(4, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
+        assertEquals(5, actual.longValue());
+        actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
+            0
+        );
+        assertEquals(3, actual.longValue());
+    }
+
     private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
         // create mapping
         try (XContentBuilder builder = jsonBuilder()) {
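Taken together, the assertions above pin down the shape of the pivoted documents: a `filter` aggregation without sub-aggregations becomes a plain doc-count field, while one with sub-aggregations becomes a nested object. A minimal sketch of the destination document the test implies for `user_4` (illustrative only — class name is hypothetical, and the concrete numeric types are an assumption, since the test only reads values back as `Number`):

    import java.util.HashMap;
    import java.util.Map;

    public class FilterPivotExpectedDoc {
        public static Map<String, Object> user4() {
            Map<String, Object> detail = new HashMap<>();
            detail.put("min", 4);          // min stars among reviews with stars >= 4
            detail.put("max", 5);          // max stars among reviews with stars >= 4
            detail.put("unique_count", 4); // distinct business_id values in that subset
            Map<String, Object> doc = new HashMap<>();
            doc.put("reviewer", "user_4");
            doc.put("top_ratings", 29);            // filter agg without sub-aggs: plain doc count
            doc.put("top_ratings_detail", detail); // filter agg with sub-aggs: nested object
            return doc;
        }
    }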
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java
index 5b581340080..a0ae5518b53 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java
@@ -16,6 +16,7 @@ import org.elasticsearch.common.geo.parsers.ShapeParser;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.metrics.GeoBounds;
 import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
@@ -50,6 +51,7 @@ public final class AggregationResultUtils {
         tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
         tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
         tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
+        tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
         TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
     }
 
@@ -94,9 +96,8 @@ public final class AggregationResultUtils {
             // present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
             // does not calculate a value, but instead manipulates other results.
             if (aggResult != null) {
-                final String fieldType = fieldTypeMap.get(aggName);
                 AggValueExtractor extractor = getExtractor(aggResult);
-                updateDocument(document, aggName, extractor.value(aggResult, fieldType));
+                updateDocument(document, aggName, extractor.value(aggResult, fieldTypeMap, ""));
             }
         }
 
@@ -117,6 +118,8 @@ public final class AggregationResultUtils {
             return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
         } else if (aggregation instanceof Percentiles) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
+        } else if (aggregation instanceof SingleBucketAggregation) {
+            return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
         } else {
             // Execution should never reach this point!
            // Creating transforms with unsupported aggregations shall not be possible
@@ -175,17 +178,19 @@ public final class AggregationResultUtils {
     }
 
     interface AggValueExtractor {
-        Object value(Aggregation aggregation, String fieldType);
+        Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix);
     }
 
     static class SingleValueAggExtractor implements AggValueExtractor {
         @Override
-        public Object value(Aggregation agg, String fieldType) {
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             SingleValue aggregation = (SingleValue) agg;
             // If the double is invalid, this indicates sparse data
             if (Numbers.isValidDouble(aggregation.value()) == false) {
                 return null;
             }
+
+            String fieldType = fieldTypeMap.get(lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName());
             // If the type is numeric or if the formatted string is the same as simply making the value a string,
             // gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
             if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
@@ -198,7 +203,7 @@ public final class AggregationResultUtils {
 
     static class PercentilesAggExtractor implements AggValueExtractor {
         @Override
-        public Object value(Aggregation agg, String fieldType) {
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             Percentiles aggregation = (Percentiles) agg;
 
             HashMap<String, Double> percentiles = new HashMap<>();
@@ -211,9 +216,34 @@ public final class AggregationResultUtils {
         }
     }
 
+    static class SingleBucketAggExtractor implements AggValueExtractor {
+        @Override
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
+            SingleBucketAggregation aggregation = (SingleBucketAggregation) agg;
+
+            if (aggregation.getAggregations().iterator().hasNext() == false) {
+                return aggregation.getDocCount();
+            }
+
+            HashMap<String, Object> nested = new HashMap<>();
+            for (Aggregation subAgg : aggregation.getAggregations()) {
+                nested.put(
+                    subAgg.getName(),
+                    getExtractor(subAgg).value(
+                        subAgg,
+                        fieldTypeMap,
+                        lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()
+                    )
+                );
+            }
+
+            return nested;
+        }
+    }
+
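The extractor above recurses into sub-aggregations and threads a dotted prefix along, so that `SingleValueAggExtractor` can resolve a sub-aggregation's mapped type by its full output path. A small standalone sketch of the key construction, mirroring the ternary used twice above (class and method names are hypothetical, for illustration only):

    public class LookupKeyDemo {
        // For a sub-agg "max" nested under the filter agg "top_ratings_detail",
        // the type is looked up in fieldTypeMap under "top_ratings_detail.max".
        static String lookupKey(String lookupFieldPrefix, String aggName) {
            return lookupFieldPrefix.isEmpty() ? aggName : lookupFieldPrefix + "." + aggName;
        }

        public static void main(String[] args) {
            System.out.println(lookupKey("", "top_ratings_detail"));    // top_ratings_detail
            System.out.println(lookupKey("top_ratings_detail", "max")); // top_ratings_detail.max
        }
    }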
     static class ScriptedMetricAggExtractor implements AggValueExtractor {
         @Override
-        public Object value(Aggregation agg, String fieldType) {
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             ScriptedMetric aggregation = (ScriptedMetric) agg;
             return aggregation.aggregation();
         }
@@ -221,7 +251,7 @@ public final class AggregationResultUtils {
 
     static class GeoCentroidAggExtractor implements AggValueExtractor {
         @Override
-        public Object value(Aggregation agg, String fieldType) {
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             GeoCentroid aggregation = (GeoCentroid) agg;
             // the count is `0` iff there is no contained centroid
             return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
@@ -230,7 +260,7 @@ public final class AggregationResultUtils {
 
     static class GeoBoundsAggExtractor implements AggValueExtractor {
         @Override
-        public Object value(Aggregation agg, String fieldType) {
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
             GeoBounds aggregation = (GeoBounds) agg;
             if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
                 return null;
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
index 1cf139f5713..eb2289088ea 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java
@@ -6,15 +6,19 @@
 
 package org.elasticsearch.xpack.transform.transforms.pivot;
 
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
 import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -52,7 +56,6 @@ public final class Aggregations {
         "date_range",
         "diversified_sampler",
         "extended_stats", // https://github.com/elastic/elasticsearch/issues/51925
-        "filter", // https://github.com/elastic/elasticsearch/issues/52151
         "filters",
         "geo_distance",
         "geohash_grid",
@@ -102,7 +105,8 @@ public final class Aggregations {
         WEIGHTED_AVG("weighted_avg", DYNAMIC),
         BUCKET_SELECTOR("bucket_selector", DYNAMIC),
         BUCKET_SCRIPT("bucket_script", DYNAMIC),
-        PERCENTILES("percentiles", DOUBLE);
+        PERCENTILES("percentiles", DOUBLE),
+        FILTER("filter", LONG);
 
         private final String aggregationType;
         private final String targetMapping;
@@ -146,28 +150,68 @@ public final class Aggregations {
         AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
 
         if (agg.getTargetMapping().equals(SOURCE)) {
+
+            if (sourceType == null) {
+                // this should never happen and would mean a bug in the calling code, the error is logged in {@link
+                // org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil#resolveMappings()}
+                return null;
+            }
+
             // scaled float requires an additional parameter "scaling_factor", which we do not know, therefore we fallback to float
             if (sourceType.equals(SCALED_FLOAT)) {
                 return FLOAT;
             }
+
             return sourceType;
         }
 
         return agg.getTargetMapping();
     }
 
-    public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
+    public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
         if (agg instanceof PercentilesAggregationBuilder) {
             PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
 
             // note: eclipse does not like p -> agg.getType()
             // the merge function (p1, p2) -> p1 ignores duplicates
-            return Arrays.stream(percentilesAgg.percentiles())
-                .mapToObj(OutputFieldNameConverter::fromDouble)
-                .collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
+            return new Tuple<>(
+                Collections.emptyMap(),
+                Arrays.stream(percentilesAgg.percentiles())
+                    .mapToObj(OutputFieldNameConverter::fromDouble)
+                    .collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
+            );
         }
-        // catch all
-        return Collections.singletonMap(agg.getName(), agg.getType());
+
+        if (agg instanceof ValuesSourceAggregationBuilder) {
+            ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
+            return new Tuple<>(
+                Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
+                Collections.singletonMap(agg.getName(), agg.getType())
+            );
+        }
+
+        // does the agg have sub aggregations?
+        if (agg.getSubAggregations().size() > 0) {
+            HashMap<String, String> outputTypes = new HashMap<>();
+            HashMap<String, String> inputTypes = new HashMap<>();
+
+            for (AggregationBuilder subAgg : agg.getSubAggregations()) {
+                Tuple<Map<String, String>, Map<String, String>> subAggregationTypes = getAggregationInputAndOutputTypes(subAgg);
+
+                for (Entry<String, String> subAggOutputType : subAggregationTypes.v2().entrySet()) {
+                    outputTypes.put(String.join(".", agg.getName(), subAggOutputType.getKey()), subAggOutputType.getValue());
+                }
+
+                for (Entry<String, String> subAggInputType : subAggregationTypes.v1().entrySet()) {
+                    inputTypes.put(String.join(".", agg.getName(), subAggInputType.getKey()), subAggInputType.getValue());
+                }
+            }
+
+            return new Tuple<>(inputTypes, outputTypes);
+        }
+
+        // catch all in case no special handling required
+        return new Tuple<>(Collections.emptyMap(), Collections.singletonMap(agg.getName(), agg.getType()));
     }
 }
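With the recursion above, a filter aggregation carrying sub-aggregations decomposes into dotted keys in both maps: `v1()` holds output-name-to-source-field pairs and `v2()` holds output-name-to-aggregation-type pairs. A hedged sketch of what the method should return for the builder shape used in the REST test (demo class name is hypothetical; it assumes access to this package, since `Aggregations` lives there):

    package org.elasticsearch.xpack.transform.transforms.pivot;

    import org.elasticsearch.common.collect.Tuple;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.AggregationBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;

    import java.util.Map;

    public class FilterAggTypesDemo {
        public static void main(String[] args) {
            // Mirrors the "top_ratings_detail" agg from the REST test above.
            AggregationBuilder agg = AggregationBuilders.filter("top_ratings_detail", QueryBuilders.rangeQuery("stars").gte(4))
                .subAggregation(AggregationBuilders.max("max").field("stars"))
                .subAggregation(AggregationBuilders.cardinality("unique_count").field("business_id"));

            Tuple<Map<String, String>, Map<String, String>> types = Aggregations.getAggregationInputAndOutputTypes(agg);
            // v1 (input source fields): {top_ratings_detail.max=stars, top_ratings_detail.unique_count=business_id}
            System.out.println(types.v1());
            // v2 (output agg types):    {top_ratings_detail.max=max, top_ratings_detail.unique_count=cardinality}
            System.out.println(types.v2());
        }
    }

A filter aggregation without sub-aggregations falls through to the catch-all instead, mapping its output name to the type `filter`, which the new `FILTER("filter", LONG)` enum entry resolves to a `long` destination field holding the doc count.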
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java
index 44998d45b60..bd726b7e00f 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java
@@ -14,12 +14,10 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
-import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
-import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
@@ -76,17 +74,9 @@ public final class SchemaUtil {
             .forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
 
         for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
-            if (agg instanceof ValuesSourceAggregationBuilder) {
-                ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
-                aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
-                aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
-            } else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
-                aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
-            } else {
-                // execution should not reach this point
-                listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
-                return;
-            }
+            Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg);
+            aggregationSourceFieldNames.putAll(inputAndOutputTypes.v1());
+            aggregationTypes.putAll(inputAndOutputTypes.v2());
         }
 
         // For pipeline aggs, since they are referencing other aggregations in the payload, they do not have any
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java
index a2b878ee7e0..485384378e9 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java
@@ -21,6 +21,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite;
 import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
@@ -46,6 +47,8 @@ import org.elasticsearch.search.aggregations.metrics.ParsedScriptedMetric;
 import org.elasticsearch.search.aggregations.metrics.ParsedStats;
 import org.elasticsearch.search.aggregations.metrics.ParsedSum;
 import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
+import org.elasticsearch.search.aggregations.metrics.Percentile;
+import org.elasticsearch.search.aggregations.metrics.Percentiles;
 import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
 import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
@@ -105,9 +108,10 @@ public class AggregationResultUtilsTests extends ESTestCase {
         map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
         map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
 
-        namedXContents = map.entrySet().stream()
-            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
-            .collect(Collectors.toList());
+        namedXContents = map.entrySet()
+            .stream()
+            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
+            .collect(Collectors.toList());
     }
 
     @Override
@@ -118,56 +122,29 @@ public class AggregationResultUtilsTests extends ESTestCase {
 
     public void testExtractCompositeAggregationResults() throws IOException {
         String targetField = randomAlphaOfLengthBetween(5, 10);
-        GroupConfig groupBy = parseGroupConfig("{ \"" + targetField + "\" : {"
-            + "\"terms\" : {"
-            + " \"field\" 
: \"doesn't_matter_for_this_test\"" - + "} } }"); + GroupConfig groupBy = parseGroupConfig( + "{ \"" + targetField + "\" : {" + "\"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + "} } }" + ); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "avg#" + aggName; Collection aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName)); Map input = asMap( - "buckets", - asList( - asMap( - KEY, asMap( - targetField, "ID1"), - aggTypedName, asMap( - "value", 42.33), - DOC_COUNT, 8), - asMap( - KEY, asMap( - targetField, "ID2"), - aggTypedName, asMap( - "value", 28.99), - DOC_COUNT, 3), - asMap( - KEY, asMap( - targetField, "ID3"), - aggTypedName, asMap( - "value", Double.NaN), - DOC_COUNT, 0) - )); + "buckets", + asList( + asMap(KEY, asMap(targetField, "ID1"), aggTypedName, asMap("value", 42.33), DOC_COUNT, 8), + asMap(KEY, asMap(targetField, "ID2"), aggTypedName, asMap("value", 28.99), DOC_COUNT, 3), + asMap(KEY, asMap(targetField, "ID3"), aggTypedName, asMap("value", Double.NaN), DOC_COUNT, 0) + ) + ); List> expected = asList( - asMap( - targetField, "ID1", - aggName, 42.33 - ), - asMap( - targetField, "ID2", - aggName, 28.99 - ), - asMap( - targetField, "ID3", - aggName, null - ) - ); - Map fieldTypeMap = asStringMap( - targetField, "keyword", - aggName, "double" + asMap(targetField, "ID1", aggName, 42.33), + asMap(targetField, "ID2", aggName, 28.99), + asMap(targetField, "ID3", aggName, null) ); + Map fieldTypeMap = asStringMap(targetField, "keyword", aggName, "double"); executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 11); } @@ -175,172 +152,53 @@ public class AggregationResultUtilsTests extends ESTestCase { String targetField = randomAlphaOfLengthBetween(5, 10); String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - GroupConfig groupBy = parseGroupConfig("{" - + "\"" + targetField + "\" : {" + GroupConfig groupBy = parseGroupConfig( + "{" + + "\"" + + targetField + + "\" : {" + " \"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + " } }," - + "\"" + targetField2 + "\" : {" + + "\"" + + targetField2 + + "\" : {" + " \"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + " } }" - + "}"); + + "}" + ); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "avg#" + aggName; Collection aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName)); Map input = asMap( - "buckets", - asList( - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 42.33), - DOC_COUNT, 1), - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 8.4), - DOC_COUNT, 2), - asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 28.99), - DOC_COUNT, 3), - asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", Double.NaN), - DOC_COUNT, 0) - )); + "buckets", + asList( + asMap(KEY, asMap(targetField, "ID1", targetField2, "ID1_2"), aggTypedName, asMap("value", 42.33), DOC_COUNT, 1), + asMap(KEY, asMap(targetField, "ID1", targetField2, "ID2_2"), aggTypedName, asMap("value", 8.4), DOC_COUNT, 2), + asMap(KEY, asMap(targetField, "ID2", targetField2, "ID1_2"), aggTypedName, asMap("value", 28.99), DOC_COUNT, 3), + asMap(KEY, asMap(targetField, "ID3", targetField2, "ID2_2"), aggTypedName, asMap("value", Double.NaN), DOC_COUNT, 0) + ) + ); List> 
expected = asList( - asMap( - targetField, "ID1", - targetField2, "ID1_2", - aggName, 42.33 - ), - asMap( - targetField, "ID1", - targetField2, "ID2_2", - aggName, 8.4 - ), - asMap( - targetField, "ID2", - targetField2, "ID1_2", - aggName, 28.99 - ), - asMap( - targetField, "ID3", - targetField2, "ID2_2", - aggName, null - ) - ); - Map fieldTypeMap = asStringMap( - aggName, "double", - targetField, "keyword", - targetField2, "keyword" + asMap(targetField, "ID1", targetField2, "ID1_2", aggName, 42.33), + asMap(targetField, "ID1", targetField2, "ID2_2", aggName, 8.4), + asMap(targetField, "ID2", targetField2, "ID1_2", aggName, 28.99), + asMap(targetField, "ID3", targetField2, "ID2_2", aggName, null) ); + Map fieldTypeMap = asStringMap(aggName, "double", targetField, "keyword", targetField2, "keyword"); executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 6); } public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { String targetField = randomAlphaOfLengthBetween(5, 10); - GroupConfig groupBy = parseGroupConfig("{\"" + targetField + "\" : {" - + "\"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + "} } }"); - - String aggName = randomAlphaOfLengthBetween(5, 10); - String aggTypedName = "avg#" + aggName; - - String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - String aggTypedName2 = "max#" + aggName2; - - Collection aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2)); - - Map input = asMap( - "buckets", - asList( - asMap( - KEY, asMap( - targetField, "ID1"), - aggTypedName, asMap( - "value", 42.33), - aggTypedName2, asMap( - "value", 9.9), - DOC_COUNT, 111), - asMap( - KEY, asMap( - targetField, "ID2"), - aggTypedName, asMap( - "value", 28.99), - aggTypedName2, asMap( - "value", 222.33), - DOC_COUNT, 88), - asMap( - KEY, asMap( - targetField, "ID3"), - aggTypedName, asMap( - "value", 12.55), - aggTypedName2, asMap( - "value", Double.NaN), - DOC_COUNT, 1) - )); - - List> expected = asList( - asMap( - targetField, "ID1", - aggName, 42.33, - aggName2, 9.9 - ), - asMap( - targetField, "ID2", - aggName, 28.99, - aggName2, 222.33 - ), - asMap( - targetField, "ID3", - aggName, 12.55, - aggName2, null - ) - ); - Map fieldTypeMap = asStringMap( - targetField, "keyword", - aggName, "double", - aggName2, "double" + GroupConfig groupBy = parseGroupConfig( + "{\"" + targetField + "\" : {" + "\"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + "} } }" ); - executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 200); - } - - public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException { - String targetField = randomAlphaOfLengthBetween(5, 10); - String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - - GroupConfig groupBy = parseGroupConfig("{" - + "\"" + targetField + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }," - + "\"" + targetField2 + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }" - + "}"); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "avg#" + aggName; @@ -354,82 +212,137 @@ public class AggregationResultUtilsTests extends ESTestCase { "buckets", asList( asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 42.33), - aggTypedName2, asMap( - "value", 9.9, - "value_as_string", 
"9.9F"), - DOC_COUNT, 1), + KEY, + asMap(targetField, "ID1"), + aggTypedName, + asMap("value", 42.33), + aggTypedName2, + asMap("value", 9.9), + DOC_COUNT, + 111 + ), asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 8.4), - aggTypedName2, asMap( - "value", 222.33, - "value_as_string", "222.33F"), - DOC_COUNT, 2), + KEY, + asMap(targetField, "ID2"), + aggTypedName, + asMap("value", 28.99), + aggTypedName2, + asMap("value", 222.33), + DOC_COUNT, + 88 + ), asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 28.99), - aggTypedName2, asMap( - "value", -2.44, - "value_as_string", "-2.44F"), - DOC_COUNT, 3), - asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 12.55), - aggTypedName2, asMap( - "value", Double.NaN, - "value_as_string", "NaN"), - DOC_COUNT, 4) - )); - - List> expected = asList( - asMap( - targetField, "ID1", - targetField2, "ID1_2", - aggName, 42.33, - aggName2, "9.9F" - ), - asMap( - targetField, "ID1", - targetField2, "ID2_2", - aggName, 8.4, - aggName2, "222.33F" - ), - asMap( - targetField, "ID2", - targetField2, "ID1_2", - aggName, 28.99, - aggName2, "-2.44F" - ), - asMap( - targetField, "ID3", - targetField2, "ID2_2", - aggName, 12.55, - aggName2, null + KEY, + asMap(targetField, "ID3"), + aggTypedName, + asMap("value", 12.55), + aggTypedName2, + asMap("value", Double.NaN), + DOC_COUNT, + 1 + ) ) ); + + List> expected = asList( + asMap(targetField, "ID1", aggName, 42.33, aggName2, 9.9), + asMap(targetField, "ID2", aggName, 28.99, aggName2, 222.33), + asMap(targetField, "ID3", aggName, 12.55, aggName2, null) + ); + Map fieldTypeMap = asStringMap(targetField, "keyword", aggName, "double", aggName2, "double"); + executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 200); + } + + public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + + GroupConfig groupBy = parseGroupConfig( + "{" + + "\"" + + targetField + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }," + + "\"" + + targetField2 + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }" + + "}" + ); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + + String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + String aggTypedName2 = "max#" + aggName2; + + Collection aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2)); + + Map input = asMap( + "buckets", + asList( + asMap( + KEY, + asMap(targetField, "ID1", targetField2, "ID1_2"), + aggTypedName, + asMap("value", 42.33), + aggTypedName2, + asMap("value", 9.9, "value_as_string", "9.9F"), + DOC_COUNT, + 1 + ), + asMap( + KEY, + asMap(targetField, "ID1", targetField2, "ID2_2"), + aggTypedName, + asMap("value", 8.4), + aggTypedName2, + asMap("value", 222.33, "value_as_string", "222.33F"), + DOC_COUNT, + 2 + ), + asMap( + KEY, + asMap(targetField, "ID2", targetField2, "ID1_2"), + aggTypedName, + asMap("value", 28.99), + aggTypedName2, + asMap("value", -2.44, "value_as_string", "-2.44F"), + DOC_COUNT, + 3 + ), + asMap( + KEY, + asMap(targetField, "ID3", targetField2, "ID2_2"), + aggTypedName, + asMap("value", 12.55), + aggTypedName2, 
+ asMap("value", Double.NaN, "value_as_string", "NaN"), + DOC_COUNT, + 4 + ) + ) + ); + + List> expected = asList( + asMap(targetField, "ID1", targetField2, "ID1_2", aggName, 42.33, aggName2, "9.9F"), + asMap(targetField, "ID1", targetField2, "ID2_2", aggName, 8.4, aggName2, "222.33F"), + asMap(targetField, "ID2", targetField2, "ID1_2", aggName, 28.99, aggName2, "-2.44F"), + asMap(targetField, "ID3", targetField2, "ID2_2", aggName, 12.55, aggName2, null) + ); Map fieldTypeMap = asStringMap( - aggName, "double", - aggName2, "keyword", // If the second aggregation was some non-numeric mapped field - targetField, "keyword", - targetField2, "keyword" + aggName, + "double", + aggName2, + "keyword", // If the second aggregation was some non-numeric mapped field + targetField, + "keyword", + targetField2, + "keyword" ); executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10); } @@ -438,16 +351,22 @@ public class AggregationResultUtilsTests extends ESTestCase { String targetField = randomAlphaOfLengthBetween(5, 10); String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - GroupConfig groupBy = parseGroupConfig("{" - + "\"" + targetField + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }," - + "\"" + targetField2 + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }" - + "}"); + GroupConfig groupBy = parseGroupConfig( + "{" + + "\"" + + targetField + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }," + + "\"" + + targetField2 + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }" + + "}" + ); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "scripted_metric#" + aggName; @@ -458,65 +377,40 @@ public class AggregationResultUtilsTests extends ESTestCase { "buckets", asList( asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", asMap("field", 123.0)), - DOC_COUNT, 1), + KEY, + asMap(targetField, "ID1", targetField2, "ID1_2"), + aggTypedName, + asMap("value", asMap("field", 123.0)), + DOC_COUNT, + 1 + ), asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", asMap("field", 1.0)), - DOC_COUNT, 2), + KEY, + asMap(targetField, "ID1", targetField2, "ID2_2"), + aggTypedName, + asMap("value", asMap("field", 1.0)), + DOC_COUNT, + 2 + ), asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", asMap("field", 2.13)), - DOC_COUNT, 3), - asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", null), - DOC_COUNT, 0) - )); - - List> expected = asList( - asMap( - targetField, "ID1", - targetField2, "ID1_2", - aggName, asMap("field", 123.0) - ), - asMap( - targetField, "ID1", - targetField2, "ID2_2", - aggName, asMap("field", 1.0) - ), - asMap( - targetField, "ID2", - targetField2, "ID1_2", - aggName, asMap("field", 2.13) - ), - asMap( - targetField, "ID3", - targetField2, "ID2_2", - aggName, null + KEY, + asMap(targetField, "ID2", targetField2, "ID1_2"), + aggTypedName, + asMap("value", asMap("field", 2.13)), + DOC_COUNT, + 3 + ), + asMap(KEY, asMap(targetField, "ID3", targetField2, "ID2_2"), aggTypedName, asMap("value", null), DOC_COUNT, 0) ) ); - Map fieldTypeMap = asStringMap( - targetField, "keyword", - targetField2, "keyword" + + List> expected = asList( + 
asMap(targetField, "ID1", targetField2, "ID1_2", aggName, asMap("field", 123.0)), + asMap(targetField, "ID1", targetField2, "ID2_2", aggName, asMap("field", 1.0)), + asMap(targetField, "ID2", targetField2, "ID1_2", aggName, asMap("field", 2.13)), + asMap(targetField, "ID3", targetField2, "ID2_2", aggName, null) ); + Map fieldTypeMap = asStringMap(targetField, "keyword", targetField2, "keyword"); executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 6); } @@ -524,16 +418,22 @@ public class AggregationResultUtilsTests extends ESTestCase { String targetField = randomAlphaOfLengthBetween(5, 10); String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - GroupConfig groupBy = parseGroupConfig("{" - + "\"" + targetField + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }," - + "\"" + targetField2 + "\" : {" - + " \"terms\" : {" - + " \"field\" : \"doesn't_matter_for_this_test\"" - + " } }" - + "}"); + GroupConfig groupBy = parseGroupConfig( + "{" + + "\"" + + targetField + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }," + + "\"" + + targetField2 + + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }" + + "}" + ); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "avg#" + aggName; @@ -541,87 +441,67 @@ public class AggregationResultUtilsTests extends ESTestCase { String pipelineAggTypedName = "bucket_script#" + pipelineAggName; Collection aggregationBuilders = asList(AggregationBuilders.scriptedMetric(aggName)); - Collection pipelineAggregationBuilders = - asList(PipelineAggregatorBuilders.bucketScript(pipelineAggName, + Collection pipelineAggregationBuilders = asList( + PipelineAggregatorBuilders.bucketScript( + pipelineAggName, Collections.singletonMap("param_1", aggName), - new Script("return params.param_1"))); + new Script("return params.param_1") + ) + ); Map input = asMap( "buckets", asList( asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 123.0), - pipelineAggTypedName, asMap( - "value", 123.0), - DOC_COUNT, 1), + KEY, + asMap(targetField, "ID1", targetField2, "ID1_2"), + aggTypedName, + asMap("value", 123.0), + pipelineAggTypedName, + asMap("value", 123.0), + DOC_COUNT, + 1 + ), asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 1.0), - pipelineAggTypedName, asMap( - "value", 1.0), - DOC_COUNT, 2), + KEY, + asMap(targetField, "ID1", targetField2, "ID2_2"), + aggTypedName, + asMap("value", 1.0), + pipelineAggTypedName, + asMap("value", 1.0), + DOC_COUNT, + 2 + ), asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 2.13), - pipelineAggTypedName, asMap( - "value", 2.13), - DOC_COUNT, 3), + KEY, + asMap(targetField, "ID2", targetField2, "ID1_2"), + aggTypedName, + asMap("value", 2.13), + pipelineAggTypedName, + asMap("value", 2.13), + DOC_COUNT, + 3 + ), asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 12.0), - pipelineAggTypedName, asMap( - "value", Double.NaN), - DOC_COUNT, 4) - )); - - List> expected = asList( - asMap( - targetField, "ID1", - targetField2, "ID1_2", - aggName, 123.0, - pipelineAggName, 123.0 - ), - asMap( - targetField, "ID1", - targetField2, "ID2_2", - aggName, 1.0, - pipelineAggName, 1.0 - ), - asMap( - targetField, "ID2", - targetField2, "ID1_2", - 
aggName, 2.13, - pipelineAggName, 2.13 - ), - asMap( - targetField, "ID3", - targetField2, "ID2_2", - aggName, 12.0, - pipelineAggName, null + KEY, + asMap(targetField, "ID3", targetField2, "ID2_2"), + aggTypedName, + asMap("value", 12.0), + pipelineAggTypedName, + asMap("value", Double.NaN), + DOC_COUNT, + 4 + ) ) ); - Map fieldTypeMap = asStringMap( - targetField, "keyword", - targetField2, "keyword", - aggName, "double" + + List> expected = asList( + asMap(targetField, "ID1", targetField2, "ID1_2", aggName, 123.0, pipelineAggName, 123.0), + asMap(targetField, "ID1", targetField2, "ID2_2", aggName, 1.0, pipelineAggName, 1.0), + asMap(targetField, "ID2", targetField2, "ID1_2", aggName, 2.13, pipelineAggName, 2.13), + asMap(targetField, "ID3", targetField2, "ID2_2", aggName, 12.0, pipelineAggName, null) ); + Map fieldTypeMap = asStringMap(targetField, "keyword", targetField2, "keyword", aggName, "double"); executeTest(groupBy, aggregationBuilders, pipelineAggregationBuilders, input, fieldTypeMap, expected, 10); } @@ -629,120 +509,76 @@ public class AggregationResultUtilsTests extends ESTestCase { String targetField = randomAlphaOfLengthBetween(5, 10); String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; - GroupConfig groupBy = parseGroupConfig("{" - + "\"" + targetField + "\" : {" + GroupConfig groupBy = parseGroupConfig( + "{" + + "\"" + + targetField + + "\" : {" + " \"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + " } }," - + "\"" + targetField2 + "\" : {" + + "\"" + + targetField2 + + "\" : {" + " \"terms\" : {" + " \"field\" : \"doesn't_matter_for_this_test\"" + " } }" - + "}"); + + "}" + ); String aggName = randomAlphaOfLengthBetween(5, 10); String aggTypedName = "avg#" + aggName; Collection aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName)); Map inputFirstRun = asMap( - "buckets", - asList( - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 42.33), - DOC_COUNT, 1), - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 8.4), - DOC_COUNT, 2), - asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 28.99), - DOC_COUNT, 3), - asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 12.55), - DOC_COUNT, 4) - )); + "buckets", + asList( + asMap(KEY, asMap(targetField, "ID1", targetField2, "ID1_2"), aggTypedName, asMap("value", 42.33), DOC_COUNT, 1), + asMap(KEY, asMap(targetField, "ID1", targetField2, "ID2_2"), aggTypedName, asMap("value", 8.4), DOC_COUNT, 2), + asMap(KEY, asMap(targetField, "ID2", targetField2, "ID1_2"), aggTypedName, asMap("value", 28.99), DOC_COUNT, 3), + asMap(KEY, asMap(targetField, "ID3", targetField2, "ID2_2"), aggTypedName, asMap("value", 12.55), DOC_COUNT, 4) + ) + ); Map inputSecondRun = asMap( - "buckets", - asList( - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 433.33), - DOC_COUNT, 12), - asMap( - KEY, asMap( - targetField, "ID1", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 83.4), - DOC_COUNT, 32), - asMap( - KEY, asMap( - targetField, "ID2", - targetField2, "ID1_2" - ), - aggTypedName, asMap( - "value", 21.99), - DOC_COUNT, 2), - asMap( - KEY, asMap( - targetField, "ID3", - targetField2, "ID2_2" - ), - aggTypedName, asMap( - "value", 122.55), - DOC_COUNT, 44) - )); + "buckets", + asList( + asMap(KEY, 
asMap(targetField, "ID1", targetField2, "ID1_2"), aggTypedName, asMap("value", 433.33), DOC_COUNT, 12), + asMap(KEY, asMap(targetField, "ID1", targetField2, "ID2_2"), aggTypedName, asMap("value", 83.4), DOC_COUNT, 32), + asMap(KEY, asMap(targetField, "ID2", targetField2, "ID1_2"), aggTypedName, asMap("value", 21.99), DOC_COUNT, 2), + asMap(KEY, asMap(targetField, "ID3", targetField2, "ID2_2"), aggTypedName, asMap("value", 122.55), DOC_COUNT, 44) + ) + ); TransformIndexerStats stats = new TransformIndexerStats(); - Map fieldTypeMap = asStringMap( - aggName, "double", - targetField, "keyword", - targetField2, "keyword" - ); + Map fieldTypeMap = asStringMap(aggName, "double", targetField, "keyword", targetField2, "keyword"); - List> resultFirstRun = - runExtraction(groupBy, aggregationBuilders, Collections.emptyList(), inputFirstRun, fieldTypeMap, stats); - List> resultSecondRun = - runExtraction(groupBy, aggregationBuilders, Collections.emptyList(), inputSecondRun, fieldTypeMap, stats); + List> resultFirstRun = runExtraction( + groupBy, + aggregationBuilders, + Collections.emptyList(), + inputFirstRun, + fieldTypeMap, + stats + ); + List> resultSecondRun = runExtraction( + groupBy, + aggregationBuilders, + Collections.emptyList(), + inputSecondRun, + fieldTypeMap, + stats + ); assertNotEquals(resultFirstRun, resultSecondRun); Set documentIdsFirstRun = new HashSet<>(); - resultFirstRun.forEach(m -> { - documentIdsFirstRun.add((String) m.get(TransformField.DOCUMENT_ID_FIELD)); - }); + resultFirstRun.forEach(m -> { documentIdsFirstRun.add((String) m.get(TransformField.DOCUMENT_ID_FIELD)); }); assertEquals(4, documentIdsFirstRun.size()); Set documentIdsSecondRun = new HashSet<>(); - resultSecondRun.forEach(m -> { - documentIdsSecondRun.add((String) m.get(TransformField.DOCUMENT_ID_FIELD)); - }); + resultSecondRun.forEach(m -> { documentIdsSecondRun.add((String) m.get(TransformField.DOCUMENT_ID_FIELD)); }); assertEquals(4, documentIdsSecondRun.size()); assertEquals(documentIdsFirstRun, documentIdsSecondRun); @@ -759,12 +595,12 @@ public class AggregationResultUtilsTests extends ESTestCase { assertThat(document.get("metric"), equalTo(10L)); - Map bar = (Map)document.get("bar"); + Map bar = (Map) document.get("bar"); assertThat(bar.get("field1"), equalTo(1L)); - Map foo = (Map)document.get("foo"); - Map foobar = (Map)foo.get("bar"); + Map foo = (Map) document.get("foo"); + Map foobar = (Map) foo.get("bar"); assertThat(foobar.get("baz"), equalTo(1000L)); assertThat(foobar.get("baz2"), equalTo(2000L)); @@ -774,46 +610,50 @@ public class AggregationResultUtilsTests extends ESTestCase { Map document = new HashMap<>(); AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L); - AggregationResultUtils.AggregationExtractionException exception = - expectThrows(AggregationResultUtils.AggregationExtractionException.class, - () -> AggregationResultUtils.updateDocument(document, "foo.bar.baz", 2000L)); - assertThat(exception.getMessage(), - equalTo("duplicate key value pairs key [foo.bar.baz] old value [1000] duplicate value [2000]")); + AggregationResultUtils.AggregationExtractionException exception = expectThrows( + AggregationResultUtils.AggregationExtractionException.class, + () -> AggregationResultUtils.updateDocument(document, "foo.bar.baz", 2000L) + ); + assertThat(exception.getMessage(), equalTo("duplicate key value pairs key [foo.bar.baz] old value [1000] duplicate value [2000]")); } public void testUpdateDocumentWithObjectAndNotObject() { Map document = new HashMap<>(); 
AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L); - AggregationResultUtils.AggregationExtractionException exception = - expectThrows(AggregationResultUtils.AggregationExtractionException.class, - () -> AggregationResultUtils.updateDocument(document, "foo.bar", 2000L)); - assertThat(exception.getMessage(), - equalTo("mixed object types of nested and non-nested fields [foo.bar]")); + AggregationResultUtils.AggregationExtractionException exception = expectThrows( + AggregationResultUtils.AggregationExtractionException.class, + () -> AggregationResultUtils.updateDocument(document, "foo.bar", 2000L) + ); + assertThat(exception.getMessage(), equalTo("mixed object types of nested and non-nested fields [foo.bar]")); } - private NumericMetricsAggregation.SingleValue createSingleMetricAgg(Double value, String valueAsString) { + public static NumericMetricsAggregation.SingleValue createSingleMetricAgg(String name, Double value, String valueAsString) { NumericMetricsAggregation.SingleValue agg = mock(NumericMetricsAggregation.SingleValue.class); when(agg.value()).thenReturn(value); when(agg.getValueAsString()).thenReturn(valueAsString); + when(agg.getName()).thenReturn(name); return agg; } public void testSingleValueAggExtractor() { - Aggregation agg = createSingleMetricAgg(Double.NaN, "NaN"); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "double"), is(nullValue())); + Aggregation agg = createSingleMetricAgg("metric", Double.NaN, "NaN"); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "double"), ""), is(nullValue())); - agg = createSingleMetricAgg(Double.POSITIVE_INFINITY, "NaN"); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "double"), is(nullValue())); + agg = createSingleMetricAgg("metric", Double.POSITIVE_INFINITY, "NaN"); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "double"), ""), is(nullValue())); - agg = createSingleMetricAgg(100.0, "100.0"); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "double"), equalTo(100.0)); + agg = createSingleMetricAgg("metric", 100.0, "100.0"); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "double"), ""), equalTo(100.0)); - agg = createSingleMetricAgg(100.0, "one_hundred"); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "double"), equalTo(100.0)); + agg = createSingleMetricAgg("metric", 100.0, "one_hundred"); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "double"), ""), equalTo(100.0)); - agg = createSingleMetricAgg(100.0, "one_hundred"); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "string"), equalTo("one_hundred")); + agg = createSingleMetricAgg("metric", 100.0, "one_hundred"); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "string"), ""), + equalTo("one_hundred") + ); } private ScriptedMetric createScriptedMetric(Object returnValue) { @@ -825,15 +665,15 @@ public class AggregationResultUtilsTests extends ESTestCase { @SuppressWarnings("unchecked") public void testScriptedMetricAggExtractor() { Aggregation agg = createScriptedMetric(null); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "object"), is(nullValue())); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), is(nullValue())); agg = 
createScriptedMetric(Collections.singletonList("values")); - Object val = AggregationResultUtils.getExtractor(agg).value(agg, "object"); - assertThat((List)val, hasItem("values")); + Object val = AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""); + assertThat((List) val, hasItem("values")); agg = createScriptedMetric(Collections.singletonMap("key", 100)); - val = AggregationResultUtils.getExtractor(agg).value(agg, "object"); - assertThat(((Map)val).get("key"), equalTo(100)); + val = AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""); + assertThat(((Map) val).get("key"), equalTo(100)); } private GeoCentroid createGeoCentroid(GeoPoint point, long count) { @@ -845,13 +685,13 @@ public class AggregationResultUtilsTests extends ESTestCase { public void testGeoCentroidAggExtractor() { Aggregation agg = createGeoCentroid(null, 0); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_point"), is(nullValue())); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), is(nullValue())); agg = createGeoCentroid(new GeoPoint(100.0, 101.0), 0); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_point"), is(nullValue())); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), is(nullValue())); agg = createGeoCentroid(new GeoPoint(100.0, 101.0), randomIntBetween(1, 100)); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_point"), equalTo("100.0, 101.0")); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), equalTo("100.0, 101.0")); } private GeoBounds createGeoBounds(GeoPoint tl, GeoPoint br) { @@ -865,10 +705,10 @@ public class AggregationResultUtilsTests extends ESTestCase { public void testGeoBoundsAggExtractor() { final int numberOfRuns = 25; Aggregation agg = createGeoBounds(null, new GeoPoint(100.0, 101.0)); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_shape"), is(nullValue())); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), is(nullValue())); agg = createGeoBounds(new GeoPoint(100.0, 101.0), null); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_shape"), is(nullValue())); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), is(nullValue())); String type = "point"; for (int i = 0; i < numberOfRuns; i++) { @@ -878,7 +718,7 @@ public class AggregationResultUtilsTests extends ESTestCase { double lon = randomDoubleBetween(-180.0, 180.0, false); expectedObject.put("coordinates", Arrays.asList(lon, lat)); agg = createGeoBounds(new GeoPoint(lat, lon), new GeoPoint(lat, lon)); - assertThat(AggregationResultUtils.getExtractor(agg).value(agg, "geo_shape"), equalTo(expectedObject)); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), equalTo(expectedObject)); } type = "linestring"; @@ -893,11 +733,11 @@ public class AggregationResultUtilsTests extends ESTestCase { lon2 = randomDoubleBetween(-180.0, 180.0, false); } agg = createGeoBounds(new GeoPoint(lat, lon), new GeoPoint(lat2, lon2)); - Object val = AggregationResultUtils.getExtractor(agg).value(agg, "geo_shape"); - Map geoJson = (Map)val; + Object val = AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""); + Map geoJson = (Map) val; assertThat(geoJson.get("type"), equalTo(type)); - List coordinates = 
(List)geoJson.get("coordinates"); - for(Double[] coor : coordinates) { + List coordinates = (List) geoJson.get("coordinates"); + for (Double[] coor : coordinates) { assertThat(coor.length, equalTo(2)); } assertThat(coordinates.get(0)[0], equalTo(lon)); @@ -917,10 +757,10 @@ public class AggregationResultUtilsTests extends ESTestCase { lon2 = randomDoubleBetween(-180.0, 180.0, false); } agg = createGeoBounds(new GeoPoint(lat, lon), new GeoPoint(lat2, lon2)); - Object val = AggregationResultUtils.getExtractor(agg).value(agg, "geo_shape"); - Map geoJson = (Map)val; + Object val = AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""); + Map geoJson = (Map) val; assertThat(geoJson.get("type"), equalTo(type)); - List> coordinates = (List>)geoJson.get("coordinates"); + List> coordinates = (List>) geoJson.get("coordinates"); assertThat(coordinates.size(), equalTo(1)); assertThat(coordinates.get(0).size(), equalTo(5)); List> expected = Arrays.asList( @@ -928,8 +768,9 @@ public class AggregationResultUtilsTests extends ESTestCase { Arrays.asList(lon2, lat), Arrays.asList(lon2, lat2), Arrays.asList(lon, lat2), - Arrays.asList(lon, lat)); - for(int j = 0; j < 5; j++) { + Arrays.asList(lon, lat) + ); + for (int j = 0; j < 5; j++) { Double[] coordinate = coordinates.get(0).get(j); assertThat(coordinate.length, equalTo(2)); assertThat(coordinate[0], equalTo(expected.get(j).get(0))); @@ -938,29 +779,112 @@ public class AggregationResultUtilsTests extends ESTestCase { } } - private void executeTest(GroupConfig groups, - Collection aggregationBuilders, - Collection pipelineAggregationBuilders, - Map input, - Map fieldTypeMap, - List> expected, - long expectedDocCounts) throws IOException { + public static Percentiles createPercentilesAgg(String name, List percentiles) { + Percentiles agg = mock(Percentiles.class); + + when(agg.iterator()).thenReturn(percentiles.iterator()); + when(agg.getName()).thenReturn(name); + return agg; + } + + public void testPercentilesAggExtractor() { + Aggregation agg = createPercentilesAgg( + "p_agg", + Arrays.asList(new Percentile(1, 0), new Percentile(50, 22.2), new Percentile(99, 43.3), new Percentile(99.5, 100.3)) + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), + equalTo(asMap("1", 0.0, "50", 22.2, "99", 43.3, "99_5", 100.3)) + ); + } + + public static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, Aggregation... 
subAggregations) { + SingleBucketAggregation agg = mock(SingleBucketAggregation.class); + when(agg.getDocCount()).thenReturn(docCount); + when(agg.getName()).thenReturn(name); + if (subAggregations != null) { + org.elasticsearch.search.aggregations.Aggregations subAggs = new org.elasticsearch.search.aggregations.Aggregations( + Arrays.asList(subAggregations) + ); + when(agg.getAggregations()).thenReturn(subAggs); + } else { + when(agg.getAggregations()).thenReturn(null); + } + return agg; + } + + public void testSingleBucketAggExtractor() { + Aggregation agg = createSingleBucketAgg("sba", 42L); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), equalTo(42L)); + + agg = createSingleBucketAgg("sba1", 42L, createSingleMetricAgg("sub1", 100.0, "100.0")); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), + equalTo(Collections.singletonMap("sub1", 100.0)) + ); + + agg = createSingleBucketAgg( + "sba2", + 42L, + createSingleMetricAgg("sub1", 100.0, "hundred"), + createSingleMetricAgg("sub2", 33.33, "thirty_three") + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba2.sub1", "long", "sba2.sub2", "float"), ""), + equalTo(asMap("sub1", 100.0, "sub2", 33.33)) + ); + + agg = createSingleBucketAgg( + "sba3", + 42L, + createSingleMetricAgg("sub1", 100.0, "hundred"), + createSingleMetricAgg("sub2", 33.33, "thirty_three"), + createSingleBucketAgg("sub3", 42L) + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba3.sub1", "long", "sba3.sub2", "double"), ""), + equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", 42L)) + ); + + agg = createSingleBucketAgg( + "sba4", + 42L, + createSingleMetricAgg("sub1", 100.0, "hundred"), + createSingleMetricAgg("sub2", 33.33, "thirty_three"), + createSingleBucketAgg("sub3", 42L, createSingleMetricAgg("subsub1", 11.1, "eleven_dot_eleven")) + ); + assertThat( + AggregationResultUtils.getExtractor(agg) + .value(agg, asStringMap("sba4.sub3.subsub1", "double", "sba4.sub2", "float", "sba4.sub1", "long"), ""), + equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", asMap("subsub1", 11.1))) + ); + } + + private void executeTest( + GroupConfig groups, + Collection aggregationBuilders, + Collection pipelineAggregationBuilders, + Map input, + Map fieldTypeMap, + List> expected, + long expectedDocCounts + ) throws IOException { TransformIndexerStats stats = new TransformIndexerStats(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); - List> result = runExtraction(groups, + List> result = runExtraction( + groups, aggregationBuilders, pipelineAggregationBuilders, input, fieldTypeMap, - stats); + stats + ); // remove the document ids and test uniqueness Set documentIds = new HashSet<>(); - result.forEach(m -> { - documentIds.add((String) m.remove(TransformField.DOCUMENT_ID_FIELD)); - }); + result.forEach(m -> { documentIds.add((String) m.remove(TransformField.DOCUMENT_ID_FIELD)); }); assertEquals(result.size(), documentIds.size()); assertEquals(expected, result); @@ -968,30 +892,34 @@ public class AggregationResultUtilsTests extends ESTestCase { } - private List> runExtraction(GroupConfig groups, - Collection aggregationBuilders, - Collection pipelineAggregationBuilders, - Map input, - Map fieldTypeMap, - TransformIndexerStats stats) throws IOException { + private List> runExtraction( + GroupConfig groups, + Collection aggregationBuilders, + Collection 
-    private List<Map<String, Object>> runExtraction(GroupConfig groups,
-                                Collection<AggregationBuilder> aggregationBuilders,
-                                Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
-                                Map<String, Object> input,
-                                Map<String, String> fieldTypeMap,
-                                TransformIndexerStats stats) throws IOException {
+    private List<Map<String, Object>> runExtraction(
+        GroupConfig groups,
+        Collection<AggregationBuilder> aggregationBuilders,
+        Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
+        Map<String, Object> input,
+        Map<String, String> fieldTypeMap,
+        TransformIndexerStats stats
+    ) throws IOException {
         XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
         builder.map(input);
 
         try (XContentParser parser = createParser(builder)) {
             CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature");
-            return AggregationResultUtils.extractCompositeAggregationResults(agg,
+            return AggregationResultUtils.extractCompositeAggregationResults(
+                agg,
                 groups,
                 aggregationBuilders,
                 pipelineAggregationBuilders,
                 fieldTypeMap,
-                stats).collect(Collectors.toList());
+                stats
+            ).collect(Collectors.toList());
         }
     }
 
     private GroupConfig parseGroupConfig(String json) throws IOException {
-        final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
-            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
+        final XContentParser parser = XContentType.JSON.xContent()
+            .createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
         return GroupConfig.fromXContent(parser, false);
     }
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java
new file mode 100644
index 00000000000..1b5faa897fa
--- /dev/null
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationSchemaAndResultTests.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.pivot;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilities;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.metrics.Percentile;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AggregationSchemaAndResultTests extends ESTestCase {
+
+    private Client client;
+
+    @Before
+    public void setupClient() {
+        if (client != null) {
+            client.close();
+        }
+        client = new MyMockClient(getTestName());
+    }
+
+    @After
+    public void tearDownClient() {
+        client.close();
+    }
+
+    private class MyMockClient extends NoOpClient {
+
+        MyMockClient(String testName) {
+            super(testName);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+            ActionType<Response> action,
+            Request request,
+            ActionListener<Response> listener
+        ) {
+
+            if (request instanceof FieldCapabilitiesRequest) {
+                FieldCapabilitiesRequest fieldCapsRequest = (FieldCapabilitiesRequest) request;
+
+                Map<String, Map<String, FieldCapabilities>> fieldCaps = new HashMap<>();
+                for (String field : fieldCapsRequest.fields()) {
+
+                    // expect a field name like "double_field" where the type is encoded as a prefix
+                    String[] nameTypePair = Strings.split(field, "_");
+                    String type = nameTypePair != null ? nameTypePair[0] : "long";
+
+                    fieldCaps.put(
+                        field,
+                        Collections.singletonMap(
+                            type,
+                            new FieldCapabilities(field, type, true, true, null, null, null, Collections.emptyMap())
+                        )
+                    );
+                }
+
+                // FieldCapabilitiesResponse is package private, that's why we use a mock
+                FieldCapabilitiesResponse response = mock(FieldCapabilitiesResponse.class);
+                when(response.get()).thenReturn(fieldCaps);
+
+                for (String field : fieldCaps.keySet()) {
+                    when(response.getField(field)).thenReturn(fieldCaps.get(field));
+                }
+
+                listener.onResponse((Response) response);
+                return;
+            }
+
+            super.doExecute(action, request, listener);
+        }
+    }
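+
+    // The tests below deduce destination mappings via SchemaUtil.deduceMappings against the mock
+    // client above, then feed the deduced mappings back into AggregationResultUtils, checking that
+    // schema deduction and result extraction agree on the (possibly nested) output paths.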
nameTypePair[0] : "long"; + + fieldCaps.put( + field, + Collections.singletonMap( + type, + new FieldCapabilities(field, type, true, true, null, null, null, Collections.emptyMap()) + ) + ); + } + + // FieldCapabilitiesResponse is package private, thats why we use a mock + FieldCapabilitiesResponse response = mock(FieldCapabilitiesResponse.class); + when(response.get()).thenReturn(fieldCaps); + + for (String field : fieldCaps.keySet()) { + when(response.getField(field)).thenReturn(fieldCaps.get(field)); + } + + listener.onResponse((Response) response); + return; + } + + super.doExecute(action, request, listener); + } + } + + public void testBasic() throws InterruptedException { + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + + // aggs which produce 1 output + aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("long_stars")); + aggs.addAggregator(AggregationBuilders.max("max_rating").field("long_stars")); + aggs.addAggregator(AggregationBuilders.count("count_rating").field("keyword_group")); + aggs.addAggregator(AggregationBuilders.min("min_something").field("float_something")); + + // percentile produces 1 output per percentile + 1 for the parent object + aggs.addAggregator(AggregationBuilders.percentiles("p_rating").field("long_stars").percentiles(1, 5, 10, 50, 99.9)); + + // scripted metric produces no output because its dynamic + aggs.addAggregator(AggregationBuilders.scriptedMetric("collapsed_ratings")); + + AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); + GroupConfig groupConfig = GroupConfigTests.randomGroupConfig(); + PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null); + + this.>assertAsync( + listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener), + mappings -> { + assertEquals(groupConfig.getGroups().size() + 10, mappings.size()); + assertEquals("long", mappings.get("max_rating")); + assertEquals("double", mappings.get("avg_rating")); + assertEquals("long", mappings.get("count_rating")); + assertEquals("float", mappings.get("min_something")); + assertEquals("object", mappings.get("p_rating")); + assertEquals("double", mappings.get("p_rating.1")); + assertEquals("double", mappings.get("p_rating.5")); + assertEquals("double", mappings.get("p_rating.10")); + assertEquals("double", mappings.get("p_rating.99_9")); + + Aggregation agg = AggregationResultUtilsTests.createSingleMetricAgg("avg_rating", 33.3, "33.3"); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(33.3)); + + agg = AggregationResultUtilsTests.createPercentilesAgg( + "p_agg", + Arrays.asList(new Percentile(1, 0), new Percentile(50, 1.2), new Percentile(99, 2.4), new Percentile(99.5, 4.3)) + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), + equalTo(asMap("1", 0.0, "50", 1.2, "99", 2.4, "99_5", 4.3)) + ); + } + ); + } + + public void testNested() throws InterruptedException { + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.filter("filter_1", new TermQueryBuilder("favorite_drink", "slurm"))); + aggs.addAggregator( + AggregationBuilders.filter("filter_2", new TermQueryBuilder("species", "amphibiosan")) + .subAggregation(AggregationBuilders.max("max_drinks_2").field("long_drinks")) + ); + aggs.addAggregator( + AggregationBuilders.filter("filter_3", new TermQueryBuilder("spaceship", "nimbus")) + .subAggregation( + 
AggregationBuilders.filter("filter_3_1", new TermQueryBuilder("species", "amphibiosan")) + .subAggregation(AggregationBuilders.max("max_drinks_3").field("float_drinks")) + ) + ); + aggs.addAggregator( + AggregationBuilders.filter("filter_4", new TermQueryBuilder("organization", "doop")) + .subAggregation( + AggregationBuilders.filter("filter_4_1", new TermQueryBuilder("spaceship", "nimbus")) + .subAggregation( + AggregationBuilders.filter("filter_4_1_1", new TermQueryBuilder("species", "amphibiosan")) + .subAggregation(AggregationBuilders.max("max_drinks_4").field("float_drinks")) + ) + .subAggregation( + AggregationBuilders.filter("filter_4_1_2", new TermQueryBuilder("species", "mutant")) + .subAggregation(AggregationBuilders.max("min_drinks_4").field("double_drinks")) + ) + ) + ); + + AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); + GroupConfig groupConfig = GroupConfigTests.randomGroupConfig(); + PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null); + + this.>assertAsync( + listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener), + mappings -> { + assertEquals(groupConfig.getGroups().size() + 12, mappings.size()); + assertEquals("long", mappings.get("filter_1")); + assertEquals("object", mappings.get("filter_2")); + assertEquals("long", mappings.get("filter_2.max_drinks_2")); + assertEquals("object", mappings.get("filter_3")); + assertEquals("object", mappings.get("filter_3.filter_3_1")); + assertEquals("float", mappings.get("filter_3.filter_3_1.max_drinks_3")); + assertEquals("object", mappings.get("filter_4")); + assertEquals("object", mappings.get("filter_4.filter_4_1")); + assertEquals("object", mappings.get("filter_4.filter_4_1.filter_4_1_1")); + assertEquals("float", mappings.get("filter_4.filter_4_1.filter_4_1_1.max_drinks_4")); + assertEquals("object", mappings.get("filter_4.filter_4_1.filter_4_1_2")); + assertEquals("double", mappings.get("filter_4.filter_4_1.filter_4_1_2.min_drinks_4")); + + Aggregation agg = AggregationResultUtilsTests.createSingleBucketAgg("filter_1", 36363); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(36363L)); + + agg = AggregationResultUtilsTests.createSingleBucketAgg( + "filter_2", + 23144, + AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_2", 45.0, "forty_five") + ); + assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45.0))); + + agg = AggregationResultUtilsTests.createSingleBucketAgg( + "filter_3", + 62426, + AggregationResultUtilsTests.createSingleBucketAgg( + "filter_3_1", + 33365, + AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_3", 35.0, "thirty_five") + ) + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), + equalTo(asMap("filter_3_1", asMap("max_drinks_3", 35.0))) + ); + + agg = AggregationResultUtilsTests.createSingleBucketAgg( + "filter_4", + 62426, + AggregationResultUtilsTests.createSingleBucketAgg( + "filter_4_1", + 33365, + AggregationResultUtilsTests.createSingleBucketAgg( + "filter_4_1_1", + 12543, + AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_4", 1.0, "a small one") + ), + AggregationResultUtilsTests.createSingleBucketAgg( + "filter_4_1_2", + 526, + AggregationResultUtilsTests.createSingleMetricAgg("min_drinks_4", 7395.0, "a lot") + ) + ) + ); + assertThat( + AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), + equalTo( + 
+                agg = AggregationResultUtilsTests.createSingleBucketAgg(
+                    "filter_4",
+                    62426,
+                    AggregationResultUtilsTests.createSingleBucketAgg(
+                        "filter_4_1",
+                        33365,
+                        AggregationResultUtilsTests.createSingleBucketAgg(
+                            "filter_4_1_1",
+                            12543,
+                            AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_4", 1.0, "a small one")
+                        ),
+                        AggregationResultUtilsTests.createSingleBucketAgg(
+                            "filter_4_1_2",
+                            526,
+                            AggregationResultUtilsTests.createSingleMetricAgg("min_drinks_4", 7395.0, "a lot")
+                        )
+                    )
+                );
+                assertThat(
+                    AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""),
+                    equalTo(
+                        asMap(
+                            "filter_4_1",
+                            asMap("filter_4_1_1", asMap("max_drinks_4", 1.0), "filter_4_1_2", asMap("min_drinks_4", 7395.0))
+                        )
+                    )
+                );
+            }
+        );
+    }
+
+    private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean listenerCalled = new AtomicBoolean(false);
+
+        LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
+            assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
+            furtherTests.accept(r);
+        }, e -> { fail("got unexpected exception: " + e); }), latch);
+
+        function.accept(listener);
+        assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS));
+    }
+
+    static Map<String, Object> asMap(Object... fields) {
+        assert fields.length % 2 == 0;
+        final Map<String, Object> map = new HashMap<>();
+        for (int i = 0; i < fields.length; i += 2) {
+            String field = (String) fields[i];
+            map.put(field, fields[i + 1]);
+        }
+        return map;
+    }
+}
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java
index d12934cc026..a2afbddb412 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java
@@ -6,16 +6,23 @@
 
 package org.elasticsearch.xpack.transform.transforms.pivot;
 
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
 import org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin;
+import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class AggregationsTests extends ESTestCase {
@@ -77,6 +84,14 @@ public class AggregationsTests extends ESTestCase {
         // percentile
         assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null));
         assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int"));
+
+        // filter
+        assertEquals("long", Aggregations.resolveTargetMapping("filter", null));
+        assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
+        assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));
+
+        // corner case: source type null
+        assertEquals(null, Aggregations.resolveTargetMapping("min", null));
     }
 
     public void testAggregationsVsTransforms() {
@@ -105,4 +120,101 @@ public class AggregationsTests extends ESTestCase {
             );
         }
     }
+
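+    // getAggregationInputAndOutputTypes returns a tuple of input field names (v1) and output
+    // types (v2), both keyed by the full dotted path of the aggregation; dots inside percentile
+    // values are replaced by "_", e.g. "percentiles.99_5".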
+    public void testGetAggregationOutputTypesPercentiles() {
+        AggregationBuilder percentileAggregationBuilder = new PercentilesAggregationBuilder("percentiles").percentiles(1.0, 5.0, 10.0);
+
+        Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(
+            percentileAggregationBuilder
+        );
+        assertTrue(inputAndOutputTypes.v1().isEmpty());
+        Map<String, String> outputTypes = inputAndOutputTypes.v2();
+        assertEquals(3, outputTypes.size());
+        assertEquals("percentiles", outputTypes.get("percentiles.1"));
+        assertEquals("percentiles", outputTypes.get("percentiles.5"));
+        assertEquals("percentiles", outputTypes.get("percentiles.10"));
+
+        // note: constructing the builder directly skips validation; a real request with these
+        // duplicate percentiles would be rejected
+        percentileAggregationBuilder = new PercentilesAggregationBuilder("percentiles").percentiles(1.0, 5.0, 5.0, 10.0);
+
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(percentileAggregationBuilder);
+        assertTrue(inputAndOutputTypes.v1().isEmpty());
+        outputTypes = inputAndOutputTypes.v2();
+
+        assertEquals(3, outputTypes.size());
+        assertEquals("percentiles", outputTypes.get("percentiles.1"));
+        assertEquals("percentiles", outputTypes.get("percentiles.5"));
+        assertEquals("percentiles", outputTypes.get("percentiles.10"));
+    }
+
+    public void testGetAggregationOutputTypesSubAggregations() {
+
+        AggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filter_1", new TermQueryBuilder("type", "cat"));
+        Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(
+            filterAggregationBuilder
+        );
+        assertTrue(inputAndOutputTypes.v1().isEmpty());
+
+        Map<String, String> outputTypes = inputAndOutputTypes.v2();
+        assertEquals(1, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1"));
+
+        AggregationBuilder subFilterAggregationBuilder = new FilterAggregationBuilder("filter_2", new TermQueryBuilder("subtype", "siam"));
+        filterAggregationBuilder.subAggregation(subFilterAggregationBuilder);
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder);
+        assertTrue(inputAndOutputTypes.v1().isEmpty());
+
+        outputTypes = inputAndOutputTypes.v2();
+        assertEquals(1, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1.filter_2"));
+
+        filterAggregationBuilder.subAggregation(new MaxAggregationBuilder("max_2").field("max_field"));
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder);
+        assertEquals(1, inputAndOutputTypes.v1().size());
+        Map<String, String> inputTypes = inputAndOutputTypes.v1();
+        assertEquals("max_field", inputTypes.get("filter_1.max_2"));
+
+        outputTypes = inputAndOutputTypes.v2();
+        assertEquals(2, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1.filter_2"));
+        assertEquals("max", outputTypes.get("filter_1.max_2"));
+
+        subFilterAggregationBuilder.subAggregation(new FilterAggregationBuilder("filter_3", new TermQueryBuilder("color", "white")));
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder);
+        assertEquals(1, inputAndOutputTypes.v1().size());
+
+        outputTypes = inputAndOutputTypes.v2();
+        assertEquals(2, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1.filter_2.filter_3"));
+        assertEquals("max", outputTypes.get("filter_1.max_2"));
+
+        subFilterAggregationBuilder.subAggregation(new MinAggregationBuilder("min_3").field("min_field"));
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder);
+        assertEquals(2, inputAndOutputTypes.v1().size());
+        inputTypes = inputAndOutputTypes.v1();
+        assertEquals("max_field", inputTypes.get("filter_1.max_2"));
+        assertEquals("min_field", inputTypes.get("filter_1.filter_2.min_3"));
+
+        outputTypes = inputAndOutputTypes.v2();
+        assertEquals(3, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1.filter_2.filter_3"));
+        assertEquals("max", outputTypes.get("filter_1.max_2"));
+        assertEquals("min", outputTypes.get("filter_1.filter_2.min_3"));
+
+        subFilterAggregationBuilder.subAggregation(
+            new PercentilesAggregationBuilder("percentiles").percentiles(33.3, 44.4, 88.8, 99.5)
+        );
+        inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder);
+        assertEquals(2, inputAndOutputTypes.v1().size());
+
+        outputTypes = inputAndOutputTypes.v2();
+        assertEquals(7, outputTypes.size());
+        assertEquals("filter", outputTypes.get("filter_1.filter_2.filter_3"));
+        assertEquals("max", outputTypes.get("filter_1.max_2"));
+        assertEquals("min", outputTypes.get("filter_1.filter_2.min_3"));
+        assertEquals("percentiles", outputTypes.get("filter_1.filter_2.percentiles.33_3"));
+        assertEquals("percentiles", outputTypes.get("filter_1.filter_2.percentiles.44_4"));
+        assertEquals("percentiles", outputTypes.get("filter_1.filter_2.percentiles.88_8"));
+        assertEquals("percentiles", outputTypes.get("filter_1.filter_2.percentiles.99_5"));
+    }
 }
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
index 22a492bf7fd..c456633a24a 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
@@ -215,8 +215,8 @@ public class PivotTests extends ESTestCase {
             "{\"pivot_scripted_metric\": {\n"
                 + "\"scripted_metric\": {\n"
                 + "  \"init_script\" : \"state.transactions = []\",\n"
-                + "  \"map_script\" : \"state.transactions.add("
-                + "doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)\",\n"
+                + "  \"map_script\" : "
+                + "  \"state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)\", \n"
                 + "  \"combine_script\" : \"double profit = 0; for (t in state.transactions) { profit += t } return profit\",\n"
                 + "  \"reduce_script\" : \"double profit = 0; for (a in states) { profit += a } return profit\"\n"
                 + "  }\n"
@@ -251,6 +251,12 @@ public class PivotTests extends ESTestCase {
                 + "}"
             );
         }
+        if (agg.equals(AggregationType.FILTER.getName())) {
+            return parseAggregations(
+                "{" + "\"pivot_filter\": {" + "  \"filter\": {" + "   \"term\": {\"field\": \"value\"}" + "  }" + "}" + "}"
+            );
+        }
+
         return parseAggregations(
             "{\n" + "  \"pivot_" + agg + "\": {\n" + "    \"" + agg + "\": {\n" + "      \"field\": \"values\"\n" + "    }\n" + "  }" + "}"
         );