NIFI-4441: patch avro maps in union types. This closes #2207.

Signed-off-by: Mark Payne <markap14@hotmail.com>
Author: Patrice Freydiere, 2017-10-11 22:17:15 +02:00
Committed by: Mark Payne
commit 5f7bd81af9 (parent 39a484b631)
5 changed files with 64 additions and 0 deletions
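
In short: in AvroTypeUtil, the per-field conversion inside convertAvroRecordToMap is wrapped in a try/catch that logs the failing field name at debug level before rethrowing, and the union-member compatibility check gains a MAP case so that a java.util.Map value is accepted for a map schema nested inside a union (for example [ "null", { "type" : "map", "values" : "string" } ]). A new unit test plus schema.json and data.avro fixtures cover the case, and the module pom excludes those fixtures from the RAT license check.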

File: pom.xml

@@ -42,4 +42,18 @@
             <artifactId>nifi-record</artifactId>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/org/apache/nifi/avro/data.avro</exclude>
+                        <exclude>src/test/resources/org/apache/nifi/avro/schema.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
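
The apache-rat-plugin exclusions are there because neither fixture can carry an Apache license header: data.avro is a binary Avro container and schema.json has to remain a bare Avro schema (JSON allows no comments), so without the excludes the RAT license check would fail the build.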

File: AvroTypeUtil.java

@@ -626,6 +626,7 @@ public class AvroTypeUtil {
         final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
         for (final RecordField recordField : recordSchema.getFields()) {
             Object value = avroRecord.get(recordField.getFieldName());
             if (value == null) {
                 for (final String alias : recordField.getAliases()) {
@@ -637,6 +638,7 @@ public class AvroTypeUtil {
             }
             final String fieldName = recordField.getFieldName();
+            try {
             final Field avroField = avroRecord.getSchema().getField(fieldName);
             if (avroField == null) {
                 values.put(fieldName, null);
@@ -650,6 +652,10 @@
             final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
             values.put(fieldName, coercedValue);
+            } catch (Exception ex) {
+                logger.debug("Failed to convert field " + fieldName, ex);
+                throw ex;
+            }
         }
         return values;
@@ -716,6 +722,10 @@
                     return true;
                 }
                 break;
+            case MAP:
+                if (value instanceof Map) {
+                    return true;
+                }
         }
         return DataTypeUtils.isCompatibleDataType(value, dataType);
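
To make the effect of the MAP case concrete, here is a minimal stand-alone sketch. It is not part of the commit: the class name and map contents are invented, and the inline schema simply mirrors record O from the schema.json fixture below; only createSchema and convertAvroRecordToMap are the real AvroTypeUtil methods used by the test. It builds an in-memory record whose field is a [ "null", map ] union and converts it, which is the path the new case unblocks.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.nifi.avro.AvroTypeUtil;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class MapInUnionSketch {
        public static void main(final String[] args) {
            // Mirrors record "O" from the schema.json fixture: one field whose type
            // is a union of null and map<string>.
            final Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"O\",\"namespace\":\"net.a\",\"fields\":["
                    + "{\"name\":\"hash\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}]}]}");

            final GenericRecord avroRecord = new GenericData.Record(schema);
            avroRecord.put("hash", Collections.singletonMap("key", "value"));

            // With the MAP branch above, the map member of the union is recognized
            // and the value is carried into the resulting field map.
            final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
            final Map<String, Object> converted = AvroTypeUtil.convertAvroRecordToMap(avroRecord, recordSchema);
            System.out.println(converted);
        }
    }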

File: TestAvroTypeUtil.java

@@ -20,6 +20,7 @@ package org.apache.nifi.avro;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -28,6 +29,9 @@ import java.util.Optional;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
@@ -239,4 +243,20 @@ public class TestAvroTypeUtil {
         Assert.assertEquals(recordASchema, ((RecordDataType)recordBParentField.get().getDataType()).getChildSchema());
     }
+
+    @Test
+    public void testMapWithNullSchema() throws IOException {
+        Schema recursiveSchema = new Schema.Parser().parse(getClass().getResourceAsStream("schema.json"));
+
+        // Make sure the following doesn't throw an exception
+        RecordSchema recordASchema = AvroTypeUtil.createSchema(recursiveSchema.getTypes().get(0));
+
+        // Check the fix against the bundled data file
+        try (DataFileStream<GenericRecord> r = new DataFileStream<>(getClass().getResourceAsStream("data.avro"),
+                new GenericDatumReader<>())) {
+            GenericRecord n = r.next();
+            AvroTypeUtil.convertAvroRecordToMap(n, recordASchema);
+        }
+    }
 }
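
Note that the test asserts nothing beyond completion: it checks that createSchema copes with a union-containing-map schema and that convertAvroRecordToMap can read the first record of data.avro without throwing, the scenario that previously failed. A sketch of how a matching data.avro could be produced follows the schema below.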

File: src/test/resources/org/apache/nifi/avro/schema.json (new file)

@@ -0,0 +1,20 @@
+[ {
+  "namespace" : "net.a",
+  "type" : "record",
+  "name" : "O",
+  "fields" : [ {
+    "name" : "hash",
+    "type" : [ "null", {
+      "type" : "map",
+      "values" : "string"
+    } ]
+  } ]
+}, {
+  "namespace" : "net.a",
+  "type" : "record",
+  "name" : "A",
+  "fields" : [ {
+    "name" : "o",
+    "type" : [ "null", "O" ]
+  } ]
+} ]
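
The companion fixture data.avro is part of the commit but, being binary, does not show up in the diff. Purely as a sketch, assuming the fixture simply holds net.a.O records with a non-null hash map (the class name, file paths, and record contents below are illustrative, not taken from the commit), an equivalent file could be written with the plain Avro API:

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class WriteMapInUnionFixture {
        public static void main(final String[] args) throws IOException {
            // schema.json parses to a union of the two record schemas; index 0 is net.a.O.
            final Schema union = new Schema.Parser().parse(new File("schema.json"));
            final Schema oSchema = union.getTypes().get(0);

            // One record whose map-in-union field is populated.
            final GenericRecord o = new GenericData.Record(oSchema);
            o.put("hash", Collections.singletonMap("someKey", "someValue"));

            // Write an Avro data file the test can read back with a DataFileStream.
            try (DataFileWriter<GenericRecord> writer =
                    new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(oSchema))) {
                writer.create(oSchema, new File("data.avro"));
                writer.append(o);
            }
        }
    }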