fix issue with parquet list conversion of nullable lists with complex nullable elements (#13294)

* fix issue with parquet list conversion of nullable lists with complex nullable elements

* pom stuff

* fix style

* adjustments
Clint Wylie 2022-11-04 05:25:42 -07:00 committed by GitHub
parent 848570d8db
commit e60e305ddb
9 changed files with 232 additions and 46 deletions
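
For context, the list shape this fix targets is the three-level parquet LIST encoding with every level nullable: an optional LIST group wrapping a repeated "list" group wrapping an optional "element" group whose fields are themselves optional. A minimal sketch of that schema, assuming parquet-mr's parquet-column module on the classpath (the class and message names are illustrative; the schema text mirrors the comment added to ParquetGroupConverter below):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class NullableListShape
{
  public static void main(String[] args)
  {
    // optional LIST -> repeated "list" wrapper -> optional group element with optional fields;
    // before this fix, the repeated wrapper Group leaked through conversion for such elements
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {\n"
        + "  optional group my_list (LIST) {\n"
        + "    repeated group list {\n"
        + "      optional group element {\n"
        + "        optional int64 b1;\n"
        + "        optional int64 b2;\n"
        + "      }\n"
        + "    }\n"
        + "  }\n"
        + "}\n"
    );
    System.out.println(schema);
  }
}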


@@ -250,14 +250,14 @@ public class ObjectFlatteners
*/
default Map<String, Object> toMap(T obj)
{
return (Map<String, Object>) toMapHelper(obj);
return (Map<String, Object>) toPlainJavaType(obj);
}
/**
* Recursively traverse "json" object using a {@link JsonProvider}, converting to Java {@link Map} and {@link List},
* potentially transforming via {@link #finalizeConversionForMap} as we go
*/
default Object toMapHelper(Object o)
default Object toPlainJavaType(Object o)
{
final JsonProvider jsonProvider = getJsonProvider();
if (jsonProvider.isMap(o)) {
@@ -267,7 +267,7 @@ public class ObjectFlatteners
if (field == null) {
actualMap.put(key, null);
} else if (jsonProvider.isMap(field) || jsonProvider.isArray(field)) {
actualMap.put(key, toMapHelper(finalizeConversionForMap(field)));
actualMap.put(key, toPlainJavaType(finalizeConversionForMap(field)));
} else {
actualMap.put(key, finalizeConversionForMap(field));
}
@@ -279,7 +279,7 @@ public class ObjectFlatteners
for (int i = 0; i < length; i++) {
Object element = jsonProvider.getArrayIndex(o, i);
if (jsonProvider.isMap(element) || jsonProvider.isArray(element)) {
actualList.add(toMapHelper(finalizeConversionForMap(element)));
actualList.add(toPlainJavaType(finalizeConversionForMap(element)));
} else {
actualList.add(finalizeConversionForMap(element));
}
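
The rename from toMapHelper to toPlainJavaType better describes what the method does: a depth-first conversion of format-specific map and array containers into plain java.util.Map and java.util.List, finalizing leaves along the way. A minimal self-contained sketch of that recursion (illustrative names; plain Java collections stand in for the JsonProvider abstraction, and the finalizer is applied more uniformly here than in the real method):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ToPlainJavaTypeSketch
{
  // "finalize" stands in for finalizeConversionForMap: formats such as parquet use
  // that hook to unwrap format-specific containers before the recursion continues
  static Object toPlainJavaType(Object o, Function<Object, Object> finalize)
  {
    o = finalize.apply(o);
    if (o instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      ((Map<?, ?>) o).forEach((k, v) -> out.put(String.valueOf(k), toPlainJavaType(v, finalize)));
      return out;
    }
    if (o instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object element : (List<?>) o) {
        out.add(toPlainJavaType(element, finalize));
      }
      return out;
    }
    return o; // primitive leaf, returned as-is
  }

  public static void main(String[] args)
  {
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("a", Arrays.asList(1, 2));
    System.out.println(toPlainJavaType(nested, Function.identity())); // {a=[1, 2]}
  }
}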

example/flattening/nullable_list_flatten.json (new file)

@@ -0,0 +1,99 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "%s",
"paths": "example/flattening/nullable_list.snappy.parquet"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user": "druid",
"password": "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test1",
"parser": {
"type": "%s",
"parseSpec": {
"format": "%s",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "a1_b1_c1",
"expr": "$.a1.b1.c1"
},
{
"type": "path",
"name": "a1_b1_c2",
"expr": "$.a1.b1.c2"
},
{
"type": "path",
"name": "a2_0_b1",
"expr": "$.a2[0].b1"
},
{
"type": "path",
"name": "a2_0_b2",
"expr": "$.a2[0].b2"
},
{
"type": "path",
"name": "a2_1_b1",
"expr": "$.a2[1].b1"
},
{
"type": "path",
"name": "a2_1_b2",
"expr": "$.a2[1].b2"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": []
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties": {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}
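
Note: the "%s" placeholders in inputSpec.inputFormat, parser.type, and parseSpec.format above are not literal config values; they appear to be substituted per parser type by the test harness (see the transformHadoopDruidIndexerConfig call in FlattenSpecParquetInputTest below).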

pom.xml

@@ -165,6 +165,20 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<profiles>
<profile>


@@ -162,6 +162,16 @@ class ParquetGroupConverter
required binary str (UTF8);
};
}
// List<Tuple<Long, Long>> (nullable list; nullable object elements; nullable fields)
optional group my_list (LIST) {
repeated group list {
optional group element {
optional int64 b1;
optional int64 b2;
}
}
}
*/
assert isLogicalListType(g.getType());
int repeated = g.getFieldRepetitionCount(0);
@@ -173,12 +183,38 @@ class ParquetGroupConverter
vals.add(convertPrimitiveField(g, 0, i, binaryAsString));
} else {
Group listItem = g.getGroup(0, i);
vals.add(listItem);
vals.add(convertListElement(listItem, binaryAsString));
}
}
return vals;
}
private static Object convertListElement(Group listItem, boolean binaryAsString)
{
if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
listItem.getType().getFieldCount() == 1 &&
!listItem.getType().isPrimitive() &&
listItem.getType().getFields().get(0).isPrimitive()
) {
// nullable primitive list elements can have a repeating wrapper element, peel it off
return convertPrimitiveField(listItem, 0, binaryAsString);
} else if (
listItem.getType().isRepetition(Type.Repetition.REPEATED) &&
listItem.getType().getFieldCount() == 1 &&
listItem.getFieldRepetitionCount(0) == 1 &&
listItem.getType().getName().equalsIgnoreCase("list") &&
listItem.getType().getFieldName(0).equalsIgnoreCase("element") &&
listItem.getGroup(0, 0).getType().isRepetition(Type.Repetition.OPTIONAL)
) {
// nullable list elements can be represented as a repeated wrapping an optional
return listItem.getGroup(0, 0);
} else {
// else just pass it through
return listItem;
}
}
/**
* check if a parquet type is a valid 'map' type
*/
@@ -239,7 +275,7 @@ class ParquetGroupConverter
}
/**
* Convert a primitive group field to a "ingestion friendly" java object
* Convert a primitive group field to an "ingestion friendly" java object
*
* @return "ingestion ready" java object, or null
*/
@@ -326,8 +362,8 @@ class ParquetGroupConverter
// todo: idk wtd about unsigned
case UINT_8:
case UINT_16:
case UINT_32:
return g.getInteger(fieldIndex, index);
case UINT_32:
case UINT_64:
return g.getLong(fieldIndex, index);
case DECIMAL:
@@ -408,7 +444,7 @@ class ParquetGroupConverter
/**
* convert deprecated parquet int96 nanosecond timestamp to a long, based on
* https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTimestampUtils.java#L56
* https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTimestampUtils.java#L44
*/
private static long convertInt96BinaryToTimestamp(Binary value)
{
@@ -454,19 +490,6 @@ class ParquetGroupConverter
}
}
static boolean isWrappedListPrimitive(Object o)
{
if (o instanceof Group) {
Group g = (Group) o;
return g.getType().isRepetition(Type.Repetition.REPEATED) &&
!g.getType().isPrimitive() &&
g.getType().asGroupType().getFieldCount() == 1 &&
g.getType().getFields().get(0).isPrimitive();
}
return false;
}
private final boolean binaryAsString;
ParquetGroupConverter(boolean binaryAsString)
@@ -487,15 +510,12 @@ class ParquetGroupConverter
return convertField(g, fieldName, binaryAsString);
}
/**
* Properly formed parquet lists when passed through {@link ParquetGroupConverter#convertField(Group, String)} can
* return lists which contain 'wrapped' primitives, that are a {@link Group} with a single, primitive field (see
* {@link ParquetGroupConverter#isWrappedListPrimitive(Object)})
*/
Object unwrapListPrimitive(Object o)
Object unwrapListElement(Object o)
{
assert isWrappedListPrimitive(o);
Group g = (Group) o;
return convertPrimitiveField(g, 0, binaryAsString);
if (o instanceof Group) {
Group g = (Group) o;
return convertListElement(g, binaryAsString);
}
return o;
}
}
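
To make the new convertListElement branches concrete, here is a hedged sketch using parquet-mr's example Group model (SimpleGroup is parquet-column's reference Group implementation; the row-building code is illustrative). The repeated "list" wrapper Group carries exactly one optional "element" child, so the element Group is what conversion should hand downstream, mirroring the listItem.getGroup(0, 0) branch above:

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class UnwrapListElementSketch
{
  public static void main(String[] args)
  {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {\n"
        + "  optional group my_list (LIST) {\n"
        + "    repeated group list {\n"
        + "      optional group element { optional int64 b1; optional int64 b2; }\n"
        + "    }\n"
        + "  }\n"
        + "}\n"
    );
    // build a row equivalent to my_list = [{b1: 1, b2: 2}]
    Group row = new SimpleGroup(schema);
    Group wrapper = row.addGroup("my_list").addGroup("list");
    Group element = wrapper.addGroup("element");
    element.add("b1", 1L);
    element.add("b2", 2L);
    // the second branch of convertListElement: peel the repeated wrapper off
    // the optional element group rather than returning the wrapper itself
    Group unwrapped = wrapper.getGroup(0, 0);
    System.out.println(unwrapped.getLong("b1", 0)); // 1
  }
}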


@@ -31,7 +31,6 @@ import org.apache.parquet.schema.Type;
import javax.annotation.Nullable;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -111,24 +110,19 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMaker
}
/**
* After json conversion, wrapped list items can still need unwrapped. See
* {@link ParquetGroupConverter#isWrappedListPrimitive(Object)} and
* {@link ParquetGroupConverter#unwrapListPrimitive(Object)} for more details.
*
* @param o
*
* @return
* {@link ParquetGroupConverter} lazily/non-recursively translates {@link Group} into plain java objects, which means
* garbage that downstream druid cannot understand can be left behind in list and map types, so we deal with it here.
*/
private Object finalizeConversion(Object o)
{
// conversion can leave 'wrapped' list primitives
if (ParquetGroupConverter.isWrappedListPrimitive(o)) {
return converter.unwrapListPrimitive(o);
} else if (o instanceof List) {
List<Object> asList = ((List<?>) o).stream().filter(Objects::nonNull).collect(Collectors.toList());
if (asList.stream().allMatch(ParquetGroupConverter::isWrappedListPrimitive)) {
return asList.stream().map(Group.class::cast).map(converter::unwrapListPrimitive).collect(Collectors.toList());
}
if (o instanceof List) {
return ((List<?>) o).stream()
.map(converter::unwrapListElement)
.map(this::toPlainJavaType)
.collect(Collectors.toList());
} else if (o instanceof Group) {
return toPlainJavaType(o);
}
return o;
}
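
Design note: the reworked finalizeConversion drops the old special cases (filtering nulls out of lists, and the all-elements isWrappedListPrimitive check) in favor of a uniform rule: every list element passes through unwrapListElement, which is a no-op for anything that is not a wrapper Group, and then through the generic toPlainJavaType recursion, so Groups nested inside lists and maps are converted eagerly instead of leaking to downstream ingestion as the new javadoc describes.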


@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.hadoop.conf.Configuration;
import java.io.File;
@@ -35,7 +36,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
class BaseParquetReaderTest
class BaseParquetReaderTest extends InitializedNullHandlingTest
{
ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();


@@ -224,4 +224,25 @@ public class FlattenSpecParquetInputTest extends BaseParquetInputTest
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listextracted").get(0));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
}
@Test
public void testFlattenNullableListNullableElement() throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = transformHadoopDruidIndexerConfig(
"example/flattening/nullable_list_flatten.json",
parserType,
true
);
config.intoConfiguration(job);
Object data = getFirstRow(job, parserType, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
Assert.assertEquals("2022-02-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals(1L, rows.get(0).getRaw("a1_b1_c1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a1_b1_c2"));
Assert.assertEquals(1L, rows.get(0).getRaw("a2_0_b1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a2_0_b2"));
Assert.assertEquals(1L, rows.get(0).getRaw("a2_1_b1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a2_1_b2"));
}
}


@@ -362,4 +362,41 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testFlattenNullableListNullableElements() throws IOException
{
final String file = "example/flattening/nullable_list.snappy.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
ImmutableList.of()
),
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a1_b1_c1", "$.a1.b1.c1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a1_b1_c2", "$.a1.b1.c2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a2_0_b1", "$.a2[0].b1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a2_0_b2", "$.a2[0].b2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a2_1_b1", "$.a2[1].b1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "a2_1_b2", "$.a2[1].b2")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2022-02-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals(1L, rows.get(0).getRaw("a1_b1_c1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a1_b1_c2"));
Assert.assertEquals(1L, rows.get(0).getRaw("a2_0_b1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a2_0_b2"));
Assert.assertEquals(1L, rows.get(0).getRaw("a2_1_b1"));
Assert.assertEquals(2L, rows.get(0).getRaw("a2_1_b2"));
}
}