support for array expressions in TransformSpec with ExpressionTransform (#8744)

* transformSpec + array expressions

changes:
* added array expression support to transformSpec
* removed ParseSpec.verify, since its only use (as far as I can tell) was to prevent transform expressions that did not replace their input from functioning
* hijacked index task test to test changes

* remove docs about being unsupported

* re-arrange test assert

* unused imports

* imports

* fix tests

* preserve types

* suppress warning, fixes, add test

* formatting

* cleanup

* better list to array type conversion and tests

* fix oops
This commit is contained in:
Clint Wylie 2019-11-13 11:04:37 -08:00 committed by Gian Merlino
parent 9ed9a80b9d
commit cc54b2a9df
16 changed files with 268 additions and 130 deletions

View File

@ -57,7 +57,6 @@ public class CSVParseSpec extends ParseSpec
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
@ -102,14 +101,6 @@ public class CSVParseSpec extends ParseSpec
return skipHeaderRows;
}
/**
 * Checks that every name in {@code usedCols} is present in {@code columns}.
 *
 * @param usedCols column names that must all be contained in {@code columns}
 * @throws IllegalArgumentException for the first name that is not contained in {@code columns}
 */
@Override
public void verify(List<String> usedCols)
{
  usedCols.forEach(
      usedCol -> Preconditions.checkArgument(columns.contains(usedCol), "column[%s] not in columns.", usedCol)
  );
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -60,7 +60,6 @@ public class DelimitedParseSpec extends ParseSpec
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
@ -112,14 +111,6 @@ public class DelimitedParseSpec extends ParseSpec
return skipHeaderRows;
}
/**
 * Checks that every name in {@code usedCols} is present in {@code columns}.
 *
 * @param usedCols column names that must all be contained in {@code columns}
 * @throws IllegalArgumentException for the first name that is not contained in {@code columns}
 */
