mirror of https://github.com/apache/nifi.git
NIFI-12318: Fixed byte array generation in GenerateRecord
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #8109.
This commit is contained in:
parent
8dc09c8911
commit
eeb2b1a644
|
@ -306,7 +306,7 @@ public class GenerateRecord extends AbstractProcessor {
|
|||
case BOOLEAN:
|
||||
return FakerUtils.getFakeData("Bool.bool", faker);
|
||||
case BYTE:
|
||||
return faker.number().numberBetween(Byte.MIN_VALUE, Byte.MAX_VALUE);
|
||||
return (byte) faker.number().numberBetween(Byte.MIN_VALUE, Byte.MAX_VALUE);
|
||||
case CHAR:
|
||||
return (char) faker.number().numberBetween(Character.MIN_VALUE, Character.MAX_VALUE);
|
||||
case DATE:
|
||||
|
|
|
@ -16,25 +16,26 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.nifi.avro.AvroReader;
|
||||
import org.apache.nifi.avro.AvroRecordSetWriter;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.json.JsonRecordSetWriter;
|
||||
import org.apache.nifi.processors.standard.faker.FakerMethodHolder;
|
||||
import org.apache.nifi.processors.standard.faker.FakerUtils;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
@ -43,8 +44,10 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
@ -181,7 +184,7 @@ public class TestGenerateRecord {
|
|||
testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
|
||||
testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); // Should be ignored
|
||||
testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
|
||||
testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
|
||||
testRunner.setProperty(GenerateRecord.NUM_RECORDS, "30");
|
||||
|
||||
testRunner.run();
|
||||
testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
|
||||
|
@ -198,7 +201,7 @@ public class TestGenerateRecord {
|
|||
@Test
|
||||
public void testGenerateNullableFieldsZeroNullPercentageSchemaText() throws Exception {
|
||||
String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc")));
|
||||
final JsonRecordSetWriter recordWriter = new JsonRecordSetWriter();
|
||||
final AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
|
||||
testRunner.addControllerService("record-writer", recordWriter);
|
||||
testRunner.enableControllerService(recordWriter);
|
||||
testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
|
||||
|
@ -209,43 +212,55 @@ public class TestGenerateRecord {
|
|||
|
||||
testRunner.run();
|
||||
testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
|
||||
final String output = flowFile.getContent();
|
||||
final JsonFactory jsonFactory = new JsonFactory();
|
||||
try (JsonParser jsonParser = jsonFactory.createParser(output)) {
|
||||
jsonParser.setCodec(new ObjectMapper());
|
||||
JsonNode recordArray = jsonParser.readValueAsTree();
|
||||
assertTrue(recordArray instanceof ArrayNode);
|
||||
JsonNode recordNode = recordArray.get(0);
|
||||
JsonNode systemNode = recordNode.get("System");
|
||||
assertNotNull(systemNode);
|
||||
JsonNode providerNode = systemNode.get("Provider");
|
||||
assertNotNull(providerNode);
|
||||
JsonNode guidNode = providerNode.get("Guid");
|
||||
assertNotNull(guidNode);
|
||||
assertNotNull(guidNode.asText());
|
||||
JsonNode nameNode = providerNode.get("Name");
|
||||
assertNotNull(nameNode);
|
||||
assertNotNull(nameNode.asText());
|
||||
JsonNode eventIdNode = systemNode.get("EventID");
|
||||
assertNotNull(eventIdNode);
|
||||
eventIdNode.asInt(); // This would throw a NullPointerException if the value was null
|
||||
JsonNode eventDataNode = recordNode.get("EventData");
|
||||
assertNotNull(eventDataNode);
|
||||
JsonNode dataNode = eventDataNode.get("Data");
|
||||
assertNotNull(dataNode);
|
||||
assertTrue(dataNode instanceof ArrayNode);
|
||||
assertTrue(dataNode.size() <= 10 && dataNode.size() >= 0);
|
||||
for (int i = 0; i < dataNode.size(); i++) {
|
||||
JsonNode dataElementNode = dataNode.get(i);
|
||||
assertNotNull(dataElementNode);
|
||||
JsonNode dataElementNameNode = dataElementNode.get("Name");
|
||||
assertNotNull(dataElementNameNode);
|
||||
assertNotNull(dataElementNameNode.asText());
|
||||
JsonNode dataElementDataNode = dataElementNode.get("DataElement");
|
||||
assertNotNull(dataElementDataNode);
|
||||
assertNotNull(dataElementDataNode.asText());
|
||||
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
|
||||
|
||||
final AvroReader avroReader = new AvroReader();
|
||||
testRunner.addControllerService("avroReader", avroReader);
|
||||
testRunner.enableControllerService(avroReader);
|
||||
final byte[] validFlowFileBytes = flowFile.toByteArray();
|
||||
try (
|
||||
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFileBytes);
|
||||
final RecordReader recordReader = avroReader.createRecordReader(flowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, testRunner.getLogger());
|
||||
) {
|
||||
// Check correct schema
|
||||
final RecordSchema resultSchema = recordReader.getSchema();
|
||||
|
||||
final Optional<RecordField> systemField = resultSchema.getField("System");
|
||||
assertTrue(systemField.isPresent());
|
||||
assertEquals(RecordFieldType.RECORD, systemField.get().getDataType().getFieldType());
|
||||
RecordDataType systemRecordType = (RecordDataType) systemField.get().getDataType();
|
||||
RecordSchema systemSchema = systemRecordType.getChildSchema();
|
||||
|
||||
final Optional<RecordField> providerField = systemSchema.getField("Provider");
|
||||
assertTrue(providerField.isPresent());
|
||||
assertEquals(RecordFieldType.RECORD, providerField.get().getDataType().getFieldType());
|
||||
RecordDataType providerRecordType = (RecordDataType) providerField.get().getDataType();
|
||||
RecordSchema providerSchema = providerRecordType.getChildSchema();
|
||||
|
||||
final Optional<RecordField> guidField = providerSchema.getField("Guid");
|
||||
assertTrue(guidField.isPresent());
|
||||
assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), guidField.get().getDataType());
|
||||
|
||||
// Check object type, class etc.
|
||||
final org.apache.nifi.serialization.record.Record record = recordReader.nextRecord();
|
||||
assertNotNull(record);
|
||||
final Object systemObject = record.getValue("System");
|
||||
assertNotNull(systemObject);
|
||||
assertTrue(systemObject instanceof org.apache.nifi.serialization.record.Record);
|
||||
final org.apache.nifi.serialization.record.Record systemRecord = (org.apache.nifi.serialization.record.Record) systemObject;
|
||||
final Object providerObject = systemRecord.getValue("Provider");
|
||||
assertNotNull(providerObject);
|
||||
assertTrue(providerObject instanceof org.apache.nifi.serialization.record.Record);
|
||||
final org.apache.nifi.serialization.record.Record providerRecord = (org.apache.nifi.serialization.record.Record) providerObject;
|
||||
final Object guidObject = providerRecord.getValue("Guid");
|
||||
assertNotNull(guidObject);
|
||||
assertTrue(guidObject instanceof Object[]);
|
||||
// Check for array of Byte objects if not empty
|
||||
Object[] guidArray = (Object[]) guidObject;
|
||||
if (guidArray.length > 0) {
|
||||
assertTrue(guidArray[0] instanceof Byte);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
"name": "ProviderType",
|
||||
"fields": [{
|
||||
"name": "Guid",
|
||||
"type": "string"
|
||||
"type": "bytes"
|
||||
}, {
|
||||
"name": "Name",
|
||||
"type": "string"
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
"name": "ProviderType",
|
||||
"fields": [{
|
||||
"name": "Guid",
|
||||
"type": ["null", "string"]
|
||||
"type": ["null", "bytes"]
|
||||
}, {
|
||||
"name": "Name",
|
||||
"type": ["null", "string"]
|
||||
|
|
Loading…
Reference in New Issue