diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 38e87a8496..52d749f6a6 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -327,7 +327,13 @@ public class AvroTypeUtil { return map; } else if (rawValue instanceof Map) { - return rawValue; + final Map objectMap = (Map) rawValue; + final Map map = new HashMap<>(objectMap.size()); + for (final String s : objectMap.keySet()) { + final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName); + map.put(s, converted); + } + return map; } else { throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map"); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java index 409ede2f12..dc5d943071 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericData.Array; import org.apache.avro.generic.GenericRecord; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -115,6 +116,7 @@ public abstract class TestWriteAvroResult { final List subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType())); final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields); + final DataType subRecordDataType = RecordFieldType.RECORD.getRecordDataType(subRecordSchema); final List fields = new ArrayList<>(); fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); @@ -126,11 +128,15 @@ public abstract class TestWriteAvroResult { fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))); fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType())); fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); - fields.add(new RecordField("record", RecordFieldType.RECORD.getRecordDataType(subRecordSchema))); + fields.add(new RecordField("record", subRecordDataType)); + fields.add(new RecordField("map", RecordFieldType.MAP.getMapDataType(subRecordDataType))); final RecordSchema recordSchema = new SimpleRecordSchema(fields); final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello")); + final Map innerMap = new HashMap<>(); + innerMap.put("key1", innerRecord); + final Map values = new HashMap<>(); values.put("string", "hello"); values.put("int", 8); @@ -142,6 +148,7 @@ public abstract class TestWriteAvroResult { values.put("nullOrLong", null); values.put("array", new Integer[] {1, 2, 3}); values.put("record", innerRecord); + values.put("map", innerMap); final Record record = new MapRecord(recordSchema, values); @@ -188,6 +195,14 @@ public abstract class TestWriteAvroResult { } else if (recordValue instanceof byte[]) { final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue); assertEquals(fieldName + " not equal", bb, avroValue); + } else if (recordValue instanceof Map) { + assertTrue(fieldName + " should have been instanceof Map", avroValue instanceof Map); + final Map avroMap = (Map) avroValue; + final Map recordMap = (Map) recordValue; + assertEquals(fieldName + " not equal", recordMap.size(), avroMap.size()); + for (Object s : avroMap.keySet()) { + assertMatch((Record) recordMap.get(s.toString()), (GenericRecord) avroMap.get(s)); + } } else if (recordValue instanceof Record) { assertMatch((Record) recordValue, (GenericRecord) avroValue); } else { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc index cc7f60e6f0..4d26b640ae 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc @@ -42,6 +42,12 @@ { "name": "field1", "type": "string" } ] } - } + }, { + "name": "map", + "type": { + "type": "map", + "values": "subRecord" + } + } ] -} \ No newline at end of file +}