@Override
public void verify(List<String> usedCols)
{
  usedCols.forEach(
      usedCol -> Preconditions.checkArgument(columns.contains(usedCol), "column[%s] not in columns.", usedCol)
  );
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -25,8 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONToLowerParser;
import org.apache.druid.java.util.common.parsers.Parser;
import java.util.List;
/**
* This class is only here for backwards compatibility
*/
@ -45,11 +43,6 @@ public class JSONLowercaseParseSpec extends ParseSpec
this.objectMapper = new ObjectMapper();
}
@Override
public void verify(List<String> usedCols)
{
// No-op: this override performs no checks on usedCols.
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.Parser;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -62,11 +61,6 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
this(ts, dims, null, null);
}
@Override
public void verify(List<String> usedCols)
{
// No-op: this override performs no checks on usedCols.
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -27,8 +27,6 @@ import org.apache.druid.java.util.common.parsers.JavaScriptParser;
import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.js.JavaScriptConfig;
import java.util.List;
/**
*/
public class JavaScriptParseSpec extends ParseSpec
@ -59,11 +57,6 @@ public class JavaScriptParseSpec extends ParseSpec
return function;
}
@Override
public void verify(List<String> usedCols)
{
// No-op: this override performs no checks on usedCols.
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -27,8 +27,6 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.parsers.Parser;
import java.util.List;
@ExtensionPoint
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format")
@JsonSubTypes(value = {
@ -64,12 +62,6 @@ public abstract class ParseSpec
return dimensionsSpec;
}
/**
 * Verification hook that receives the column names in use. This base implementation performs no checks;
 * subclasses may override it to validate the names.
 *
 * @param usedCols column names in use; ignored by this implementation
 */
@PublicApi
public void verify(List<String> usedCols)
{
// do nothing
}
public Parser<String, Object> makeParser()
{
return null;

View File

@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.Parser;
import org.apache.druid.java.util.common.parsers.RegexParser;
@ -50,8 +49,6 @@ public class RegexParseSpec extends ParseSpec
this.listDelimiter = listDelimiter;
this.columns = columns;
this.pattern = pattern;
verify(dimensionsSpec.getDimensionNames());
}
@JsonProperty
@ -72,16 +69,6 @@ public class RegexParseSpec extends ParseSpec
return columns;
}
/**
 * Checks that every name in {@code usedCols} is present in {@code columns}. When {@code columns} is null there is
 * nothing to check against and the method returns without inspecting {@code usedCols}.
 *
 * @param usedCols column names that must all be contained in {@code columns}, when it is set
 * @throws IllegalArgumentException for the first name that is not contained in a non-null {@code columns}
 */
@Override
public void verify(List<String> usedCols)
{
  if (columns == null) {
    return;
  }
  usedCols.forEach(
      usedCol -> Preconditions.checkArgument(columns.contains(usedCol), "column[%s] not in columns.", usedCol)
  );
}
@Override
public Parser<String, Object> makeParser()
{

View File

@ -112,7 +112,7 @@ public class StringUtils
/**
* Encodes "string" into the buffer "byteBuffer", using no more than the number of bytes remaining in the buffer.
* Will only encode whole characters. The byteBuffer's position and limit be changed during operation, but will
* Will only encode whole characters. The byteBuffer's position and limit may be changed during operation, but will
* be reset before this method call ends.
*
* @return the number of bytes written, which may be shorter than the full encoded string length if there

View File

@ -27,28 +27,6 @@ import java.util.Collections;
public class CSVParseSpecTest
{
/**
 * Builds a CSV parse spec whose dimensions ("a", "b") are not all listed in its columns ("a") and expects the
 * constructor to reject it with an {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testColumnMissing()
{
  final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
  final DimensionsSpec dimensionsSpec = new DimensionsSpec(
      DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b")),
      new ArrayList<>(),
      new ArrayList<>()
  );

  @SuppressWarnings("unused") // expected exception
  final ParseSpec spec = new CSVParseSpec(
      timestampSpec,
      dimensionsSpec,
      ",",
      Collections.singletonList("a"),
      false,
      0
  );
}
@Test(expected = IllegalArgumentException.class)
public void testComma()
{
@ -65,7 +43,7 @@ public class CSVParseSpecTest
new ArrayList<>()
),
",",
Collections.singletonList("a"),
Collections.singletonList("a,"),
false,
0
);

View File

@ -58,29 +58,6 @@ public class DelimitedParseSpecTest
Assert.assertEquals(Collections.singletonList("abc"), serde.getDimensionsSpec().getDimensionNames());
}
/**
 * Builds a delimited parse spec whose dimensions ("a", "b") are not all listed in its columns ("a") and expects the
 * constructor to reject it with an {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testColumnMissing()
{
  final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
  final DimensionsSpec dimensionsSpec = new DimensionsSpec(
      DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b")),
      new ArrayList<>(),
      new ArrayList<>()
  );

  @SuppressWarnings("unused") // expected exception
  final ParseSpec spec = new DelimitedParseSpec(
      timestampSpec,
      dimensionsSpec,
      ",",
      " ",
      Collections.singletonList("a"),
      false,
      0
  );
}
@Test(expected = IllegalArgumentException.class)
public void testComma()
{
@ -98,7 +75,7 @@ public class DelimitedParseSpecTest
),
",",
null,
Collections.singletonList("a"),
Collections.singletonList("a,"),
false,
0
);

View File

@ -60,9 +60,6 @@ dialects. However, by using the `array_to_string` function, aggregations may be
complete array, allowing the complete row to be preserved. Using `string_to_array` in an expression post-aggregator,
allows transforming the stringified dimension back into the true native array type.
> Note that array functions are not currently supported at ingestion time with
> [`transformSpec`](../ingestion/index.md#transformspec).
The following built-in functions are available.

View File

@ -101,6 +101,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -237,9 +238,9 @@ public class IndexTaskTest extends IngestionTestBase
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
writer.write("2014-01-01T00:00:10Z,a,an|array,1|2|3,1\n");
writer.write("2014-01-01T01:00:20Z,b,another|array,3|4,1\n");
writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n");
}
IndexTask indexTask = new IndexTask(
@ -248,11 +249,40 @@ public class IndexTaskTest extends IngestionTestBase
createIngestionSpec(
jsonMapper,
tmpDir,
null,
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(
Arrays.asList(
"ts",
"dim",
"dim_array",
"dim_num_array",
"dimt",
"dimtarray1",
"dimtarray2",
"dimtnum_array"
)
),
new ArrayList<>(),
new ArrayList<>()
),
"|",
Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"),
false,
0
),
new TransformSpec(
new SelectorDimFilter("dim", "b", null),
ImmutableList.of(
new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil())
new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()),
new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()),
new ExpressionTransform("dimtarray2", "map(d -> concat(d, 'foo'), dim_array)", ExprMacroTable.nil()),
new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil())
)
),
null,
@ -271,6 +301,47 @@ public class IndexTaskTest extends IngestionTestBase
final List<DataSegment> segments = runTask(indexTask).rhs;
Assert.assertEquals(1, segments.size());
DataSegment segment = segments.get(0);
final File segmentFile = segmentLoader.getSegmentFiles(segment);
final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
segment.getInterval()
);
final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
null,
segment.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
final List<Map<String, Object>> transforms = cursorSequence
.map(cursor -> {
final DimensionSelector selector1 = cursor.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt"));
final DimensionSelector selector2 = cursor.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("dimtarray1", "dimtarray1"));
final DimensionSelector selector3 = cursor.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("dimtarray2", "dimtarray2"));
final DimensionSelector selector4 = cursor.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("dimtnum_array", "dimtnum_array"));
Map<String, Object> row = new HashMap<>();
row.put("dimt", selector1.defaultGetObject());
row.put("dimtarray1", selector2.defaultGetObject());
row.put("dimtarray2", selector3.defaultGetObject());
row.put("dimtnum_array", selector4.defaultGetObject());
cursor.advance();
return row;
})
.toList();
Assert.assertEquals(1, transforms.size());
Assert.assertEquals("bb", transforms.get(0).get("dimt"));
Assert.assertEquals(ImmutableList.of("b", "b"), transforms.get(0).get("dimtarray1"));
Assert.assertEquals(ImmutableList.of("anotherfoo", "arrayfoo"), transforms.get(0).get("dimtarray2"));
Assert.assertEquals(ImmutableList.of("6.0", "7.0"), transforms.get(0).get("dimtnum_array"));
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());

View File

@ -28,7 +28,9 @@ import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.virtual.ExpressionSelectors;
import java.util.List;
import java.util.Objects;
public class ExpressionTransform implements Transform
@ -81,7 +83,7 @@ public class ExpressionTransform implements Transform
@Override
public Object eval(final Row row)
{
return expr.eval(name -> getValueFromRow(row, name)).value();
return ExpressionSelectors.coerceEvalToSelectorObject(expr.eval(name -> getValueFromRow(row, name)));
}
}
@ -90,7 +92,11 @@ public class ExpressionTransform implements Transform
if (column.equals(ColumnHolder.TIME_COLUMN_NAME)) {
return row.getTimestampFromEpoch();
} else {
return row.getRaw(column);
Object raw = row.getRaw(column);
if (raw instanceof List) {
return ExpressionSelectors.coerceListToArray((List) raw);
}
return raw;
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.Parser;
@ -108,12 +109,7 @@ public class ExpressionSelectors
{
// No need for null check on getObject() since baseSelector impls will never return null.
ExprEval eval = baseSelector.getObject();
if (eval.isArray()) {
return Arrays.stream(eval.asStringArray())
.map(NullHandling::emptyToNullIfNeeded)
.collect(Collectors.toList());
}
return eval.value();
return coerceEvalToSelectorObject(eval);
}
@Override
@ -492,7 +488,7 @@ public class ExpressionSelectors
if (val instanceof Number || val instanceof String) {
return val;
} else if (val instanceof List) {
return coerceListDimToStringArray((List) val);
return coerceListToArray((List) val);
} else {
return null;
}
@ -501,7 +497,7 @@ public class ExpressionSelectors
return () -> {
final Object val = selector.getObject();
if (val != null) {
return coerceListDimToStringArray((List) val);
return coerceListToArray((List) val);
}
return null;
};
@ -514,15 +510,86 @@ public class ExpressionSelectors
/**
* Selectors are not consistent in treatment of null, [], and [null], so coerce [] to [null]
*/
private static Object coerceListDimToStringArray(List val)
public static Object coerceListToArray(@Nullable List<?> val)
{
Object[] arrayVal = val.stream().map(x -> x != null ? x.toString() : x).toArray(String[]::new);
if (arrayVal.length > 0) {
return arrayVal;
if (val != null && val.size() > 0) {
Class coercedType = null;
for (Object elem : val) {
if (elem != null) {
coercedType = convertType(coercedType, elem.getClass());
}
}
if (coercedType == Long.class || coercedType == Integer.class) {
return val.stream().map(x -> x != null ? ((Number) x).longValue() : null).toArray(Long[]::new);
}
if (coercedType == Float.class || coercedType == Double.class) {
return val.stream().map(x -> x != null ? ((Number) x).doubleValue() : null).toArray(Double[]::new);
}
// default to string
return val.stream().map(x -> x != null ? x.toString() : null).toArray(String[]::new);
}
return new String[]{null};
}
/**
 * Folds the class of the next non-null list element into the array element type selected so far.
 *
 * The merge is a widening over {@code Integer < Long < Float < Double < String}: the wider of the two types wins,
 * and {@link String} wins over everything. {@link Number} subclasses outside of those four numeric types are
 * accepted but treated as {@link String}, so the outcome no longer depends on the order in which element types
 * are encountered (previously such a type could yield either a stringified or a double result depending on
 * whether it was seen before or after a supported numeric type).
 *
 * @param existing the type selected from the elements seen so far, or null if no non-null element has been seen
 * @param next     the class of the element being merged in
 * @return one of {@code Integer}, {@code Long}, {@code Float}, {@code Double} or {@code String}
 * @throws UOE if {@code next} is neither a {@link Number} subclass nor {@link String}
 */
private static Class convertType(@Nullable Class existing, Class next)
{
  if (!Number.class.isAssignableFrom(next) && next != String.class) {
    throw new UOE("Invalid array expression type: %s", next);
  }

  final int stringRank = arrayTypeRank(String.class);
  final int nextRank = arrayTypeRank(next);
  // collapse unsupported numeric types to String instead of leaking their raw class to the caller
  final Class widenedNext = nextRank == stringRank ? String.class : next;

  if (existing == null) {
    return widenedNext;
  }

  final int existingRank = arrayTypeRank(existing);
  if (existingRank == stringRank) {
    // string wins everything
    return String.class;
  }
  // on ties keep the type that was already selected
  return nextRank > existingRank ? widenedNext : existing;
}

/**
 * Position of a type in the widening order used by {@code convertType}; String and every type that is not one of
 * the four supported numeric classes share the highest rank.
 */
private static int arrayTypeRank(Class type)
{
  if (type == Integer.class) {
    return 0;
  }
  if (type == Long.class) {
    return 1;
  }
  if (type == Float.class) {
    return 2;
  }
  if (type == Double.class) {
    return 3;
  }
  return 4;
}
/**
 * Converts an evaluated expression result into the object handed back by selectors: results of type
 * {@code STRING_ARRAY}, {@code DOUBLE_ARRAY} and {@code LONG_ARRAY} are copied into a {@link List}, every other
 * result is returned as its raw {@link ExprEval#value()}.
 */
@Nullable
public static Object coerceEvalToSelectorObject(ExprEval eval)
{
  final Object coerced;
  switch (eval.type()) {
    case STRING_ARRAY:
      coerced = Arrays.stream(eval.asStringArray()).collect(Collectors.toList());
      break;
    case DOUBLE_ARRAY:
      coerced = Arrays.stream(eval.asDoubleArray()).collect(Collectors.toList());
      break;
    case LONG_ARRAY:
      coerced = Arrays.stream(eval.asLongArray()).collect(Collectors.toList());
      break;
    default:
      coerced = eval.value();
      break;
  }
  return coerced;
}
/**
* Returns pair of columns which are definitely multi-valued, or 'actual' arrays, and those which we are unable to
* discern from the {@link ColumnSelectorFactory#getColumnCapabilities(String)}, or 'unknown' arrays.

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.virtual;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseSingleValueDimensionSelector;
import org.apache.druid.segment.ColumnValueSelector;
@ -30,6 +31,7 @@ import org.apache.druid.segment.TestObjectColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class ExpressionColumnValueSelectorTest
@ -130,6 +132,105 @@ public class ExpressionColumnValueSelectorTest
}
/**
 * Exercises {@code ExpressionSelectors.coerceListToArray} with homogeneous lists, lists containing nulls, and
 * lists mixing element types, checking the element type and contents of the resulting array.
 */
@Test
public void testCoerceListToArray()
{
  final Long[] expectedLongs = new Long[]{1L, 2L, 3L};
  final Double[] expectedDoubles = new Double[]{1.0, 2.0, 3.0};

  // homogeneous lists
  final List<Long> longs = ImmutableList.of(1L, 2L, 3L);
  Assert.assertArrayEquals(expectedLongs, (Long[]) ExpressionSelectors.coerceListToArray(longs));

  final List<Integer> ints = ImmutableList.of(1, 2, 3);
  Assert.assertArrayEquals(expectedLongs, (Long[]) ExpressionSelectors.coerceListToArray(ints));

  final List<Float> floats = ImmutableList.of(1.0f, 2.0f, 3.0f);
  Assert.assertArrayEquals(expectedDoubles, (Double[]) ExpressionSelectors.coerceListToArray(floats));

  final List<Double> doubles = ImmutableList.of(1.0, 2.0, 3.0);
  Assert.assertArrayEquals(expectedDoubles, (Double[]) ExpressionSelectors.coerceListToArray(doubles));

  final List<String> strings = ImmutableList.of("a", "b", "c");
  Assert.assertArrayEquals(new String[]{"a", "b", "c"}, (String[]) ExpressionSelectors.coerceListToArray(strings));

  // lists with null elements
  final List<String> stringsWithNull = new ArrayList<>();
  stringsWithNull.add("a");
  stringsWithNull.add(null);
  stringsWithNull.add("c");
  Assert.assertArrayEquals(
      new String[]{"a", null, "c"},
      (String[]) ExpressionSelectors.coerceListToArray(stringsWithNull)
  );

  final List<Long> longsWithNull = new ArrayList<>();
  longsWithNull.add(1L);
  longsWithNull.add(null);
  longsWithNull.add(3L);
  Assert.assertArrayEquals(
      new Long[]{1L, null, 3L},
      (Long[]) ExpressionSelectors.coerceListToArray(longsWithNull)
  );

  // lists mixing element types
  final List<Object> longsAndString = ImmutableList.of(1L, "b", 3L);
  Assert.assertArrayEquals(
      new String[]{"1", "b", "3"},
      (String[]) ExpressionSelectors.coerceListToArray(longsAndString)
  );

  final List<Number> intsAndLong = ImmutableList.of(1, 2L, 3);
  Assert.assertArrayEquals(expectedLongs, (Long[]) ExpressionSelectors.coerceListToArray(intsAndLong));

  final List<Number> intLongFloat = ImmutableList.of(1, 2L, 3.0f);
  Assert.assertArrayEquals(expectedDoubles, (Double[]) ExpressionSelectors.coerceListToArray(intLongFloat));

  final List<Number> intLongDouble = ImmutableList.of(1, 2L, 3.0);
  Assert.assertArrayEquals(expectedDoubles, (Double[]) ExpressionSelectors.coerceListToArray(intLongDouble));

  final List<Number> longFloatDouble = ImmutableList.of(1L, 2.0f, 3.0);
  Assert.assertArrayEquals(expectedDoubles, (Double[]) ExpressionSelectors.coerceListToArray(longFloatDouble));

  // a list holding only nulls
  final List<String> onlyNulls = new ArrayList<>();
  for (int i = 0; i < 3; i++) {
    onlyNulls.add(null);
  }
  Assert.assertArrayEquals(
      new String[]{null, null, null},
      (String[]) ExpressionSelectors.coerceListToArray(onlyNulls)
  );
}
/**
 * Checks that {@code ExpressionSelectors.coerceEvalToSelectorObject} turns long, double and string array
 * evaluation results into equal lists, including a string array that contains a null element.
 */
@Test
public void testCoerceExprToValue()
{
  final Object fromLongs =
      ExpressionSelectors.coerceEvalToSelectorObject(ExprEval.ofLongArray(new Long[]{1L, 2L, 3L}));
  Assert.assertEquals(ImmutableList.of(1L, 2L, 3L), fromLongs);

  final Object fromDoubles =
      ExpressionSelectors.coerceEvalToSelectorObject(ExprEval.ofDoubleArray(new Double[]{1.0, 2.0, 3.0}));
  Assert.assertEquals(ImmutableList.of(1.0, 2.0, 3.0), fromDoubles);

  final Object fromStrings =
      ExpressionSelectors.coerceEvalToSelectorObject(ExprEval.ofStringArray(new String[]{"a", "b", "c"}));
  Assert.assertEquals(ImmutableList.of("a", "b", "c"), fromStrings);

  // ImmutableList rejects nulls, so the expected value is built with a plain ArrayList
  final List<String> expectedWithNull = new ArrayList<>();
  expectedWithNull.add("a");
  expectedWithNull.add(null);
  expectedWithNull.add("c");
  final Object fromStringsWithNull =
      ExpressionSelectors.coerceEvalToSelectorObject(ExprEval.ofStringArray(new String[]{"a", null, "c"}));
  Assert.assertEquals(expectedWithNull, fromStringsWithNull);
}
private static DimensionSelector dimensionSelectorFromSupplier(
final Supplier<String> supplier
)

View File

@ -8535,7 +8535,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{"[\"1\",\"2\"]", ""}
new Object[]{"[1,2]", ""}
)
);
}