[7.10][Transform] add support for unsigned_long data type (#63957)

Add support for unsigned_long. This required changing how integer
results are written: they are now written as whole numbers instead of
doubles, because coerce is not supported for unsigned_long.

fixes #63871
backport #63940
This commit is contained in:
Hendrik Muhs 2020-10-20 21:05:46 +02:00 committed by GitHub
parent 8d30766a7d
commit f2517678aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 105 additions and 39 deletions

View File

@ -554,15 +554,15 @@ public class TransformPivotRestIT extends TransformRestTestCase {
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
final StringBuilder bulk = new StringBuilder();
@ -606,15 +606,15 @@ public class TransformPivotRestIT extends TransformRestTestCase {
searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30));
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
}

View File

@ -130,8 +130,8 @@ public class DateHistogramGroupByIT extends ContinuousTestCase {
);
assertThat(
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
XContentMapValues.extractValue("count", source),
equalTo(Double.valueOf(bucket.getDocCount()))
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
equalTo(bucket.getDocCount())
);
// transform should only rewrite documents that require it
@ -146,9 +146,11 @@ public class DateHistogramGroupByIT extends ContinuousTestCase {
// we use a fixed_interval of `1s`, the transform runs every `1s` so the bucket might be recalculated at the next run
// but
// should NOT be recalculated for the 2nd/3rd/... run
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
.extractValue(MAX_RUN_FIELD, source),
is(lessThanOrEqualTo(1.0))
(Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source) - (Integer) XContentMapValues.extractValue(
MAX_RUN_FIELD,
source
),
is(lessThanOrEqualTo(1))
);
}
}

View File

@ -60,7 +60,7 @@ public class TermsGroupByIT extends ContinuousTestCase {
AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
addCommonAggregations(aggregations);
aggregations.addAggregator(AggregationBuilders.max("metric.avg").field("metric"));
aggregations.addAggregator(AggregationBuilders.avg("metric.avg").field("metric"));
pivotConfigBuilder.setAggregations(aggregations);
transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
@ -83,7 +83,7 @@ public class TermsGroupByIT extends ContinuousTestCase {
// missing_bucket produces `null`, we can't use `null` in aggs, so we have to use a magic value, see gh#60043
terms.missing(MISSING_BUCKET_KEY);
}
terms.subAggregation(AggregationBuilders.max("metric.avg").field("metric"));
terms.subAggregation(AggregationBuilders.avg("metric.avg").field("metric"));
sourceBuilderSource.aggregation(terms);
searchRequestSource.source(sourceBuilderSource);
SearchResponse responseSource = search(searchRequestSource);
@ -129,8 +129,8 @@ public class TermsGroupByIT extends ContinuousTestCase {
);
assertThat(
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
XContentMapValues.extractValue("count", source),
equalTo(Double.valueOf(bucket.getDocCount()))
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
equalTo(bucket.getDocCount())
);
SingleValue avgAgg = (SingleValue) bucket.getAggregations().get("metric.avg");
@ -154,8 +154,7 @@ public class TermsGroupByIT extends ContinuousTestCase {
+ iteration
+ " full source: "
+ source,
// TODO: aggs return double for MAX_RUN_FIELD, although it is an integer
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)),
XContentMapValues.extractValue(INGEST_RUN_FIELD, source),
equalTo(XContentMapValues.extractValue(MAX_RUN_FIELD, source))
);
}

View File

