NIFI-3871: Convert Avro map values

This closes #1787.

- When converting from a raw value to an Avro object, convert the values
  of any Avro map types so that they can be complex types like other
  records.
This commit is contained in:
Steve Champagne 2017-05-11 16:39:38 -05:00 committed by Mark Payne
parent 0f693e9448
commit 6e4db6b11a
3 changed files with 31 additions and 4 deletions

View File

@ -327,7 +327,13 @@ public class AvroTypeUtil {
return map; return map;
} else if (rawValue instanceof Map) { } else if (rawValue instanceof Map) {
return rawValue; final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
final Map<String, Object> map = new HashMap<>(objectMap.size());
for (final String s : objectMap.keySet()) {
final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName);
map.put(s, converted);
}
return map;
} else { } else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map"); throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
} }

View File

@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
@ -115,6 +116,7 @@ public abstract class TestWriteAvroResult {
final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType())); final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType()));
final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields); final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields);
final DataType subRecordDataType = RecordFieldType.RECORD.getRecordDataType(subRecordSchema);
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
@ -126,11 +128,15 @@ public abstract class TestWriteAvroResult {
fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType())); fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType()));
fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
fields.add(new RecordField("record", RecordFieldType.RECORD.getRecordDataType(subRecordSchema))); fields.add(new RecordField("record", subRecordDataType));
fields.add(new RecordField("map", RecordFieldType.MAP.getMapDataType(subRecordDataType)));
final RecordSchema recordSchema = new SimpleRecordSchema(fields); final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello")); final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello"));
final Map<String, Object> innerMap = new HashMap<>();
innerMap.put("key1", innerRecord);
final Map<String, Object> values = new HashMap<>(); final Map<String, Object> values = new HashMap<>();
values.put("string", "hello"); values.put("string", "hello");
values.put("int", 8); values.put("int", 8);
@ -142,6 +148,7 @@ public abstract class TestWriteAvroResult {
values.put("nullOrLong", null); values.put("nullOrLong", null);
values.put("array", new Integer[] {1, 2, 3}); values.put("array", new Integer[] {1, 2, 3});
values.put("record", innerRecord); values.put("record", innerRecord);
values.put("map", innerMap);
final Record record = new MapRecord(recordSchema, values); final Record record = new MapRecord(recordSchema, values);
@ -188,6 +195,14 @@ public abstract class TestWriteAvroResult {
} else if (recordValue instanceof byte[]) { } else if (recordValue instanceof byte[]) {
final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue); final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue);
assertEquals(fieldName + " not equal", bb, avroValue); assertEquals(fieldName + " not equal", bb, avroValue);
} else if (recordValue instanceof Map) {
assertTrue(fieldName + " should have been instanceof Map", avroValue instanceof Map);
final Map<?, ?> avroMap = (Map<?, ?>) avroValue;
final Map<?, ?> recordMap = (Map<?, ?>) recordValue;
assertEquals(fieldName + " not equal", recordMap.size(), avroMap.size());
for (Object s : avroMap.keySet()) {
assertMatch((Record) recordMap.get(s.toString()), (GenericRecord) avroMap.get(s));
}
} else if (recordValue instanceof Record) { } else if (recordValue instanceof Record) {
assertMatch((Record) recordValue, (GenericRecord) avroValue); assertMatch((Record) recordValue, (GenericRecord) avroValue);
} else { } else {

View File

@ -42,6 +42,12 @@
{ "name": "field1", "type": "string" } { "name": "field1", "type": "string" }
] ]
} }
} }, {
"name": "map",
"type": {
"type": "map",
"values": "subRecord"
}
}
] ]
} }