[7.x] [Transform] add support for terms agg in transforms (#56696) (#56809)

* [Transform] add support for terms agg in transforms (#56696)

This adds support for `terms` and `rare_terms` aggs in transforms. 

The default behavior is that the results are collapsed in the following manner:
`<AGG_NAME>.<BUCKET_NAME>.<SUBAGGS...>...`
Or if no sub aggs exist
`<AGG_NAME>.<BUCKET_NAME>.<_doc_count>`

The mapping is also defined as `flattened` by default. This is to avoid field explosion while still providing (limited) search and aggregation capabilities.
This commit is contained in:
Benjamin Trent 2020-05-15 08:08:43 -04:00 committed by GitHub
parent 270a23e422
commit f71c305090
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 146 additions and 7 deletions

View File

@ -666,6 +666,8 @@ are supported:
* <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
* <<search-aggregations-metrics-cardinality-aggregation,Cardinality>>
* <<search-aggregations-bucket-filter-aggregation,Filter>>
* <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
* <<search-aggregations-bucket-terms-aggregation, Terms>>
* <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
* <<search-aggregations-metrics-max-aggregation,Max>>

View File

@ -265,12 +265,12 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}
}
}
}
- do:
catch: /Unsupported aggregation type \[terms\]/
catch: /Unsupported aggregation type \[significant_terms\]/
transform.preview_transform:
body: >
{
@ -280,7 +280,7 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}
}
}
}

View File

@ -18,6 +18,8 @@ import org.junit.Before;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -25,11 +27,11 @@ import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -508,6 +510,97 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertDateHistogramPivot(REVIEWS_DATE_NANO_INDEX_NAME);
}
@SuppressWarnings("unchecked")
public void testPivotWithTermsAgg() throws Exception {
String transformId = "simple_terms_agg_pivot";
String transformIndex = "pivot_reviews_via_histogram_with_terms_agg";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
);
String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";
config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"every_2\": {"
+ " \"histogram\": {"
+ " \"interval\": 2,\"field\":\"stars\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"common_users\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\","
+ " \"size\": 2"
+ " },"
+ " \"aggs\" : {"
+ " \"common_businesses\": {"
+ " \"terms\": {"
+ " \"field\": \"business_id\","
+ " \"size\": 2"
+ " }}"
+ " } "
+" },"
+ " \"rare_users\": {"
+ " \"rare_terms\": {"
+ " \"field\": \"user_id\""
+ " } } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));
// we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
// get and check some term results
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Map<String, Integer> commonUsers = (Map<String, Integer>) ((List<?>) XContentMapValues.extractValue(
"hits.hits._source.common_users",
searchResult
)).get(0);
Map<String, Integer> rareUsers = (Map<String, Integer>) ((List<?>) XContentMapValues.extractValue(
"hits.hits._source.rare_users",
searchResult
)).get(0);
assertThat(commonUsers, is(not(nullValue())));
assertThat(commonUsers, equalTo(new HashMap<String, Object>(){{
put("user_10",
Collections.singletonMap(
"common_businesses",
new HashMap<String, Object>(){{
put("business_12", 6);
put("business_9", 4);
}}));
put("user_0", Collections.singletonMap(
"common_businesses",
new HashMap<String, Object>(){{
put("business_0", 35);
}}));
}}));
assertThat(rareUsers, is(not(nullValue())));
assertThat(rareUsers, equalTo(new HashMap<String, Object>(){{
put("user_5", 1);
put("user_12", 1);
}}));
}
private void assertDateHistogramPivot(String indexName) throws Exception {
String transformId = "simple_date_histogram_pivot_" + indexName;
String transformIndex = "pivot_reviews_via_date_histogram_" + indexName;

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.geo.parsers.ShapeParser;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
@ -52,6 +53,7 @@ public final class AggregationResultUtils {
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}
@ -120,6 +122,8 @@ public final class AggregationResultUtils {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else if (aggregation instanceof SingleBucketAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else if (aggregation instanceof MultiBucketsAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
@ -246,6 +250,35 @@ public final class AggregationResultUtils {
}
}
static class MultiBucketsAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
MultiBucketsAggregation aggregation = (MultiBucketsAggregation) agg;
HashMap<String, Object> nested = new HashMap<>();
for (MultiBucketsAggregation.Bucket bucket : aggregation.getBuckets()) {
if (bucket.getAggregations().iterator().hasNext() == false) {
nested.put(bucket.getKeyAsString(), bucket.getDocCount());
} else {
HashMap<String, Object> nestedBucketObject = new HashMap<>();
for (Aggregation subAgg : bucket.getAggregations()) {
nestedBucketObject.put(
subAgg.getName(),
getExtractor(subAgg).value(
subAgg,
fieldTypeMap,
lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()
)
);
}
nested.put(bucket.getKeyAsString(), nestedBucketObject);
}
}
return nested;
}
}
static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {

View File

@ -31,6 +31,7 @@ public final class Aggregations {
private static final String SOURCE = "_source";
public static final String FLOAT = "float";
public static final String FLATTENED = "flattened";
public static final String SCALED_FLOAT = "scaled_float";
public static final String DOUBLE = "double";
public static final String LONG = "long";
@ -69,14 +70,12 @@ public final class Aggregations {
"nested",
"percentile_ranks",
"range",
"rare_terms",
"reverse_nested",
"sampler",
"significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
"significant_text",
"stats", // https://github.com/elastic/elasticsearch/issues/51925
"string_stats", // https://github.com/elastic/elasticsearch/issues/51925
"terms", // https://github.com/elastic/elasticsearch/issues/51073
"top_hits",
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
"t_test" // https://github.com/elastic/elasticsearch/issues/54503
@ -107,7 +106,9 @@ public final class Aggregations {
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", DOUBLE),
FILTER("filter", LONG);
FILTER("filter", LONG),
TERMS("terms", FLATTENED),
RARE_TERMS("rare_terms", FLATTENED);
private final String aggregationType;
private final String targetMapping;

View File

@ -90,6 +90,16 @@ public class AggregationsTests extends ESTestCase {
assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));
// terms
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "keyword"));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "text"));
// rare_terms
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "text"));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "keyword"));
// corner case: source type null
assertEquals(null, Aggregations.resolveTargetMapping("min", null));
}