NIFI-6096: Fixed ValidateRecord handling of nested maps

This closes #3424

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Matthew Burgess 2019-04-09 13:49:12 -04:00 committed by Mike Thomsen
parent 1cadc72222
commit a5dff7b45c
5 changed files with 101 additions and 1 deletions

View File

@ -736,7 +736,7 @@ public class DataTypeUtils {
}
public static boolean isMapTypeCompatible(final Object value) {
return value != null && value instanceof Map;
return value != null && (value instanceof Map || value instanceof MapRecord);
}

View File

@ -552,6 +552,8 @@
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
<exclude>src/test/resources/TestValidateRecord/nested-map-input.json</exclude>
<exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processors.standard;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
@ -30,6 +31,7 @@ import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -360,4 +362,50 @@ public class TestValidateRecord {
invalidFlowFile.assertContentEquals(expectedInvalidContents);
}
@Test
public void testValidateNestedMap() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/nested-map-schema.avsc")), StandardCharsets.UTF_8);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", validWriter);
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(validWriter);
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
runner.addControllerService("invalid-writer", invalidWriter);
runner.enableControllerService(invalidWriter);
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
// Both records should be valid if strict type checking is off
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/nested-map-input.json"));
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
runner.assertTransferCount(ValidateRecord.REL_INVALID, 0);
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
runner.clearTransferState();
// The second record should be invalid if strict type checking is on
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/nested-map-input.json"));
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
}
}

View File

@ -0,0 +1,18 @@
[{
"images" : [ {
"headers" : {
"Accept-Ranges" : "bytes",
"Server" : "Apache"
},
"id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
} ]
},
{
"images" : [ {
"headers" : {
"Accept-Ranges" : 4,
"Server" : 10
},
"id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
} ]
}]

View File

@ -0,0 +1,32 @@
{
"type": "record",
"name": "example",
"fields": [
{
"name": "images",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "images",
"fields": [
{
"name": "id",
"type": [
"string",
"null"
]
},
{
"name": "headers",
"type": {
"type": "map",
"values": "string"
}
}
]
}
}
}
]
}