diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 1e5c3386eb..3901ac5892 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -585,6 +585,8 @@ src/test/resources/TestValidateRecord/timestamp.json src/test/resources/TestLookupRecord/lookup-array-input.json src/test/resources/TestLookupRecord/lookup-array-output.json + src/test/resources/TestValidateRecord/int-maps-schema.avsc + src/test/resources/TestValidateRecord/int-maps-data.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index f53dd5cd76..1b4b7e4056 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.avro.AvroReader; +import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema; +import org.apache.nifi.avro.AvroRecordReader; import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.csv.CSVReader; import org.apache.nifi.csv.CSVRecordSetWriter; @@ -31,6 +33,7 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.DateTimeUtils; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -45,10 +48,12 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashMap; import java.util.Optional; import static org.junit.Assert.assertEquals; @@ -547,4 +552,45 @@ public class TestValidateRecord { final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json")); } + + @Test + public void testValidateMaps() throws IOException, InitializationException, MalformedRecordException { + final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc")), StandardCharsets.UTF_8); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property"); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema); + runner.enableControllerService(jsonReader); + + final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter(); + runner.addControllerService("writer", avroWriter); + runner.enableControllerService(avroWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + + runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/int-maps-data.json")); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_VALID, 1); + final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); + + byte[] source = validFlowFile.toByteArray(); + + try (final InputStream in = new ByteArrayInputStream(source); final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in)) { + final Object[] values = reader.nextRecord().getValues(); + assertEquals("uuid", values[0]); + assertEquals(2, ((HashMap) values[1]).size()); + final Object[] data = (Object[]) values[2]; + assertEquals(2, ( (HashMap) ((MapRecord) data[0]).getValue("points")).size()); + assertEquals(2, ( (HashMap) ((MapRecord) data[1]).getValue("points")).size()); + assertEquals(2, ( (HashMap) ((MapRecord) data[2]).getValue("points")).size()); + } + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-data.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-data.json new file mode 100644 index 0000000000..00f9d13b37 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-data.json @@ -0,0 +1,27 @@ +{ + "id": "uuid", + "points": { + "1": 0, + "2": 0 + }, + "data": [ + { + "points": { + "3": 0, + "4": 0 + } + }, + { + "points": { + "5": 0, + "6": 0 + } + }, + { + "points": { + "7": 0, + "8": 0 + } + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-schema.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-schema.avsc new file mode 100644 index 0000000000..9aa302ad2a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/int-maps-schema.avsc @@ -0,0 +1,44 @@ +{ + "name": "statistic", + "type": "record", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "points", + "type": { + "type": "map", + "values": "int" + } + }, + { + "name": "data", + "type": { + "type": "array", + "items": { + "name": "data", + "type": "record", + "fields": [ + { + "name": "version", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "points", + "type": { + "type": "map", + "values": "int" + } + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index 69b7fab33f..f163707af4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -160,9 +160,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { } catch (final Exception e) { return textValue; } + default: + return textValue; } - - return textValue; } if (fieldNode.isArray()) {