@ -310,7 +310,7 @@ public class TransformContinuousIT extends ESRestTestCase {
.field("type", "keyword")
.endObject()
.startObject("metric")
.field("type", "integer")
.field("type", randomFrom("integer", "long", "unsigned_long"))
.endObject()
.startObject("location")
.field("type", "geo_point")

View File

@ -40,6 +40,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt;
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.isNumericType;
public final class AggregationResultUtils {
@ -198,7 +199,7 @@ public final class AggregationResultUtils {
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
return aggregation.value();
return dropFloatingPointComponentIfTypeRequiresIt(fieldType, aggregation.value());
} else {
return aggregation.getValueAsString();
}

View File

@ -23,30 +23,59 @@ import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public final class SchemaUtil {
private static final Logger logger = LogManager.getLogger(SchemaUtil.class);
// Full collection of numeric field type strings
private static final Set<String> NUMERIC_FIELD_MAPPER_TYPES;
// Full collection of numeric field type strings and whether they are floating point or not
private static final Map<String, Boolean> NUMERIC_FIELD_MAPPER_TYPES;
static {
Set<String> types = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toSet());
types.add("scaled_float"); // have to add manually since scaled_float is in a module
Map<String, Boolean> types = Stream.of(NumberFieldMapper.NumberType.values())
.collect(Collectors.toMap(t -> t.typeName(), t -> t.numericType().isFloatingPoint()));
// have to add manually since they are in a module
types.put("scaled_float", true);
types.put("unsigned_long", false);
NUMERIC_FIELD_MAPPER_TYPES = types;
}
private SchemaUtil() {}
public static boolean isNumericType(String type) {
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
return type != null && NUMERIC_FIELD_MAPPER_TYPES.containsKey(type);
}
/**
 * Convert a numeric value to a whole number if the given type is not a floating point type.
 *
 * Implementation decision: We do not care about the concrete type, only whether it is floating point or not.
 * Further checks (e.g. range) are done at indexing.
 *
 * If the type is floating point but the value ends with `.0`, we still preserve `.0` in case
 * the destination index uses dynamic mappings as well as being json friendly.
 *
 * Types that are not known to be numeric (including {@code null}) are treated like floating point types:
 * the value is returned unchanged. Non-finite values (NaN, +/-Infinity) have no whole number representation
 * and are returned unchanged as well.
 *
 * @param type the type of the value according to the schema we know
 * @param value the value as double (aggs return double for everything)
 * @return value if the type is floating point or unknown or if the value is not finite,
 *         a long if the value fits into the range of long, a BigInteger otherwise
 */
public static Object dropFloatingPointComponentIfTypeRequiresIt(String type, double value) {
    // unknown types default to "floating point", so the value is passed through as is
    if (NUMERIC_FIELD_MAPPER_TYPES.getOrDefault(type, true)) {
        return value;
    }

    // BigDecimal.valueOf rejects NaN/Infinity and a cast to long would silently turn them into
    // 0, Long.MIN_VALUE or Long.MAX_VALUE, therefore leave them untouched
    if (Double.isNaN(value) || Double.isInfinite(value)) {
        return value;
    }

    assert value % 1 == 0;

    // Long.MIN_VALUE is exactly -2^63 as double and Long.MAX_VALUE is widened to 2^63, so every double in
    // [-2^63, 2^63) can be cast safely. Outside of that range the cast would saturate and hide the overflow,
    // which applies to the negative side as much as to the positive one.
    if (value >= Long.MIN_VALUE && value < Long.MAX_VALUE) {
        return (long) value;
    }

    // special case for unsigned long (and any other value that does not fit into a long)
    return BigDecimal.valueOf(value).toBigInteger();
}
/**
@ -188,8 +217,11 @@ public final class SchemaUtil {
} else if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping);
} else {
logger.warn("Failed to deduce mapping for [{}], fall back to dynamic mapping. " +
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
logger.warn(
"Failed to deduce mapping for [{}], fall back to dynamic mapping. "
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
targetFieldName
);
}
});
@ -199,8 +231,11 @@ public final class SchemaUtil {
if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping);
} else {
logger.warn("Failed to deduce mapping for [{}], fall back to keyword. " +
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
logger.warn(
"Failed to deduce mapping for [{}], fall back to keyword. "
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
targetFieldName
);
targetMapping.put(targetFieldName, KeywordFieldMapper.CONTENT_TYPE);
}
});

View File

@ -654,6 +654,12 @@ public class AggregationResultUtilsTests extends ESTestCase {
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "string"), ""),
equalTo("one_hundred")
);
agg = createSingleMetricAgg("metric", 100.0, "one_hundred");
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "unsigned_long"), ""),
equalTo(100L)
);
}
private ScriptedMetric createScriptedMetric(Object returnValue) {
@ -836,7 +842,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
);
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba2.sub1", "long", "sba2.sub2", "float"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33))
equalTo(asMap("sub1", 100L, "sub2", 33.33))
);
agg = createSingleBucketAgg(
@ -848,7 +854,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
);
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba3.sub1", "long", "sba3.sub2", "double"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", 42L))
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", 42L))
);
agg = createSingleBucketAgg(
@ -861,7 +867,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
assertThat(
AggregationResultUtils.getExtractor(agg)
.value(agg, asStringMap("sba4.sub3.subsub1", "double", "sba4.sub2", "float", "sba4.sub1", "long"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
);
}

View File

@ -127,8 +127,11 @@ public class AggregationSchemaAndResultTests extends ESTestCase {
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
long numGroupsWithoutScripts = groupConfig.getGroups()
.values()
.stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
.count();
this.<Map<String, String>>assertAsync(
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
@ -191,8 +194,11 @@ public class AggregationSchemaAndResultTests extends ESTestCase {
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
long numGroupsWithoutScripts = groupConfig.getGroups()
.values()
.stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
.count();
this.<Map<String, String>>assertAsync(
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
@ -219,7 +225,7 @@ public class AggregationSchemaAndResultTests extends ESTestCase {
23144,
AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_2", 45.0, "forty_five")
);
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45.0)));
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45L)));
agg = AggregationResultUtilsTests.createSingleBucketAgg(
"filter_3",

View File

@ -8,9 +8,12 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
import org.elasticsearch.test.ESTestCase;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
public class SchemaUtilTests extends ESTestCase {
public void testInsertNestedObjectMappings() {
@ -52,4 +55,18 @@ public class SchemaUtilTests extends ESTestCase {
assertFalse(fieldMappings.containsKey(""));
}
public void testConvertToIntegerTypeIfNeeded() {
    final double wholeValue = 33.0;

    // whole number types: the value comes back as a Long
    Object fromUnsignedLong = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unsigned_long", wholeValue);
    assertEquals(33L, fromUnsignedLong);
    Object fromLong = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("long", wholeValue);
    assertEquals(33L, fromLong);

    // floating point types, unknown types and a missing type: the value stays a double
    Object fromDouble = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("double", wholeValue);
    assertEquals(33.0, fromDouble);
    Object fromHalfFloat = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("half_float", wholeValue);
    assertEquals(33.0, fromHalfFloat);
    Object fromUnknown = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unknown", wholeValue);
    assertEquals(33.0, fromUnknown);
    Object fromNull = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt(null, wholeValue);
    assertEquals(33.0, fromNull);

    // a value beyond the range of long is returned as BigInteger
    Object beyondLong = SchemaUtil.dropFloatingPointComponentIfTypeRequiresIt("unsigned_long", 1.8446744073709551615E19);
    assertThat(beyondLong, instanceOf(BigInteger.class));
    BigInteger maxUnsignedLong = new BigInteger("18446744073709551615");
    assertEquals(maxUnsignedLong.doubleValue(), ((BigInteger) beyondLong).doubleValue(), 0.0);
}
}