mirror of https://github.com/apache/nifi.git
NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro reader
Signed-off-by: zenfenan <zenfenan@apache.org>
parent 921403ff55
commit 022b64be08
AvroTypeUtil.java:
@@ -894,13 +894,23 @@ public class AvroTypeUtil {
             case STRING:
                 return value.toString();
             case ARRAY:
-                final GenericData.Array<?> array = (GenericData.Array<?>) value;
-                final Object[] valueArray = new Object[array.size()];
-                for (int i = 0; i < array.size(); i++) {
-                    final Schema elementSchema = avroSchema.getElementType();
-                    valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]");
-                }
-                return valueArray;
+                if (value instanceof List) {
+                    final List<?> list = (List<?>) value;
+                    final Object[] valueArray = new Object[list.size()];
+                    for (int i = 0; i < list.size(); i++) {
+                        final Schema elementSchema = avroSchema.getElementType();
+                        valueArray[i] = normalizeValue(list.get(i), elementSchema, fieldName + "[" + i + "]");
+                    }
+                    return valueArray;
+                } else {
+                    final GenericData.Array<?> array = (GenericData.Array<?>) value;
+                    final Object[] valueArray = new Object[array.size()];
+                    for (int i = 0; i < array.size(); i++) {
+                        final Schema elementSchema = avroSchema.getElementType();
+                        valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]");
+                    }
+                    return valueArray;
+                }
             case MAP:
                 final Map<?, ?> avroMap = (Map<?, ?>) value;
                 final Map<String, Object> map = new HashMap<>(avroMap.size());
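The new branch addresses what the commit message describes: Parquet's Avro reader can hand an array value back as a plain java.util.List that is not a GenericData.Array, so the old unconditional cast threw a ClassCastException. Note that GenericData.Array itself implements List, so the List branch covers it as well. A minimal standalone sketch of the branching (illustrative class and method names, not NiFi's actual code):

import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ArrayNormalizeSketch {

    // Hypothetical helper mirroring the fixed branch: accept any List
    // (GenericData.Array also implements List) and only fall back to the
    // GenericData.Array cast for non-List values.
    static Object[] toObjectArray(final Object value) {
        if (value instanceof List) {
            final List<?> list = (List<?>) value;
            return list.toArray(new Object[0]);
        }
        final GenericData.Array<?> array = (GenericData.Array<?>) value;
        return array.toArray(new Object[0]);
    }

    public static void main(String[] args) {
        // A plain List, as parquet-avro's reader may return for an array field:
        System.out.println(Arrays.toString(toObjectArray(Arrays.asList("blue", "green"))));

        // A GenericData.Array built against an Avro array schema works too:
        final Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));
        final GenericData.Array<String> avroArray = new GenericData.Array<>(2, arraySchema);
        avroArray.add("red");
        System.out.println(Arrays.toString(toObjectArray(avroArray)));
    }
}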
pom.xml:
@@ -101,6 +101,8 @@
             <configuration>
                 <excludes combine.children="append">
                     <exclude>src/test/resources/avro/user.avsc</exclude>
+                    <exclude>src/test/resources/avro/user-with-array.avsc</exclude>
+                    <exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
                 </excludes>
             </configuration>
         </plugin>
FetchParquetTest.java:
@@ -62,6 +62,8 @@ public class FetchParquetTest {
     static final String RECORD_HEADER = "name,favorite_number,favorite_color";

     private Schema schema;
+    private Schema schemaWithArray;
+    private Schema schemaWithNullableArray;
     private Configuration testConf;
     private FetchParquet proc;
     private TestRunner testRunner;
@@ -71,6 +73,12 @@ public class FetchParquetTest {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);

+        final String avroSchemaWithArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-array.avsc"), StandardCharsets.UTF_8);
+        schemaWithArray = new Schema.Parser().parse(avroSchemaWithArray);
+
+        final String avroSchemaWithNullableArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-nullable-array.avsc"), StandardCharsets.UTF_8);
+        schemaWithNullableArray = new Schema.Parser().parse(avroSchemaWithNullableArray);
+
         testConf = new Configuration();
         testConf.addResource(new Path(TEST_CONF_PATH));
@@ -243,6 +251,42 @@ public class FetchParquetTest {
         flowFile.assertContentEquals("TRIGGER");
     }

+    @Test
+    public void testFetchWithArray() throws InitializationException, IOException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir, "testFetchParquetWithArrayToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsersWithArray(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFetchWithNullableArray() throws InitializationException, IOException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir, "testFetchParquetWithNullableArrayToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsersWithNullableArray(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+    }
+
     protected void verifyCSVRecords(int numUsers, String csvContent) {
         final String[] splits = csvContent.split("[\\n]");
         Assert.assertEquals(numUsers, splits.length);
@@ -278,4 +322,70 @@ public class FetchParquetTest {

     }

+    private void writeParquetUsersWithArray(final File parquetFile, int numUsers) throws IOException {
+        if (parquetFile.exists()) {
+            Assert.assertTrue(parquetFile.delete());
+        }
+
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
+                .<GenericRecord>builder(parquetPath)
+                .withSchema(schemaWithArray)
+                .withConf(testConf);
+
+        final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
+
+        try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
+            for (int i = 0; i < numUsers; i++) {
+                final GenericRecord user = new GenericData.Record(schema);
+                user.put("name", "Bob" + i);
+                user.put("favorite_number", i);
+
+                final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
+                colors.add("blue" + i);
+
+                user.put("favorite_color", colors);
+
+                writer.write(user);
+            }
+        }
+    }
+
+    private void writeParquetUsersWithNullableArray(final File parquetFile, int numUsers) throws IOException {
+        if (parquetFile.exists()) {
+            Assert.assertTrue(parquetFile.delete());
+        }
+
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
+                .<GenericRecord>builder(parquetPath)
+                .withSchema(schemaWithNullableArray)
+                .withConf(testConf);
+
+        // use schemaWithArray here just to get the schema for the array part of the favorite_colors field;
+        // the overall schemaWithNullableArray has a union of the array schema and null
+        final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
+
+        try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
+            for (int i = 0; i < numUsers; i++) {
+                final GenericRecord user = new GenericData.Record(schema);
+                user.put("name", "Bob" + i);
+                user.put("favorite_number", i);
+
+                final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
+                colors.add("blue" + i);
+
+                user.put("favorite_color", colors);
+
+                writer.write(user);
+            }
+        }
+    }
+
 }
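The round trip these tests exercise can be reproduced outside NiFi. A minimal sketch (assuming parquet-avro on the classpath and a file written as by writeParquetUsersWithArray above; the path is illustrative) that reads such a file back and shows the array field arriving as a java.util.List:

import java.io.IOException;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadBackSketch {
    public static void main(String[] args) throws IOException {
        // Assumed path to a Parquet file written as in the test helper above.
        final Path parquetPath = new Path("target/testFetchParquetWithArrayToCSV.parquet");

        try (final ParquetReader<GenericRecord> reader =
                 AvroParquetReader.<GenericRecord>builder(parquetPath).build()) {
            GenericRecord user;
            while ((user = reader.read()) != null) {
                final Object colors = user.get("favorite_colors");
                // The value is a java.util.List; it need not be a GenericData.Array,
                // which is why AvroTypeUtil now checks instanceof List first.
                System.out.println(colors.getClass().getName() + " -> " + (List<?>) colors);
            }
        }
    }
}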
src/test/resources/avro/user-with-array.avsc (new file):
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number", "type": ["int", "null"]},
+     {"name": "favorite_colors", "type": { "type": "array", "items": ["string","null"] }, "default": null }
+ ]
+}
src/test/resources/avro/user-with-nullable-array.avsc (new file):
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number", "type": ["int", "null"]},
+     {"name": "favorite_colors", "type": [ "null", { "type": "array", "items": ["string","null"] } ], "default": null }
+ ]
+}
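As the comment in writeParquetUsersWithNullableArray notes, favorite_colors in the nullable variant is a union of null and an array schema. A minimal sketch (hypothetical helper, not NiFi code) of unwrapping that union with the plain Avro API instead of reaching for the non-nullable sibling schema:

import org.apache.avro.Schema;

public class UnionUnwrapSketch {

    // Hypothetical helper: given a field schema that may be ["null", {"type":"array",...}],
    // return the array branch of the union (or the schema itself if it is already an array).
    static Schema arrayBranch(final Schema fieldSchema) {
        if (fieldSchema.getType() == Schema.Type.ARRAY) {
            return fieldSchema;
        }
        if (fieldSchema.getType() == Schema.Type.UNION) {
            for (final Schema option : fieldSchema.getTypes()) {
                if (option.getType() == Schema.Type.ARRAY) {
                    return option;
                }
            }
        }
        throw new IllegalArgumentException("No array schema in: " + fieldSchema);
    }

    public static void main(String[] args) {
        // The nullable-array field from user-with-nullable-array.avsc above:
        final Schema user = new Schema.Parser().parse(
            "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": ["
            + "{\"name\": \"favorite_colors\", \"type\": [\"null\", {\"type\": \"array\", \"items\": [\"string\", \"null\"]}], \"default\": null}"
            + "]}");

        final Schema colors = arrayBranch(user.getField("favorite_colors").schema());
        System.out.println(colors);                  // the array schema, null branch stripped
        System.out.println(colors.getElementType()); // the ["string","null"] items union
    }
}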