Add ARRAY_QUANTILE function. (#13061)

* Add ARRAY_QUANTILE function.

Expected usage is like: ARRAY_QUANTILE(ARRAY_AGG(x), 0.9).

* Fix test.
This commit is contained in:
Gian Merlino 2022-09-09 11:29:20 -07:00 committed by GitHub
parent 5cc5f7b60c
commit e29e7a8434
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 494 additions and 0 deletions

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.expression;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.doubles.DoubleList;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
/**
 * Computes a particular quantile from a numeric array.
 *
 * Usage: {@code array_quantile(array, rank)}. The requested quantile is given by "rank", which must be from 0 to 1,
 * inclusive: 0 is the minimum, 1 is the maximum. Null values in the input array are ignored.
 *
 * Returns {@link Double#NaN} if the requested quantile is below 0, above 1, or is itself NaN. Returns
 * {@link Double#NaN} if the input array is numeric, yet contains no nonnull elements. Returns null if the input is
 * not a numeric array at all.
 *
 * If the requested quantile falls between two elements of the input array, the result is a linear interpolation of
 * the two closest values. According to Wikipedia (https://en.wikipedia.org/wiki/Quantile), the interpolation algorithm
 * we're using is the default method in R, NumPy, and Julia, and matches Excel's PERCENTILE.INC function.
 */
public class ArrayQuantileExprMacro implements ExprMacroTable.ExprMacro
{
  public static final String FN_NAME = "array_quantile";
  private static final String RANK_ARG_NAME = "rank";

  @Override
  public String name()
  {
    return FN_NAME;
  }

  /**
   * Builds the expression for {@code array_quantile(array, rank)}.
   *
   * @param args exactly two expressions: the array expression, then the rank, which must be a numeric literal
   *
   * @return an expression that evaluates the array argument and returns the requested quantile as a double
   */
  @Override
  public Expr apply(final List<Expr> args)
  {
    validationHelperCheckArgumentCount(args, 2);

    final Expr arg = args.get(0);
    final Expr rankArg = args.get(1);

    validationHelperCheckArgIsLiteral(rankArg, RANK_ARG_NAME);

    if (!(rankArg.getLiteralValue() instanceof Number)) {
      throw validationFailed("%s must be a number", RANK_ARG_NAME);
    }

    // The rank is a literal, so it is resolved once here rather than on every evaluation.
    final double rank = ((Number) rankArg.getLiteralValue()).doubleValue();

    class ArrayQuantileExpr extends ExprMacroTable.BaseScalarUnivariateMacroFunctionExpr
    {
      private ArrayQuantileExpr(Expr arg)
      {
        super(FN_NAME, arg);
      }

      @Nonnull
      @Override
      public ExprEval<?> eval(final ObjectBinding bindings)
      {
        final DoubleList doubles = toDoubleArray(arg.eval(bindings));

        if (doubles == null) {
          // Input was not a numeric array (or was a null array): the result is a null double.
          return ExprEval.ofDouble(null);
        }

        // Could speed up by using selection (like quickselect) instead of sort: expected O(n) instead of O(n logn).
        doubles.sort(null);
        return ExprEval.ofDouble(quantileFromSortedArray(doubles, rank));
      }

      @Override
      public Expr visit(Shuttle shuttle)
      {
        return shuttle.visit(apply(shuttle.visitAll(args)));
      }

      @Nullable
      @Override
      public ExpressionType getOutputType(InputBindingInspector inspector)
      {
        return ExpressionType.DOUBLE;
      }

      @Override
      public String stringify()
      {
        return StringUtils.format("%s(%s, %s)", FN_NAME, arg.stringify(), rankArg.stringify());
      }
    }

    return new ArrayQuantileExpr(arg);
  }

  /**
   * Returns a new {@link DoubleList} holding the nonnull elements of an {@link ExprEval}, converted to doubles.
   * Returns null if the eval's type is not an array with a numeric element type, or if its array value is null.
   * Null elements are skipped.
   */
  @Nullable
  static DoubleList toDoubleArray(final ExprEval<?> eval)
  {
    if (!eval.type().isArray() || !eval.type().getElementType().isNumeric()) {
      return null;
    }

    final Object[] arr = eval.asArray();

    if (arr == null) {
      return null;
    }

    // Copy nonnull elements into a primitive list. The element type was checked above, so each nonnull element is
    // cast to Number without a per-element type check.
    final DoubleArrayList doubles = new DoubleArrayList(arr.length);

    for (final Object o : arr) {
      if (o != null) {
        doubles.add(((Number) o).doubleValue());
      }
    }

    return doubles;
  }

  /**
   * Computes a quantile from a list that is already sorted in ascending order, using linear interpolation between
   * the two closest elements when the requested position falls between them.
   *
   * @param sortedDoubles values sorted ascending; not modified
   * @param rank          requested quantile, from 0 (minimum) to 1 (maximum), inclusive
   *
   * @return the quantile, or {@link Double#NaN} if the list is empty or if rank is NaN, below 0, or above 1
   */
  static double quantileFromSortedArray(final DoubleList sortedDoubles, final double rank)
  {
    // NaN must be rejected explicitly: "rank < 0 || rank > 1" is false for NaN, and a NaN index would otherwise fall
    // through to the interpolation branch and read element 1, which does not exist in a single-element list.
    if (sortedDoubles.size() == 0 || Double.isNaN(rank) || rank < 0 || rank > 1) {
      return Double.NaN;
    }

    final int lastIndex = sortedDoubles.size() - 1;
    final double index = rank * lastIndex;

    if (index <= 0) {
      // Minimum
      return sortedDoubles.getDouble(0);
    } else if (index >= lastIndex) {
      // Maximum
      return sortedDoubles.getDouble(lastIndex);
    }

    final int lower = (int) index;

    if (index == lower) {
      // Specific element
      return sortedDoubles.getDouble(lower);
    } else {
      // Linearly interpolate between two closest elements
      final double step = index - lower;
      final double a = sortedDoubles.getDouble(lower);
      final double b = sortedDoubles.getDouble(lower + 1);
      return a + step * (b - a);
    }
  }
}

View File

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.expression;
import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.doubles.DoubleList;
import it.unimi.dsi.fastutil.doubles.DoubleLists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.InputBindings;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
/**
 * Tests for {@link ArrayQuantileExprMacro}: end-to-end evaluation of {@code array_quantile(array, 0.5)} over literal
 * arrays, plus direct checks of {@link ArrayQuantileExprMacro#quantileFromSortedArray}.
 */
public class ArrayQuantileExprMacroTest extends InitializedNullHandlingTest
{
  /** Tolerance used when comparing against expected quantiles that are only written out to ~10 digits. */
  private static final double TOLERANCE = 0.0000001;

  /** Every rank probed by the single-element and no-element tests, in the order they are checked. */
  private static final double[] IN_RANGE_RANKS =
      {0, 0.000001, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.999999, 1};

  @Test
  public void test_apply_longArray()
  {
    final Expr median = medianOf(ExprEval.ofLongArray(new Object[]{1L, 3L, 2L}));
    assertEvaluatesTo(2.0, median);
  }

  @Test
  public void test_apply_longArrayWithNulls()
  {
    final Expr median = medianOf(ExprEval.ofLongArray(new Object[]{1L, 3L, null, null, null, 2L}));
    assertEvaluatesTo(2.0, median);
  }

  @Test
  public void test_apply_doubleArray()
  {
    final Expr median = medianOf(ExprEval.ofDoubleArray(new Object[]{1.0, 3.0, 2.0}));
    assertEvaluatesTo(2.0, median);
  }

  @Test
  public void test_apply_doubleArrayWithNulls()
  {
    final Expr median = medianOf(ExprEval.ofDoubleArray(new Object[]{1.0, null, null, null, 3.0, 2.0}));
    assertEvaluatesTo(2.0, median);
  }

  @Test
  public void test_apply_stringArray()
  {
    // A string array is not a numeric array, so the macro yields a null double.
    final Expr median = medianOf(ExprEval.ofStringArray(new Object[]{"1.0", "3.0", "2.0"}));
    assertEvaluatesToNullOrDefault(median);
  }

  @Test
  public void test_apply_null()
  {
    final Expr median = medianOf(ExprEval.ofLongArray(null));
    assertEvaluatesToNullOrDefault(median);
  }

  @Test
  public void test_quantileFromSortedArray()
  {
    final DoubleList sorted = DoubleList.of(
        1.74894566717352,
        2.45877596678213,
        6.84501873459025,
        18.05541572400960,
        18.46552908786640,
        21.67577450542990,
        28.27502148905920,
        29.38150656294550,
        31.51777607091530,
        35.07176789407870,
        35.44337813640110,
        36.00285458859680,
        38.02930138807480,
        38.91193281665990,
        39.10448180900530,
        41.73995751226990,
        44.09796685057930,
        44.97457148479690,
        69.52896057856050,
        74.77683331911330,
        77.96955453249100,
        80.79983221039570,
        83.32696453924490,
        87.71915087266120,
        90.18343512171780,
        96.84202159588680
    );

    assertQuantile(Double.NaN, sorted, -0.1, TOLERANCE);
    assertQuantile(1.748945667, sorted, 0, TOLERANCE);
    assertQuantile(1.748963413, sorted, 0.000001, TOLERANCE);
    assertQuantile(12.45021723, sorted, 0.1, TOLERANCE);
    assertQuantile(21.67577451, sorted, 0.2, TOLERANCE);
    assertQuantile(30.44964132, sorted, 0.3, TOLERANCE);
    assertQuantile(35.44337814, sorted, 0.4, TOLERANCE);
    assertQuantile(38.4706171, sorted, 0.5, TOLERANCE);
    assertQuantile(41.73995751, sorted, 0.6, TOLERANCE);
    assertQuantile(57.25176603, sorted, 0.7, TOLERANCE);
    assertQuantile(77.96955453, sorted, 0.8, TOLERANCE);
    assertQuantile(85.52305771, sorted, 0.9, TOLERANCE);
    assertQuantile(96.84185513, sorted, 0.999999, TOLERANCE);
    assertQuantile(96.8420216, sorted, 1, TOLERANCE);
    assertQuantile(Double.NaN, sorted, 1.1, TOLERANCE);
  }

  @Test
  public void test_quantileFromSortedArray_singleElement()
  {
    final double onlyValue = 1.748945667;
    final DoubleList sorted = DoubleList.of(onlyValue);

    // Out-of-range ranks give NaN; every in-range rank collapses to the lone element.
    assertQuantile(Double.NaN, sorted, -0.1, 0);
    for (final double rank : IN_RANGE_RANKS) {
      assertQuantile(onlyValue, sorted, rank, 0);
    }
    assertQuantile(Double.NaN, sorted, 1.1, 0);
  }

  @Test
  public void test_quantileFromSortedArray_noElements()
  {
    final DoubleList sorted = DoubleLists.emptyList();

    // With nothing to pick from, every rank (in range or not) gives NaN.
    assertQuantile(Double.NaN, sorted, -0.1, 0);
    for (final double rank : IN_RANGE_RANKS) {
      assertQuantile(Double.NaN, sorted, rank, 0);
    }
    assertQuantile(Double.NaN, sorted, 1.1, 0);
  }

  /**
   * Applies the macro to a literal array with a literal rank of 0.5 and returns the resulting expression.
   */
  private static Expr medianOf(final ExprEval<?> array)
  {
    return new ArrayQuantileExprMacro().apply(
        ImmutableList.of(
            array.toExpr(),
            ExprEval.ofDouble(0.5).toExpr()
        )
    );
  }

  /**
   * Evaluates the expression with no bindings and asserts that it yields exactly the expected double.
   */
  private static void assertEvaluatesTo(final double expected, final Expr expr)
  {
    Assert.assertEquals(
        expected,
        expr.eval(InputBindings.nilBindings()).asDouble(),
        0.0
    );
  }

  /**
   * Asserts the expression evaluates to a numeric null in SQL-compatible mode, or to a non-null zero otherwise.
   */
  private static void assertEvaluatesToNullOrDefault(final Expr expr)
  {
    final boolean numericNull = expr.eval(InputBindings.nilBindings()).isNumericNull();

    if (NullHandling.sqlCompatible()) {
      Assert.assertTrue(numericNull);
    } else {
      Assert.assertFalse(numericNull);
      Assert.assertEquals(0, expr.eval(InputBindings.nilBindings()).asDouble(), 0);
    }
  }

  /**
   * Asserts that {@link ArrayQuantileExprMacro#quantileFromSortedArray} returns the expected value for a rank.
   */
  private static void assertQuantile(
      final double expected,
      final DoubleList sorted,
      final double rank,
      final double tolerance
  )
  {
    Assert.assertEquals(expected, ArrayQuantileExprMacro.quantileFromSortedArray(sorted, rank), tolerance);
  }
}

View File

@ -37,6 +37,7 @@ public class TestExprMacroTable extends ExprMacroTable
{
super(
ImmutableList.of(
new ArrayQuantileExprMacro(),
new IPv4AddressMatchExprMacro(),
new IPv4AddressParseExprMacro(),
new IPv4AddressStringifyExprMacro(),

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.ArrayQuantileExprMacro;
import org.apache.druid.query.expression.CaseInsensitiveContainsExprMacro;
import org.apache.druid.query.expression.ContainsExprMacro;
import org.apache.druid.query.expression.GuiceExprMacroTable;
@ -49,6 +50,7 @@ public class ExpressionModule implements Module
{
public static final List<Class<? extends ExprMacroTable.ExprMacro>> EXPR_MACROS =
ImmutableList.<Class<? extends ExprMacroTable.ExprMacro>>builder()
.add(ArrayQuantileExprMacro.class)
.add(IPv4AddressMatchExprMacro.class)
.add(IPv4AddressParseExprMacro.class)
.add(IPv4AddressStringifyExprMacro.class)

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.expression.builtin;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
/**
 * SQL operator conversion for {@code ARRAY_QUANTILE(array, rank)}.
 *
 * The SQL function takes an ARRAY operand followed by a NUMERIC operand, with operand 1 (the rank) registered as a
 * literal operand, and is declared to return a nullable DOUBLE. The conversion is handed the native function name
 * {@code array_quantile}.
 */
public class ArrayQuantileOperatorConversion extends DirectOperatorConversion
{
  private static final String SQL_FUNCTION_NAME = "ARRAY_QUANTILE";
  private static final String NATIVE_FUNCTION_NAME = "array_quantile";

  private static final SqlFunction SQL_FUNCTION = createSqlFunction();

  public ArrayQuantileOperatorConversion()
  {
    super(SQL_FUNCTION, NATIVE_FUNCTION_NAME);
  }

  /**
   * Creates the Calcite function definition: (ARRAY, NUMERIC literal) to nullable DOUBLE, in the NUMERIC category.
   */
  private static SqlFunction createSqlFunction()
  {
    return OperatorConversions
        .operatorBuilder(SQL_FUNCTION_NAME)
        .operandTypes(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC)
        .literalOperands(1)
        .functionCategory(SqlFunctionCategory.NUMERIC)
        .returnTypeNullable(SqlTypeName.DOUBLE)
        .build();
  }
}

View File

@ -65,6 +65,7 @@ import org.apache.druid.sql.calcite.expression.builtin.ArrayOrdinalOfOperatorCon
import org.apache.druid.sql.calcite.expression.builtin.ArrayOrdinalOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ArrayOverlapOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ArrayPrependOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ArrayQuantileOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ArraySliceOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ArrayToStringOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.BTrimOperatorConversion;
@ -225,6 +226,7 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new ArrayOrdinalOperatorConversion())
.add(new ArrayOffsetOfOperatorConversion())
.add(new ArrayOrdinalOfOperatorConversion())
.add(new ArrayQuantileOperatorConversion())
.add(new ArraySliceOperatorConversion())
.add(new ArrayToStringOperatorConversion())
.add(new StringToArrayOperatorConversion())

View File

@ -1660,6 +1660,49 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testArrayAggQuantile()
{
  cannotVectorize();

  // Aggregator the planner is expected to emit for ARRAY_AGG(l1): it accumulates "l1" into an ARRAY<LONG>.
  final ExpressionLambdaAggregatorFactory arrayAggOfL1 = new ExpressionLambdaAggregatorFactory(
      "a0",
      ImmutableSet.of("l1"),
      "__acc",
      "ARRAY<LONG>[]",
      "ARRAY<LONG>[]",
      true,
      true,
      false,
      "array_append(\"__acc\", \"l1\")",
      "array_concat(\"__acc\", \"a0\")",
      null,
      null,
      ExpressionLambdaAggregatorFactory.DEFAULT_MAX_SIZE_BYTES,
      TestExprMacroTable.INSTANCE
  );

  // Different results because there are some nulls in the column. In SQL-compatible mode we ignore them;
  // in replace-with-default mode we treat them as zeroes.
  final double expectedQuantile = NullHandling.sqlCompatible() ? 260259.80000000002 : 162665.0;

  testQuery(
      "SELECT ARRAY_QUANTILE(ARRAY_AGG(l1), 0.9) FROM numfoo",
      ImmutableList.of(
          Druids.newTimeseriesQueryBuilder()
                .dataSource(CalciteTests.DATASOURCE3)
                .intervals(querySegmentSpec(Filtration.eternity()))
                .granularity(Granularities.ALL)
                .aggregators(aggregators(arrayAggOfL1))
                .postAggregators(
                    // The quantile itself runs as a post-aggregation over the collected array.
                    expressionPostAgg("p0", "array_quantile(\"a0\",0.9)")
                )
                .context(QUERY_CONTEXT_DEFAULT)
                .build()
      ),
      ImmutableList.of(new Object[]{expectedQuantile})
  );
}
@Test
public void testArrayAggArrays()
{