Mirror of https://github.com/apache/druid.git

commit 309cae7b65 (parent 1231ce3b75)

nested column support for Parquet and Avro (#13325)

* nested column support for Parquet and Avro
* style
@@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.column.NullableTypeStrategy;
 import org.apache.druid.segment.column.TypeStrategies;
 import org.apache.druid.segment.column.TypeStrategy;
@@ -167,14 +166,40 @@ public abstract class ExprEval<T>
         array[i++] = o == null ? null : ExprEval.ofType(ExpressionType.LONG, o).value();
       }
       return new NonnullPair<>(ExpressionType.LONG_ARRAY, array);
-    }
-    if (coercedType == Float.class || coercedType == Double.class) {
+    } else if (coercedType == Float.class || coercedType == Double.class) {
       Object[] array = new Object[val.size()];
       int i = 0;
       for (Object o : val) {
         array[i++] = o == null ? null : ExprEval.ofType(ExpressionType.DOUBLE, o).value();
       }
       return new NonnullPair<>(ExpressionType.DOUBLE_ARRAY, array);
+    } else if (coercedType == Object.class) {
+      // object, fall back to "best effort"
+      ExprEval<?>[] evals = new ExprEval[val.size()];
+      Object[] array = new Object[val.size()];
+      int i = 0;
+      ExpressionType elementType = null;
+      for (Object o : val) {
+        if (o != null) {
+          ExprEval<?> eval = ExprEval.bestEffortOf(o);
+          elementType = ExpressionTypeConversion.coerceArrayTypes(elementType, eval.type());
+          evals[i++] = eval;
+        } else {
+          evals[i++] = null;
+        }
+      }
+      i = 0;
+      for (ExprEval<?> eval : evals) {
+        if (eval != null) {
+          array[i++] = eval.castTo(elementType).value();
+        } else {
+          array[i++] = null;
+        }
+      }
+      ExpressionType arrayType = elementType == null
+                                 ? ExpressionType.STRING_ARRAY
+                                 : ExpressionTypeFactory.getInstance().ofArray(elementType);
+      return new NonnullPair<>(arrayType, array);
     }
     // default to string
     Object[] array = new Object[val.size()];
@@ -194,31 +219,6 @@ public abstract class ExprEval<T>
       }
     }
-
-  @Nullable
-  public static ExpressionType findArrayType(@Nullable Object[] val)
-  {
-    // if value is not null and has at least 1 element, conversion is unambigous regardless of the selector
-    if (val != null && val.length > 0) {
-      Class<?> coercedType = null;
-
-      for (Object elem : val) {
-        if (elem != null) {
-          coercedType = convertType(coercedType, elem.getClass());
-        }
-      }
-
-      if (coercedType == Long.class || coercedType == Integer.class) {
-        return ExpressionType.LONG_ARRAY;
-      }
-      if (coercedType == Float.class || coercedType == Double.class) {
-        return ExpressionType.DOUBLE_ARRAY;
-      }
-      // default to string
-      return ExpressionType.STRING_ARRAY;
-    }
-    return null;
-  }

   /**
    * Find the common type to use between 2 types, useful for choosing the appropriate type for an array given a set
    * of objects with unknown type, following rules similar to Java, our own native Expr, and SQL implicit type
@@ -266,7 +266,8 @@ public abstract class ExprEval<T>
       // otherwise double
       return Double.class;
     }
-    throw new UOE("Invalid array expression type: %s", next);
+    // its complicated, call it object
+    return Object.class;
   }

   public static ExprEval of(long longValue)

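For illustration, not part of the commit: before this change, convertType threw UOE for element type combinations it could not reconcile; it now returns Object.class, and the new Object.class branch of coerceListToArray evaluates each element with bestEffortOf and unifies element types with the new coerceArrayTypes. A minimal JUnit-style sketch of the resulting behavior, mirroring the ExprEvalTest additions later in this diff; it assumes druid-core (and its test dependencies) on the classpath and sits in the same package in case the method is not public:

package org.apache.druid.math.expr;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.NonnullPair;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;

public class CoerceListToArraySketchTest
{
  @Test
  public void testListOfMapsNoLongerThrows()
  {
    Map<String, Object> m1 = ImmutableMap.of("x", 1L);
    Map<String, Object> m2 = ImmutableMap.of("x", 4L);
    List<Map<String, Object>> maps = ImmutableList.of(m1, m2);

    // each element is evaluated with bestEffortOf and the element types are
    // unified with ExpressionTypeConversion.coerceArrayTypes
    NonnullPair<ExpressionType, Object[]> coerced = ExprEval.coerceListToArray(maps, false);

    Assert.assertEquals(
        ExpressionTypeFactory.getInstance().ofArray(ExpressionType.UNKNOWN_COMPLEX),
        coerced.lhs
    );
    Assert.assertArrayEquals(new Object[]{m1, m2}, coerced.rhs);
  }
}
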
@@ -82,7 +82,19 @@ public class ExpressionTypeConversion
       return type;
     }
     if (type.isArray() || other.isArray()) {
-      if (!type.equals(other)) {
+      if (!Objects.equals(type, other)) {
         throw new IAE("Cannot implicitly cast %s to %s", type, other);
       }
       return type;
+    }
+    if (type.is(ExprType.COMPLEX) || other.is(ExprType.COMPLEX)) {
+      if (type.getElementType() == null) {
+        return other;
+      }
+      if (other.getElementType() == null) {
+        return type;
+      }
+      if (!Objects.equals(type, other)) {
+        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+      }
+      return type;
@@ -108,13 +120,25 @@ public class ExpressionTypeConversion
   public static ExpressionType function(@Nullable ExpressionType type, @Nullable ExpressionType other)
   {
     if (type == null) {
-      type = other;
+      return other;
     }
     if (other == null) {
-      other = type;
+      return type;
     }
     // arrays cannot be auto converted
-    if ((type != null && type.isArray()) || (other != null && other.isArray())) {
+    if (type.isArray() || other.isArray()) {
       if (!Objects.equals(type, other)) {
         throw new IAE("Cannot implicitly cast %s to %s", type, other);
       }
+      return type;
+    }
+    if (type.is(ExprType.COMPLEX) || other.is(ExprType.COMPLEX)) {
+      if (type.getComplexTypeName() == null) {
+        return other;
+      }
+      if (other.getComplexTypeName() == null) {
+        return type;
+      }
+      if (!Objects.equals(type, other)) {
+        throw new IAE("Cannot implicitly cast %s to %s", type, other);
+      }
@@ -128,6 +152,34 @@ public class ExpressionTypeConversion
     return numeric(type, other);
   }
+
+  @Nullable
+  public static ExpressionType coerceArrayTypes(@Nullable ExpressionType type, @Nullable ExpressionType other)
+  {
+    if (type == null) {
+      return other;
+    }
+    if (other == null) {
+      return type;
+    }
+
+    if (Objects.equals(type, other)) {
+      return type;
+    }
+
+    ExpressionType typeArrayType = type.isArray() ? type : ExpressionTypeFactory.getInstance().ofArray(type);
+    ExpressionType otherArrayType = other.isArray() ? other : ExpressionTypeFactory.getInstance().ofArray(other);
+
+    if (typeArrayType.getElementType().isPrimitive() && otherArrayType.getElementType().isPrimitive()) {
+      ExpressionType newElementType = function(
+          (ExpressionType) typeArrayType.getElementType(),
+          (ExpressionType) otherArrayType.getElementType()
+      );
+      return ExpressionTypeFactory.getInstance().ofArray(newElementType);
+    }
+
+    throw new IAE("Cannot implicitly cast %s to %s", type, other);
+  }
+

   /**
    * Given 2 'input' types, choose the most appropriate combined type, if possible

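For illustration, not part of the commit: the new coerceArrayTypes helper promotes scalar inputs to array types and reconciles primitive element types through function(). A minimal sketch of its behavior, assuming druid-core on the classpath; the expected results follow directly from the implementation above:

import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.math.expr.ExpressionTypeConversion;

public class CoerceArrayTypesSketch
{
  public static void main(String[] args)
  {
    // null on either side simply yields the other side
    assert ExpressionType.LONG.equals(
        ExpressionTypeConversion.coerceArrayTypes(null, ExpressionType.LONG)
    );

    // mixed primitive element types are unified via function() and wrapped in an
    // array type, so LONG + DOUBLE becomes ARRAY<DOUBLE>
    assert ExpressionType.DOUBLE_ARRAY.equals(
        ExpressionTypeConversion.coerceArrayTypes(ExpressionType.LONG, ExpressionType.DOUBLE)
    );

    // equal types (including complex ones) pass through unchanged; mismatched
    // complex element types throw IAE instead of silently coercing
    assert ExpressionType.UNKNOWN_COMPLEX.equals(
        ExpressionTypeConversion.coerceArrayTypes(ExpressionType.UNKNOWN_COMPLEX, ExpressionType.UNKNOWN_COMPLEX)
    );
  }
}
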
@@ -20,6 +20,7 @@
 package org.apache.druid.math.expr;

 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
@@ -36,6 +37,7 @@ import org.junit.rules.ExpectedException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;

 public class ExprEvalTest extends InitializedNullHandlingTest
 {
@@ -330,6 +332,70 @@ public class ExprEvalTest extends InitializedNullHandlingTest
         coerced.rhs
     );

+    List<List<String>> nestedLists = ImmutableList.of(
+        ImmutableList.of("a", "b", "c"),
+        ImmutableList.of("d", "e", "f")
+    );
+    coerced = ExprEval.coerceListToArray(nestedLists, false);
+    Assert.assertEquals(ExpressionTypeFactory.getInstance().ofArray(ExpressionType.STRING_ARRAY), coerced.lhs);
+    Assert.assertArrayEquals(
+        new Object[]{new Object[]{"a", "b", "c"}, new Object[]{"d", "e", "f"}},
+        coerced.rhs
+    );
+
+    Map<String, Object> unknown1 = ImmutableMap.of("x", 1L, "y", 2L);
+    Map<String, Object> unknown2 = ImmutableMap.of("x", 4L, "y", 5L);
+    List<Map<String, Object>> listUnknownComplex = ImmutableList.of(unknown1, unknown2);
+    coerced = ExprEval.coerceListToArray(listUnknownComplex, false);
+    Assert.assertEquals(ExpressionTypeFactory.getInstance().ofArray(ExpressionType.UNKNOWN_COMPLEX), coerced.lhs);
+    Assert.assertArrayEquals(
+        new Object[]{unknown1, unknown2},
+        coerced.rhs
+    );
+
+
+    Map<String, Object> unknown3 = ImmutableMap.of("x", 5L, "y", 7L);
+    Map<String, Object> unknown4 = ImmutableMap.of("x", 6L, "y", 8L);
+
+    List<List<Map<String, Object>>> nestedListsComplex = ImmutableList.of(
+        ImmutableList.of(unknown1, unknown2),
+        ImmutableList.of(unknown3, unknown4)
+    );
+    coerced = ExprEval.coerceListToArray(nestedListsComplex, false);
+    Assert.assertEquals(
+        ExpressionTypeFactory.getInstance().ofArray(
+            ExpressionTypeFactory.getInstance().ofArray(ExpressionType.UNKNOWN_COMPLEX)
+        ),
+        coerced.lhs
+    );
+    Assert.assertArrayEquals(
+        new Object[]{
+            new Object[]{unknown1, unknown2},
+            new Object[]{unknown3, unknown4}
+        },
+        coerced.rhs
+    );
+
+    List<List<Object>> mixed = ImmutableList.of(
+        ImmutableList.of("a", "b", "c"),
+        ImmutableList.of(1L, 2L, 3L),
+        ImmutableList.of(3.0, 4.0, 5.0),
+        ImmutableList.of("a", 2L, 3.0)
+    );
+    coerced = ExprEval.coerceListToArray(mixed, false);
+    Assert.assertEquals(
+        ExpressionTypeFactory.getInstance().ofArray(ExpressionType.STRING_ARRAY),
+        coerced.lhs
+    );
+    Assert.assertArrayEquals(
+        new Object[]{
+            new Object[]{"a", "b", "c"},
+            new Object[]{"1", "2", "3"},
+            new Object[]{"3.0", "4.0", "5.0"},
+            new Object[]{"a", "2", "3.0"}
+        },
+        coerced.rhs
+    );
   }

   @Test

@@ -254,12 +254,26 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
         <dependency>
            <groupId>org.apache.druid</groupId>
            <artifactId>druid-processing</artifactId>
            <version>${project.parent.version}</version>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
@@ -34,7 +35,9 @@ import org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder;
 import org.apache.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
 import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
@@ -42,6 +45,13 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,7 +72,32 @@ import java.util.List;
 import static org.apache.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect;
 import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum;

-public class AvroStreamInputFormatTest
+/**
+ * test data row:
+ * {
+ *   "timestamp": 1445801400000,
+ *   "eventType": "type-a",
+ *   "id": 1976491,
+ *   "someOtherId": 6568719896,
+ *   "isValid": true,
+ *   "someIntArray": [1, 2, 4, 8],
+ *   "someStringArray": ["8", "4", "2", "1"],
+ *   "someIntValueMap": {"8": 8, "1": 1, "2": 2, "4": 4},
+ *   "someStringValueMap": {"8": "8", "1": "1", "2": "2", "4": "4"},
+ *   "someUnion": "string as union",
+ *   "someMultiMemberUnion": 1,
+ *   "someNull": null,
+ *   "someFixed": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
+ *   "someBytes": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000",
+ *   "someEnum": "ENUM1",
+ *   "someRecord": {"subInt": 4892, "subLong": 1543698},
+ *   "someLong": 679865987569912369,
+ *   "someInt": 1,
+ *   "someFloat": 0.23555,
+ *   "someRecordArray": [{"nestedString": "string in record"}]
+ * }
+ */
 public class AvroStreamInputFormatTest extends InitializedNullHandlingTest
 {
   private static final String EVENT_TYPE = "eventType";
   private static final String ID = "id";
@@ -209,6 +244,134 @@ public class AvroStreamInputFormatTest
     assertInputRowCorrect(inputRow, DIMENSIONS, false);
   }

+  @Test
+  public void testParseTransformNested() throws SchemaValidationException, IOException
+  {
+    Repository repository = new InMemoryRepository(null);
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        false,
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+    repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputFormat) inputFormat2).getAvroBytesDecoder()).getSchemaRepository();
+
+    // prepare data
+    GenericRecord someAvroDatum = buildSomeAvroDatum();
+
+    // encode schema id
+    Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
+    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
+        repository,
+        new IntegerConverter(),
+        new AvroSchemaConverter(),
+        new IdentityConverter()
+    );
+    Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
+    ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+    converter.putSubjectAndId(id, byteBuffer);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    out.write(byteBuffer.array());
+    // encode data
+    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
+    // write avro datum to bytes
+    writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+
+    final ByteEntity entity = new ByteEntity(ByteBuffer.wrap(out.toByteArray()));
+
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            new NestedDataDimensionSchema("someIntValueMap"),
+            new NestedDataDimensionSchema("someStringValueMap"),
+            new NestedDataDimensionSchema("someRecord"),
+            new NestedDataDimensionSchema("someRecordArray"),
+            new LongDimensionSchema("tSomeIntValueMap8"),
+            new LongDimensionSchema("tSomeIntValueMap8_2"),
+            new StringDimensionSchema("tSomeStringValueMap8"),
+            new LongDimensionSchema("tSomeRecordSubLong"),
+            new NestedDataDimensionSchema("tSomeRecordArray0"),
+            new StringDimensionSchema("tSomeRecordArray0nestedString")
+        )
+    );
+    InputEntityReader reader = inputFormat2.createReader(
+        new InputRowSchema(timestampSpec, dimensionsSpec, null),
+        entity,
+        null
+    );
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ExpressionTransform(
+                "tSomeIntValueMap8",
+                "json_value(someIntValueMap, '$.8')",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "tSomeIntValueMap8_2",
+                "json_value(json_query(someIntValueMap, '$'), '$.8')",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "tSomeStringValueMap8",
+                "json_value(someStringValueMap, '$.8')",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "tSomeRecordSubLong",
+                "json_value(someRecord, '$.subLong')",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "tSomeRecordArray0",
+                "json_query(someRecordArray, '$[0]')",
+                TestExprMacroTable.INSTANCE
+            ),
+            new ExpressionTransform(
+                "tSomeRecordArray0nestedString",
+                "json_value(someRecordArray, '$[0].nestedString')",
+                TestExprMacroTable.INSTANCE
+            )
+        )
+    );
+    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
+        reader,
+        transformSpec.toTransformer()
+    );
+    InputRow inputRow = transformingReader.read().next();
+
+    Assert.assertEquals(1543698L, inputRow.getTimestampFromEpoch());
+    Assert.assertEquals(
+        AvroStreamInputRowParserTest.SOME_INT_VALUE_MAP_VALUE,
+        StructuredData.unwrap(inputRow.getRaw("someIntValueMap"))
+    );
+    Assert.assertEquals(
+        AvroStreamInputRowParserTest.SOME_STRING_VALUE_MAP_VALUE,
+        StructuredData.unwrap(inputRow.getRaw("someStringValueMap"))
+    );
+    Assert.assertEquals(
+        ImmutableMap.of("subInt", 4892, "subLong", 1543698L),
+        StructuredData.unwrap(inputRow.getRaw("someRecord"))
+    );
+    Assert.assertEquals(
+        ImmutableList.of(ImmutableMap.of("nestedString", "string in record")),
+        StructuredData.unwrap(inputRow.getRaw("someRecordArray"))
+    );
+
+    Assert.assertEquals(8L, inputRow.getRaw("tSomeIntValueMap8"));
+    Assert.assertEquals(8L, inputRow.getRaw("tSomeIntValueMap8_2"));
+    Assert.assertEquals("8", inputRow.getRaw("tSomeStringValueMap8"));
+    Assert.assertEquals(1543698L, inputRow.getRaw("tSomeRecordSubLong"));
+    Assert.assertEquals(
+        ImmutableMap.of("nestedString", "string in record"),
+        StructuredData.unwrap(inputRow.getRaw("tSomeRecordArray0"))
+    );
+    Assert.assertEquals("string in record", inputRow.getRaw("tSomeRecordArray0nestedString"));
+  }
+
   @Test
   public void testParseSchemaless() throws SchemaValidationException, IOException
   {

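For illustration, not part of the commit: the test above leans on the difference between the two JSON expressions. A minimal sketch reusing the exact constructor calls from the test; the expected values in the comments come from its assertions:

import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.transform.ExpressionTransform;

public class JsonFunctionsSketch
{
  // json_value extracts a primitive leaf; with a LongDimensionSchema declared for
  // the output column it lands as a long (1543698 in the assertions above)
  static final ExpressionTransform VALUE = new ExpressionTransform(
      "tSomeRecordSubLong",
      "json_value(someRecord, '$.subLong')",
      TestExprMacroTable.INSTANCE
  );

  // json_query returns the nested structure itself, so the result stays a map or
  // list ({"nestedString": "string in record"} in the assertions above)
  static final ExpressionTransform QUERY = new ExpressionTransform(
      "tSomeRecordArray0",
      "json_query(someRecordArray, '$[0]')",
      TestExprMacroTable.INSTANCE
  );
}
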
@@ -130,7 +130,7 @@ public class AvroStreamInputRowParserTest
       .build();
   private static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1");
   private static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
-  private static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
+  static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
       new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
       {
         @Nonnull
@@ -141,7 +141,7 @@ public class AvroStreamInputRowParserTest
         }
       }
   );
-  private static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
+  static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
       new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
       {
         @Nonnull

@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class NestedColumnParquetReaderTest extends BaseParquetReaderTest
+{
+  @Test
+  public void testNestedColumnTransformsNestedTestFile() throws IOException
+  {
+    final String file = "example/flattening/test_nested_1.parquet";
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        new DimensionsSpec(
+            ImmutableList.of(
+                new NestedDataDimensionSchema("nestedData"),
+                new NestedDataDimensionSchema("t_nestedData_listDim"),
+                new StringDimensionSchema("t_nestedData_listDim_string"),
+                new StringDimensionSchema("t_nestedData_dim2"),
+                new LongDimensionSchema("t_nestedData_dim3"),
+                new LongDimensionSchema("t_nestedData_metric2"),
+                new StringDimensionSchema("t_nestedData_listDim1"),
+                new StringDimensionSchema("t_nestedData_listDim2")
+            )
+        ),
+        ColumnsFilter.all()
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+    InputEntityReader reader = createReader(
+        file,
+        schema,
+        flattenSpec
+    );
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ExpressionTransform("t_nestedData_dim2", "json_value(nestedData, '$.dim2')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_dim3", "json_value(nestedData, '$.dim3')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_metric2", "json_value(nestedData, '$.metric2')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_listDim", "json_query(nestedData, '$.listDim')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_listDim_string", "json_query(nestedData, '$.listDim')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_listDim_1", "json_value(nestedData, '$.listDim[0]')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_nestedData_listDim_2", "json_query(nestedData, '$.listDim[1]')", TestExprMacroTable.INSTANCE)
+        )
+    );
+    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
+        reader,
+        transformSpec.toTransformer()
+    );
+
+    List<InputRow> rows = readAllRows(transformingReader);
+    Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
+    Assert.assertEquals(1L, rows.get(0).getRaw("t_nestedData_dim3"));
+    Assert.assertEquals("d2v1", rows.get(0).getRaw("t_nestedData_dim2"));
+    Assert.assertEquals(ImmutableList.of("listDim1v1", "listDim1v2"), rows.get(0).getRaw("t_nestedData_listDim"));
+    Assert.assertEquals(ImmutableList.of("listDim1v1", "listDim1v2"), rows.get(0).getDimension("t_nestedData_listDim_string"));
+    Assert.assertEquals("listDim1v1", rows.get(0).getRaw("t_nestedData_listDim_1"));
+    Assert.assertEquals("listDim1v2", rows.get(0).getRaw("t_nestedData_listDim_2"));
+    Assert.assertEquals(2L, rows.get(0).getRaw("t_nestedData_metric2"));
+  }
+
+  @Test
+  public void testNestedColumnTransformsNestedNullableListFile() throws IOException
+  {
+    final String file = "example/flattening/nullable_list.snappy.parquet";
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        new DimensionsSpec(
+            ImmutableList.of(
+                new NestedDataDimensionSchema("a1"),
+                new NestedDataDimensionSchema("a2"),
+                new NestedDataDimensionSchema("t_a2"),
+                new NestedDataDimensionSchema("t_a1_b1"),
+                new LongDimensionSchema("t_a1_b1_c1"),
+                new LongDimensionSchema("t_e2_0_b1"),
+                new LongDimensionSchema("tt_a2_0_b1")
+            )
+        ),
+        ColumnsFilter.all()
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+    InputEntityReader reader = createReader(
+        file,
+        schema,
+        flattenSpec
+    );
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        ImmutableList.of(
+            new ExpressionTransform("t_a1_b1", "json_query(a1, '$.b1')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_a2", "json_query(a2, '$')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_a1_b1_c1", "json_value(a1, '$.b1.c1')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_a2_0_b1", "json_value(a2, '$[0].b1')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_a2_0", "json_query(a2, '$[0]')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("t_a2_1_b1", "json_value(a2, '$[1].b1')", TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("tt_a2_0_b1", "json_value(json_query(a2, '$'), '$[0].b1')", TestExprMacroTable.INSTANCE)
+        )
+    );
+    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
+        reader,
+        transformSpec.toTransformer()
+    );
+
+    List<InputRow> rows = readAllRows(transformingReader);
+
+    Assert.assertEquals("2022-02-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableMap.of("b1", 1L, "b2", 2L), ImmutableMap.of("b1", 1L, "b2", 2L)
+        ),
+        rows.get(0).getRaw("a2")
+    );
+    // expression turns List into Object[]
+    Assert.assertArrayEquals(
+        new Object[]{
+            ImmutableMap.of("b1", 1L, "b2", 2L), ImmutableMap.of("b1", 1L, "b2", 2L)
+        },
+        (Object[]) rows.get(0).getRaw("t_a2")
+    );
+    Assert.assertEquals(ImmutableMap.of("c1", 1L, "c2", 2L), rows.get(0).getRaw("t_a1_b1"));
+    Assert.assertEquals(ImmutableMap.of("b1", 1L, "b2", 2L), rows.get(0).getRaw("t_a2_0"));
+    Assert.assertEquals(1L, rows.get(0).getRaw("t_a1_b1_c1"));
+    Assert.assertEquals(1L, rows.get(0).getRaw("t_a2_0_b1"));
+    Assert.assertEquals(1L, rows.get(0).getRaw("t_a2_1_b1"));
+    Assert.assertEquals(1L, rows.get(0).getRaw("tt_a2_0_b1"));
+  }
+}
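
For illustration, not part of the commit: the tt_a2_0_b1 transform above shows that json_query output is itself valid nested-data input, so path extraction composes. A minimal sketch reusing the constructor calls from the new test; per its assertions, both transforms yield 1L for this row:

import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.transform.ExpressionTransform;

public class JsonPathCompositionSketch
{
  // direct extraction of a leaf under the first array element
  static final ExpressionTransform DIRECT = new ExpressionTransform(
      "t_a2_0_b1",
      "json_value(a2, '$[0].b1')",
      TestExprMacroTable.INSTANCE
  );

  // the same leaf reached through an intermediate json_query of the whole column
  static final ExpressionTransform COMPOSED = new ExpressionTransform(
      "tt_a2_0_b1",
      "json_value(json_query(a2, '$'), '$[0].b1')",
      TestExprMacroTable.INSTANCE
  );
}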