mirror of https://github.com/apache/nifi.git
unit test reproducing the issue
This commit is contained in:
parent
3de3ad4029
commit
cd83e70b91
|
@ -585,6 +585,8 @@
|
||||||
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
|
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
|
||||||
<exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude>
|
<exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude>
|
||||||
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
|
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
|
||||||
|
<exclude>src/test/resources/TestValidateRecord/int-maps-schema.avsc</exclude>
|
||||||
|
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
|
||||||
</excludes>
|
</excludes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import org.apache.nifi.avro.AvroReader;
|
import org.apache.nifi.avro.AvroReader;
|
||||||
|
import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
|
||||||
|
import org.apache.nifi.avro.AvroRecordReader;
|
||||||
import org.apache.nifi.avro.AvroRecordSetWriter;
|
import org.apache.nifi.avro.AvroRecordSetWriter;
|
||||||
import org.apache.nifi.csv.CSVReader;
|
import org.apache.nifi.csv.CSVReader;
|
||||||
import org.apache.nifi.csv.CSVRecordSetWriter;
|
import org.apache.nifi.csv.CSVRecordSetWriter;
|
||||||
|
@ -31,6 +33,7 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil;
|
||||||
import org.apache.nifi.serialization.DateTimeUtils;
|
import org.apache.nifi.serialization.DateTimeUtils;
|
||||||
import org.apache.nifi.serialization.MalformedRecordException;
|
import org.apache.nifi.serialization.MalformedRecordException;
|
||||||
import org.apache.nifi.serialization.RecordReader;
|
import org.apache.nifi.serialization.RecordReader;
|
||||||
|
import org.apache.nifi.serialization.record.MapRecord;
|
||||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.RecordField;
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
|
@ -45,10 +48,12 @@ import org.junit.Test;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -547,4 +552,45 @@ public class TestValidateRecord {
|
||||||
final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||||
validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
|
validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateMaps() throws IOException, InitializationException, MalformedRecordException {
|
||||||
|
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc")), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||||
|
runner.addControllerService("reader", jsonReader);
|
||||||
|
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
|
||||||
|
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
|
||||||
|
runner.enableControllerService(jsonReader);
|
||||||
|
|
||||||
|
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
|
||||||
|
runner.addControllerService("writer", avroWriter);
|
||||||
|
runner.enableControllerService(avroWriter);
|
||||||
|
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||||
|
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||||
|
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||||
|
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
|
||||||
|
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/int-maps-data.json"));
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||||
|
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||||
|
|
||||||
|
byte[] source = validFlowFile.toByteArray();
|
||||||
|
|
||||||
|
try (final InputStream in = new ByteArrayInputStream(source); final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in)) {
|
||||||
|
final Object[] values = reader.nextRecord().getValues();
|
||||||
|
assertEquals("uuid", values[0]);
|
||||||
|
assertEquals(2, ((HashMap<?,?>) values[1]).size());
|
||||||
|
final Object[] data = (Object[]) values[2];
|
||||||
|
assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[0]).getValue("points")).size());
|
||||||
|
assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[1]).getValue("points")).size());
|
||||||
|
assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[2]).getValue("points")).size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
{
|
||||||
|
"id": "uuid",
|
||||||
|
"points": {
|
||||||
|
"1": 0,
|
||||||
|
"2": 0
|
||||||
|
},
|
||||||
|
"data": [
|
||||||
|
{
|
||||||
|
"points": {
|
||||||
|
"3": 0,
|
||||||
|
"4": 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"points": {
|
||||||
|
"5": 0,
|
||||||
|
"6": 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"points": {
|
||||||
|
"7": 0,
|
||||||
|
"8": 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
{
|
||||||
|
"name": "statistic",
|
||||||
|
"type": "record",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"name": "id",
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "points",
|
||||||
|
"type": {
|
||||||
|
"type": "map",
|
||||||
|
"values": "int"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "data",
|
||||||
|
"type": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"name": "data",
|
||||||
|
"type": "record",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"name": "version",
|
||||||
|
"type": [
|
||||||
|
"null",
|
||||||
|
"string"
|
||||||
|
],
|
||||||
|
"default": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "points",
|
||||||
|
"type": {
|
||||||
|
"type": "map",
|
||||||
|
"values": "int"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -160,10 +160,10 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
return textValue;
|
return textValue;
|
||||||
}
|
}
|
||||||
}
|
default:
|
||||||
|
|
||||||
return textValue;
|
return textValue;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (fieldNode.isArray()) {
|
if (fieldNode.isArray()) {
|
||||||
final ArrayNode arrayNode = (ArrayNode) fieldNode;
|
final ArrayNode arrayNode = (ArrayNode) fieldNode;
|
||||||
|
|
Loading…
Reference in New Issue