SQL: Fix metric aggs on date/time to not return double (#40377)

Previously metric aggregations on date fields would return a double
which caused errors when trying to apply scalar functions on top, e.g.:
```
SELECT YEAR(MAX(date)) FROM test
```

Fixes: #40376

(cherry-picked from commit 41d0a038467fbdbbf67fd9bfdf27623451cae63a)
This commit is contained in:
Marios Trivyzas 2019-03-23 14:11:32 +01:00 committed by Marios Trivyzas
parent 558adc0f28
commit 17b8b54d5e
No known key found for this signature in database
GPG Key ID: 8817B46B0CF36A3F
6 changed files with 100 additions and 21 deletions

View File

@ -276,6 +276,8 @@ aggMinWithAlias
SELECT gender g, MIN(emp_no) m FROM "test_emp" GROUP BY g ORDER BY gender;
aggMinOnDateTime
SELECT gender, MIN(birth_date) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggMinOnDateTimeCastAsDate
SELECT gender, YEAR(CAST(MIN(birth_date) AS DATE)) m FROM "test_emp" GROUP BY gender ORDER BY gender;
// Conditional MIN
aggMinWithHaving
@ -332,6 +334,8 @@ aggMaxWithAlias
SELECT gender g, MAX(emp_no) m FROM "test_emp" GROUP BY g ORDER BY gender;
aggMaxOnDateTime
SELECT gender, MAX(birth_date) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggMaxOnDateTimeCastAsDate
SELECT gender, YEAR(CAST(MAX(birth_date) AS DATE)) m FROM "test_emp" GROUP BY gender ORDER BY gender;
aggAvgAndMaxWithLikeFilter
SELECT CAST(AVG(salary) AS LONG) AS avg, CAST(SUM(salary) AS LONG) AS s FROM "test_emp" WHERE first_name LIKE 'G%';

View File

@ -432,7 +432,7 @@ public class Querier {
if (ref instanceof MetricAggRef) {
MetricAggRef r = (MetricAggRef) ref;
return new MetricAggExtractor(r.name(), r.property(), r.innerKey());
return new MetricAggExtractor(r.name(), r.property(), r.innerKey(), cfg.zoneId(), r.isDateTimeBased());
}
if (ref instanceof TopHitsAggRef) {

View File

@ -18,8 +18,10 @@ import org.elasticsearch.search.aggregations.metrics.PercentileRanks;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.util.DateUtils;
import java.io.IOException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
@ -30,17 +32,23 @@ public class MetricAggExtractor implements BucketExtractor {
private final String name;
private final String property;
private final String innerKey;
private final boolean isDateTimeBased;
private final ZoneId zoneId;
public MetricAggExtractor(String name, String property, String innerKey) {
public MetricAggExtractor(String name, String property, String innerKey, ZoneId zoneId, boolean isDateTimeBased) {
this.name = name;
this.property = property;
this.innerKey = innerKey;
this. isDateTimeBased =isDateTimeBased;
this.zoneId = zoneId;
}
MetricAggExtractor(StreamInput in) throws IOException {
name = in.readString();
property = in.readString();
innerKey = in.readOptionalString();
isDateTimeBased = in.readBoolean();
zoneId = ZoneId.of(in.readString());
}
@Override
@ -48,6 +56,8 @@ public class MetricAggExtractor implements BucketExtractor {
out.writeString(name);
out.writeString(property);
out.writeOptionalString(innerKey);
out.writeBoolean(isDateTimeBased);
out.writeString(zoneId.getId());
}
String name() {
@ -62,6 +72,10 @@ public class MetricAggExtractor implements BucketExtractor {
return innerKey;
}
ZoneId zoneId() {
return zoneId;
}
@Override
public String getWriteableName() {
return NAME;
@ -83,20 +97,33 @@ public class MetricAggExtractor implements BucketExtractor {
//if (innerKey == null) {
// throw new SqlIllegalArgumentException("Invalid innerKey {} specified for aggregation {}", innerKey, name);
//}
return ((InternalNumericMetricsAggregation.MultiValue) agg).value(property);
return handleDateTime(((InternalNumericMetricsAggregation.MultiValue) agg).value(property));
} else if (agg instanceof InternalFilter) {
// COUNT(expr) and COUNT(ALL expr) uses this type of aggregation to account for non-null values only
return ((InternalFilter) agg).getDocCount();
}
Object v = agg.getProperty(property);
return innerKey != null && v instanceof Map ? ((Map<?, ?>) v).get(innerKey) : v;
return handleDateTime(innerKey != null && v instanceof Map ? ((Map<?, ?>) v).get(innerKey) : v);
}
private Object handleDateTime(Object object) {
if (isDateTimeBased) {
if (object == null) {
return object;
} else if (object instanceof Number) {
return DateUtils.asDateTime(((Number) object).longValue(), zoneId);
} else {
throw new SqlIllegalArgumentException("Invalid date key returned: {}", object);
}
}
return object;
}
/**
* Check if the given aggregate has been executed and has computed values
* or not (the bucket is null).
*
*
* Waiting on https://github.com/elastic/elasticsearch/issues/34903
*/
private static boolean containsValues(InternalAggregation agg) {
@ -130,11 +157,11 @@ public class MetricAggExtractor implements BucketExtractor {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
MetricAggExtractor other = (MetricAggExtractor) obj;
return Objects.equals(name, other.name)
&& Objects.equals(property, other.property)
@ -146,4 +173,4 @@ public class MetricAggExtractor implements BucketExtractor {
String i = innerKey != null ? "[" + innerKey + "]" : "";
return Aggs.ROOT_GROUP_NAME + ">" + name + "." + property + i;
}
}
}

View File

@ -404,7 +404,7 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
// COUNT(<field_name>)
} else if (!c.distinct()) {
LeafAgg leafAgg = toAgg(functionId, f);
AggPathInput a = new AggPathInput(f, new MetricAggRef(leafAgg.id(), "doc_count", "_count"));
AggPathInput a = new AggPathInput(f, new MetricAggRef(leafAgg.id(), "doc_count", "_count", false));
queryC = queryC.with(queryC.aggs().addAgg(leafAgg));
return new Tuple<>(queryC, a);
}
@ -430,14 +430,16 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
// FIXME: concern leak - hack around MatrixAgg which is not
// generalized (afaik)
aggInput = new AggPathInput(f,
new MetricAggRef(cAggPath, ia.innerName(), ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null));
new MetricAggRef(cAggPath, ia.innerName(),
ia.innerKey() != null ? QueryTranslator.nameOf(ia.innerKey()) : null,
ia.dataType().isDateBased()));
}
else {
LeafAgg leafAgg = toAgg(functionId, f);
if (f instanceof TopHits) {
aggInput = new AggPathInput(f, new TopHitsAggRef(leafAgg.id(), f.dataType()));
} else {
aggInput = new AggPathInput(f, new MetricAggRef(leafAgg.id()));
aggInput = new AggPathInput(f, new MetricAggRef(leafAgg.id(), f.dataType().isDateBased()));
}
queryC = queryC.with(queryC.aggs().addAgg(leafAgg));
}

View File

@ -17,19 +17,21 @@ public class MetricAggRef extends AggRef {
private final String name;
private final String property;
private final String innerKey;
private final boolean isDateTimeBased;
public MetricAggRef(String name) {
this(name, "value");
public MetricAggRef(String name, boolean isDateTimeBased) {
this(name, "value", isDateTimeBased);
}
public MetricAggRef(String name, String property) {
this(name, property, null);
public MetricAggRef(String name, String property, boolean isDateTimeBased) {
this(name, property, null, isDateTimeBased);
}
public MetricAggRef(String name, String property, String innerKey) {
public MetricAggRef(String name, String property, String innerKey, boolean isDateTimeBased) {
this.name = name;
this.property = property;
this.innerKey = innerKey;
this.isDateTimeBased = isDateTimeBased;
}
public String name() {
@ -44,6 +46,10 @@ public class MetricAggRef extends AggRef {
return innerKey;
}
public boolean isDateTimeBased() {
return isDateTimeBased;
}
@Override
public String toString() {
String i = innerKey != null ? "[" + innerKey + "]" : "";

View File

@ -10,9 +10,12 @@ import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.SqlException;
import org.elasticsearch.xpack.sql.util.DateUtils;
import java.io.IOException;
import java.time.ZoneId;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -22,7 +25,8 @@ import static java.util.Collections.singletonMap;
public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<MetricAggExtractor> {
public static MetricAggExtractor randomMetricAggExtractor() {
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16));
return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16),
randomZone(), randomBoolean());
}
@Override
@ -37,7 +41,12 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
@Override
protected MetricAggExtractor mutateInstance(MetricAggExtractor instance) throws IOException {
return new MetricAggExtractor(instance.name() + "mutated", instance.property(), instance.innerKey());
return new MetricAggExtractor(
instance.name() + "mutated",
instance.property() + "mutated",
instance.innerKey() + "mutated",
randomValueOtherThan(instance.zoneId(),
ESTestCase::randomZone), randomBoolean());
}
public void testNoAggs() {
@ -48,7 +57,7 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
}
public void testSingleValueProperty() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);
double value = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()), value);
@ -56,8 +65,18 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
assertEquals(value, extractor.extract(bucket));
}
public void testSingleValuePropertyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("my_date_field", "property", "innerKey", zoneId, true);
double value = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()), value);
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
}
public void testSingleValueInnerKey() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);
double innerValue = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()),
singletonMap(extractor.innerKey(), innerValue));
@ -65,12 +84,33 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
assertEquals(innerValue, extractor.extract(bucket));
}
public void testSingleValueInnerKeyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", zoneId, true);
double innerValue = randomDouble();
Aggregation agg = new TestSingleValueAggregation(extractor.name(), singletonList(extractor.property()),
singletonMap(extractor.innerKey(), innerValue));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) innerValue , zoneId), extractor.extract(bucket));
}
public void testMultiValueProperty() {
MetricAggExtractor extractor = randomMetricAggExtractor();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", null, false);
double value = randomDouble();
Aggregation agg = new TestMultiValueAggregation(extractor.name(), singletonMap(extractor.property(), value));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(value, extractor.extract(bucket));
}
public void testMultiValuePropertyDate() {
ZoneId zoneId = randomZone();
MetricAggExtractor extractor = new MetricAggExtractor("field", "property", "innerKey", zoneId, true);
double value = randomDouble();
Aggregation agg = new TestMultiValueAggregation(extractor.name(), singletonMap(extractor.property(), value));
Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
}
}