SQL: Re-enable matrix aggregations (elastic/x-pack-elasticsearch#3977)

Remove functions without a backing matrix agg

MatrixAgg works across multiple fields and exposing it directly in SQL
does not work. Instead isolated functions are exposed which get folded
and optimized into one matrix agg per field. Thus not all matrix 
functions can be exposed in SQL, at least at this time.

Instead of depending on the plugin directly, depend on the plugin client
jar (matrix-agg-client)

Remove outdated test

Original commit: elastic/x-pack-elasticsearch@ec9b31bf59
This commit is contained in:
Costin Leau 2018-03-04 01:53:56 +02:00 committed by GitHub
parent 8fd361ba83
commit 58f43ad4f0
17 changed files with 64 additions and 271 deletions

View File

@ -21,7 +21,8 @@ integTest.enabled = false
dependencies {
compileOnly "org.elasticsearch.plugin:x-pack-core:${version}"
compile project('sql-proto')
compile 'org.antlr:antlr4-runtime:4.5.3'
compile "org.elasticsearch.plugin:aggs-matrix-stats-client:${version}"
compile "org.antlr:antlr4-runtime:4.5.3"
testCompile "org.elasticsearch.test:framework:${version}"
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('security'), configuration: 'testArtifacts')

View File

@ -50,6 +50,7 @@ import org.elasticsearch.xpack.sql.util.StringUtils;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
// TODO: add retry/back-off
@ -164,8 +165,9 @@ public class Scroller {
}
else if (col instanceof AggRef) {
Object[] arr;
AggRef ref = (AggRef) col;
String path = ((AggRef) col).path();
String path = ref.path();
// yup, this is instance equality to make sure we only check the path used by the code
if (path == TotalCountRef.PATH) {
arr = new Object[] { Long.valueOf(response.getHits().getTotalHits()) };
@ -178,12 +180,24 @@ public class Scroller {
}
Object value = getAggProperty(response.getAggregations(), path);
// // FIXME: this can be tabular in nature
// if (ref instanceof MappedAggRef) {
// Map<String, Object> map = (Map<String, Object>) value;
// Object extractedValue = map.get(((MappedAggRef)
// ref).fieldName());
// }
// unwrap nested map
if (ref.innerKey() != null) {
// needs changing when moving to Composite
if (value instanceof Object[]) {
Object[] val = (Object[]) value;
arr = new Object[val.length];
for (int i = 0; i < arr.length; i++) {
if (val[i] instanceof Map) {
arr[i] = ((Map<?, ?>) val[i]).get(ref.innerKey());
}
}
value = arr;
} else {
if (value instanceof Map) {
value = new Object[] { ((Map<?, ?>) value).get(ref.innerKey()) };
}
}
}
if (formattedKey) {
List<? extends Bucket> buckets = ((MultiBucketsAggregation) value).getBuckets();

View File

@ -9,15 +9,9 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Avg;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Correlation;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Covariance;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Kurtosis;
import org.elasticsearch.xpack.sql.expression.function.aggregate.MatrixCount;
import org.elasticsearch.xpack.sql.expression.function.aggregate.MatrixMean;
import org.elasticsearch.xpack.sql.expression.function.aggregate.MatrixVariance;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Max;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Mean;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.sql.expression.function.aggregate.Percentile;
import org.elasticsearch.xpack.sql.expression.function.aggregate.PercentileRank;
@ -85,20 +79,13 @@ public class FunctionRegistry {
def(Min.class, Min::new),
def(Sum.class, Sum::new),
// Statistics
def(Mean.class, Mean::new), // TODO can we just use Avg?
def(StddevPop.class, StddevPop::new),
def(VarPop.class, VarPop::new),
def(Percentile.class, Percentile::new),
def(PercentileRank.class, PercentileRank::new),
def(SumOfSquares.class, SumOfSquares::new),
// Matrix aggs
def(MatrixCount.class, MatrixCount::new),
def(MatrixMean.class, MatrixMean::new),
def(MatrixVariance.class, MatrixVariance::new),
def(Skewness.class, Skewness::new),
def(Kurtosis.class, Kurtosis::new),
def(Covariance.class, Covariance::new),
def(Correlation.class, Correlation::new),
// Scalar functions
// Date
def(DayOfMonth.class, DayOfMonth::new, "DAY", "DOM"),

View File

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
public class Correlation extends NumericAggregate implements MatrixStatsEnclosed {
public Correlation(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<Correlation> info() {
return NodeInfo.create(this, Correlation::new, field());
}
@Override
public Correlation replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 1) {
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
}
return new Correlation(location(), newChildren.get(0));
}
@Override
public String innerName() {
return "correlation";
}
}

View File

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
public class Covariance extends NumericAggregate implements MatrixStatsEnclosed {
public Covariance(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<Covariance> info() {
return NodeInfo.create(this, Covariance::new, field());
}
@Override
public Covariance replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 1) {
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
}
return new Covariance(location(), newChildren.get(0));
}
@Override
public String innerName() {
return "covariance";
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.expression.function.Function;
import org.elasticsearch.xpack.sql.querydsl.agg.AggPath;
@ -13,6 +12,8 @@ import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.type.DataType;
import java.util.List;
public class InnerAggregate extends AggregateFunction {
private final AggregateFunction inner;
@ -91,6 +92,6 @@ public class InnerAggregate extends AggregateFunction {
@Override
public String name() {
return "(" + inner.functionName() + "#" + inner.id() + "/" + outer.toString() + ")";
return inner.name();
}
}
}

View File

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
public class MatrixCount extends NumericAggregate implements MatrixStatsEnclosed {
public MatrixCount(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<MatrixCount> info() {
return NodeInfo.create(this, MatrixCount::new, field());
}
@Override
public MatrixCount replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 1) {
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
}
return new MatrixCount(location(), newChildren.get(0));
}
@Override
public String innerName() {
return "matrix_count";
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
public class MatrixMean extends NumericAggregate implements MatrixStatsEnclosed {
public MatrixMean(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<MatrixMean> info() {
return NodeInfo.create(this, MatrixMean::new, field());
}
@Override
public MatrixMean replaceChildren(List<Expression> newChildren) {
return new MatrixMean(location(), newChildren.get(0));
}
@Override
public String innerName() {
return "matrix_mean";
}
}

View File

@ -1,36 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
public class MatrixVariance extends NumericAggregate implements MatrixStatsEnclosed {
public MatrixVariance(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<MatrixVariance> info() {
return NodeInfo.create(this, MatrixVariance::new, field());
}
@Override
public MatrixVariance replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 1) {
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
}
return new MatrixVariance(location(), newChildren.get(0));
}
@Override
public String innerName() {
return "matrix_variance";
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.expression.function.aggregate;
import java.util.List;
import org.elasticsearch.xpack.sql.expression.Expression;
import org.elasticsearch.xpack.sql.tree.Location;
import org.elasticsearch.xpack.sql.tree.NodeInfo;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DataTypes;
public class Mean extends NumericAggregate implements MatrixStatsEnclosed {
public Mean(Location location, Expression field) {
super(location, field);
}
@Override
protected NodeInfo<Mean> info() {
return NodeInfo.create(this, Mean::new, field());
}
@Override
public Mean replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 1) {
throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]");
}
return new Mean(location(), newChildren.get(0));
}
@Override
public DataType dataType() {
return DataType.DOUBLE;
}
@Override
public String innerName() {
return "means";
}
}

View File

@ -369,11 +369,7 @@ class QueryFolder extends RuleExecutor<PhysicalPlan> {
"Expected aggregate function inside alias; got %s", child.nodeString());
Tuple<QueryContainer, AggPathInput> withAgg = addAggFunction(matchingGroup,
(AggregateFunction) child, compoundAggMap, queryC);
//FIXME: what about inner key
queryC = withAgg.v1().addAggColumn(withAgg.v2().context());
if (withAgg.v2().innerKey() != null) {
throw new PlanningException("innerkey/matrix stats not handled (yet)");
}
queryC = withAgg.v1().addAggColumn(withAgg.v2().context(), withAgg.v2().innerKey());
}
}
// not an Alias or Function means it's an Attribute so apply the same logic as above

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.sql.querydsl.agg;
import java.util.List;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.xpack.sql.planner.PlanningException;
import java.util.List;
import static org.elasticsearch.search.aggregations.MatrixStatsAggregationBuilders.matrixStats;
public class MatrixStatsAgg extends LeafAgg {
@ -17,11 +18,10 @@ public class MatrixStatsAgg extends LeafAgg {
public MatrixStatsAgg(String id, String propertyPath, List<String> fields) {
super(id, propertyPath, "<multi-field>");
this.fields = fields;
throw new PlanningException("innerkey/matrix stats not handled (yet)", RestStatus.BAD_REQUEST);
}
@Override
AggregationBuilder toBuilder() {
throw new UnsupportedOperationException();
return matrixStats(id()).fields(fields);
}
}

View File

@ -12,12 +12,19 @@ import org.elasticsearch.xpack.sql.querydsl.agg.AggPath;
public class AggRef implements FieldExtraction {
private final String path;
private final int depth;
private final String innerKey;
public AggRef(String path) {
this(path, null);
}
public AggRef(String path, String innerKey) {
this.path = path;
depth = AggPath.depth(path);
this.innerKey = innerKey;
}
@Override
public String toString() {
return path;
@ -32,6 +39,10 @@ public class AggRef implements FieldExtraction {
return path;
}
public String innerKey() {
return innerKey;
}
@Override
public void collectFields(SqlSourceBuilder sourceBuilder) {
// Aggregations do not need any special fields

View File

@ -302,6 +302,10 @@ public class QueryContainer {
return with(combine(columns, new AggRef(aggPath)));
}
public QueryContainer addAggColumn(String aggPath, String innerKey) {
return with(combine(columns, new AggRef(aggPath, innerKey)));
}
public QueryContainer addAggCount(GroupingAgg parentGroup, String functionId) {
FieldExtraction ref = parentGroup == null ? TotalCountRef.INSTANCE : new AggRef(AggPath.bucketCount(parentGroup.asParentPath()));
Map<String, GroupingAgg> pseudoFunctions = new LinkedHashMap<>(this.pseudoFunctions);

View File

@ -473,17 +473,6 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
assertEquals(0, getNumberOfSearchContexts("test"));
}
public void testSelectUnimplementedMatrixAggs() throws IOException {
StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_id\":\"1\"}}\n");
bulk.append("{\"foo\":1}\n");
client().performRequest("POST", "/test/doc/_bulk", singletonMap("refresh", "true"),
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
expectBadRequest(() -> runSql(randomMode(),
"SELECT covariance(foo) FROM test"), containsString("innerkey/matrix stats not handled (yet)"));
}
private Tuple<String, String> runSqlAsText(String sql) throws IOException {
return runSqlAsText("", new StringEntity("{\"query\":\"" + sql + "\"}", ContentType.APPLICATION_JSON));
}

View File

@ -75,3 +75,19 @@ SELECT SUM(salary) FROM test_emp;
---------------
4824855
;
kurtosisAndSkewnessNoGroup
SELECT KURTOSIS(emp_no) k, SKEWNESS(salary) s FROM test_emp;
k:d | s:d
1.7997599759975997 | 0.2707722118423227
;
kurtosisAndSkewnessGroup
SELECT gender, KURTOSIS(salary) k, SKEWNESS(salary) s FROM test_emp GROUP BY gender;
gender:s | k:d | s:d
M | 2.259327644285826 | 0.40268950715550333
F | 1.8427808415250482 | 0.04517149340491813
;

View File

@ -12,19 +12,13 @@ COUNT |AGGREGATE
MAX |AGGREGATE
MIN |AGGREGATE
SUM |AGGREGATE
MEAN |AGGREGATE
STDDEV_POP |AGGREGATE
VAR_POP |AGGREGATE
PERCENTILE |AGGREGATE
PERCENTILE_RANK |AGGREGATE
SUM_OF_SQUARES |AGGREGATE
MATRIX_COUNT |AGGREGATE
MATRIX_MEAN |AGGREGATE
MATRIX_VARIANCE |AGGREGATE
SKEWNESS |AGGREGATE
KURTOSIS |AGGREGATE
COVARIANCE |AGGREGATE
CORRELATION |AGGREGATE
DAY_OF_MONTH |SCALAR
DAY |SCALAR
DOM |SCALAR