NIFI-13965 Reduced duplicate code and optimized TestConvertRecord (#9488)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-11-04 22:39:57 -05:00 committed by GitHub
parent 10eda1ac69
commit b8c4cb081a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 92 additions and 93 deletions

View File

@ -35,6 +35,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@ -46,6 +47,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@ -57,19 +59,36 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not portable across operating systems")
public class TestConvertRecord {
private static final String PERSON_SCHEMA;
private static final String READER_ID = "reader";
private static final String WRITER_ID = "writer";
private TestRunner runner;
static {
try {
PERSON_SCHEMA = Files.readString(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
 * Assigns a new {@link TestRunner} for {@link ConvertRecord} to the {@code runner} field
 * before each test method runs.
 */
@BeforeEach
void setUp() {
    this.runner = TestRunners.newTestRunner(ConvertRecord.class);
}
@Test
public void testSuccessfulConversion() throws InitializationException {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
runner.addControllerService("reader", readerService);
runner.addControllerService(READER_ID, readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.addControllerService(WRITER_ID, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
@ -82,7 +101,7 @@ public class TestConvertRecord {
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
@ -94,15 +113,14 @@ public class TestConvertRecord {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
runner.addControllerService("reader", readerService);
runner.addControllerService(READER_ID, readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.addControllerService(WRITER_ID, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConvertRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false");
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
@ -121,7 +139,7 @@ public class TestConvertRecord {
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
out.assertAttributeEquals("record.count", "3");
out.assertAttributeEquals("mime.type", "text/plain");
@ -133,14 +151,13 @@ public class TestConvertRecord {
final MockRecordParser readerService = new MockRecordParser(2);
final MockRecordWriter writerService = new MockRecordWriter("header", false);
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
runner.addControllerService("reader", readerService);
runner.addControllerService(READER_ID, readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.addControllerService(WRITER_ID, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
@ -154,7 +171,7 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).getFirst();
out.assertContentEquals(original.toByteArray());
out.assertAttributeEquals("record.error.message", "Intentional Unit Test Exception because 2 records have been read");
}
@ -165,14 +182,13 @@ public class TestConvertRecord {
final MockRecordParser readerService = new MockRecordParser();
final MockRecordWriter writerService = new MockRecordWriter("header", false, 2);
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
runner.addControllerService("reader", readerService);
runner.addControllerService(READER_ID, readerService);
runner.enableControllerService(readerService);
runner.addControllerService("writer", writerService);
runner.addControllerService(WRITER_ID, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
@ -186,42 +202,39 @@ public class TestConvertRecord {
// Original FlowFile should be routed to 'failure' relationship without modification
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).getFirst();
out.assertContentEquals(original.toByteArray());
out.assertAttributeEquals("record.error.message", "Unit Test intentionally throwing IOException after 2 records were written");
}
@Test
public void testJSONCompression() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.addControllerService(READER_ID, jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.addControllerService(WRITER_ID, jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.setProperty(jsonWriter, "compression-format", "snappy");
runner.setProperty(jsonWriter, JsonRecordSetWriter.COMPRESSION_FORMAT, "snappy");
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person.json"));
final Path person = Paths.get("src/test/resources/TestConvertRecord/input/person.json");
runner.enqueue(person);
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) {
@ -232,13 +245,11 @@ public class TestConvertRecord {
out.flush();
}
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name()));
assertEquals(Files.readString(person), baos.toString(StandardCharsets.UTF_8));
}
@Test
public void testCSVFormattingWithEL() throws InitializationException {
TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
CSVReader csvReader = new CSVReader();
runner.addControllerService("csv-reader", csvReader);
runner.setProperty(csvReader, CSVUtils.VALUE_SEPARATOR, "${csv.in.delimiter}");
@ -257,9 +268,11 @@ public class TestConvertRecord {
runner.setProperty(ConvertRecord.RECORD_READER, "csv-reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "csv-writer");
String ffContent = "~ comment\n" +
"id|username|password\n" +
"123|'John'|^|^'^^\n";
String ffContent = """
~ comment
id|username|password
123|'John'|^|^'^^
""";
Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("csv.in.delimiter", "|");
@ -274,38 +287,36 @@ public class TestConvertRecord {
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
String expected = "`id`\t`username`\t`password`\n" +
"`123`\t`John`\t`|'^`\n";
String expected = """
`id`\t`username`\t`password`
`123`\t`John`\t`|'^`
""";
assertEquals(expected, new String(flowFile.toByteArray()));
}
@Test
public void testJSONLongToInt() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.addControllerService(READER_ID, jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.addControllerService(WRITER_ID, jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_long_id.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
@ -313,27 +324,23 @@ public class TestConvertRecord {
@Test
public void testEnumBadValue() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.addControllerService(READER_ID, jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.enableControllerService(jsonReader);
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", avroWriter);
runner.addControllerService(WRITER_ID, avroWriter);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.enableControllerService(avroWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
runner.run();
@ -342,27 +349,25 @@ public class TestConvertRecord {
@Test
public void testEnumUnionString() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.addControllerService(READER_ID, jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc")));
final String personWithUnionEnumStringSchema = Files.readString(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc"));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, personWithUnionEnumStringSchema);
runner.enableControllerService(jsonReader);
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
runner.addControllerService("writer", avroWriter);
runner.addControllerService(WRITER_ID, avroWriter);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, personWithUnionEnumStringSchema);
runner.enableControllerService(avroWriter);
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
runner.run();
@ -374,8 +379,6 @@ public class TestConvertRecord {
final String timezone = System.getProperty("user.timezone");
System.setProperty("user.timezone", "EST");
try {
TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
JsonTreeReader jsonTreeReader = new JsonTreeReader();
runner.addControllerService("json-reader", jsonTreeReader);
runner.setProperty(jsonTreeReader, DateTimeUtils.DATE_FORMAT, "yyyy-MM-dd");
@ -394,7 +397,7 @@ public class TestConvertRecord {
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
DataFileStream<GenericRecord> avroStream = new DataFileStream<>(flowFile.getContentStream(), new NonCachingDatumReader<>());
assertTrue(avroStream.hasNext());
@ -407,34 +410,30 @@ public class TestConvertRecord {
}
// NOTE(review): this span appears to interleave the pre-change and post-change lines of a
// commit diff (two method signatures, duplicated setup calls) — confirm against the real file.
/**
 * Runs ConvertRecord over person_dropfield.json with a JSON reader and a JSON writer that are
 * both configured with the person schema text, then checks that the single FlowFile routed to
 * success does not contain the text "fieldThatShouldBeRemoved".
 */
@Test
public void testJSONDroppingUnkownFields() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
public void testJSONDroppingUnknownFields() throws InitializationException, IOException {
// Reader: JSON tree reader with the schema supplied as text.
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.addControllerService(READER_ID, jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.enableControllerService(jsonReader);
// Writer: JSON record set writer given the same schema text as the reader.
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.addControllerService(WRITER_ID, jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA);
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.enableControllerService(jsonWriter);
// Input document for this test; presumably it carries a field missing from the schema — verify against the resource file.
runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.setProperty(ConvertRecord.RECORD_READER, READER_ID);
runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID);
runner.run();
// Exactly one FlowFile must be transferred, and it must go to the success relationship.
runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
assertFalse(new String(flowFile.toByteArray()).contains("fieldThatShouldBeRemoved"));
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst();
// The written content must not mention the field name at all.
assertFalse(flowFile.getContent().contains("fieldThatShouldBeRemoved"));
}
}