diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java index 897effa236..311c209509 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java @@ -35,6 +35,7 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -46,6 +47,7 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -57,19 +59,36 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledOnOs(value = OS.WINDOWS, disabledReason = "Pretty-printing is not portable across operating systems") public class TestConvertRecord { + private static final String PERSON_SCHEMA; + private static final String READER_ID = "reader"; + private static final String WRITER_ID = "writer"; + private TestRunner runner; + + static { + try { + PERSON_SCHEMA = Files.readString(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @BeforeEach + void setUp() { + runner = TestRunners.newTestRunner(ConvertRecord.class); + } + @Test public void testSuccessfulConversion() throws InitializationException { final MockRecordParser readerService = new MockRecordParser(); final MockRecordWriter writerService = new MockRecordWriter("header", false); - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - runner.addControllerService("reader", readerService); + runner.addControllerService(READER_ID, readerService); runner.enableControllerService(readerService); - runner.addControllerService("writer", writerService); + runner.addControllerService(WRITER_ID, writerService); runner.enableControllerService(writerService); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); @@ -82,7 +101,7 @@ public class TestConvertRecord { runner.run(); runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); out.assertAttributeEquals("record.count", "3"); out.assertAttributeEquals("mime.type", "text/plain"); @@ -94,15 +113,14 @@ public class TestConvertRecord { final MockRecordParser readerService = new MockRecordParser(); final MockRecordWriter writerService = new MockRecordWriter("header", false); - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - runner.addControllerService("reader", readerService); + runner.addControllerService(READER_ID, readerService); runner.enableControllerService(readerService); - runner.addControllerService("writer", writerService); + runner.addControllerService(WRITER_ID, writerService); runner.enableControllerService(writerService); runner.setProperty(ConvertRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false"); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); @@ -121,7 +139,7 @@ public class TestConvertRecord { runner.run(); runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); out.assertAttributeEquals("record.count", "3"); out.assertAttributeEquals("mime.type", "text/plain"); @@ -133,14 +151,13 @@ public class TestConvertRecord { final MockRecordParser readerService = new MockRecordParser(2); final MockRecordWriter writerService = new MockRecordWriter("header", false); - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - runner.addControllerService("reader", readerService); + runner.addControllerService(READER_ID, readerService); runner.enableControllerService(readerService); - runner.addControllerService("writer", writerService); + runner.addControllerService(WRITER_ID, writerService); runner.enableControllerService(writerService); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); @@ -154,7 +171,7 @@ public class TestConvertRecord { // Original FlowFile should be routed to 'failure' relationship without modification runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).getFirst(); out.assertContentEquals(original.toByteArray()); out.assertAttributeEquals("record.error.message", "Intentional Unit Test Exception because 2 records have been read"); } @@ -165,14 +182,13 @@ public class TestConvertRecord { final MockRecordParser readerService = new MockRecordParser(); final MockRecordWriter writerService = new MockRecordWriter("header", false, 2); - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - runner.addControllerService("reader", readerService); + runner.addControllerService(READER_ID, readerService); runner.enableControllerService(readerService); - runner.addControllerService("writer", writerService); + runner.addControllerService(WRITER_ID, writerService); runner.enableControllerService(writerService); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); @@ -186,42 +202,39 @@ public class TestConvertRecord { // Original FlowFile should be routed to 'failure' relationship without modification runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).getFirst(); out.assertContentEquals(original.toByteArray()); out.assertAttributeEquals("record.error.message", "Unit Test intentionally throwing IOException after 2 records were written"); } @Test public void testJSONCompression() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); final JsonTreeReader jsonReader = new JsonTreeReader(); - runner.addControllerService("reader", jsonReader); - - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + runner.addControllerService(READER_ID, jsonReader); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.enableControllerService(jsonReader); final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); - runner.addControllerService("writer", jsonWriter); + runner.addControllerService(WRITER_ID, jsonWriter); runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); - runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); + runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); - runner.setProperty(jsonWriter, "compression-format", "snappy"); + runner.setProperty(jsonWriter, JsonRecordSetWriter.COMPRESSION_FORMAT, "snappy"); runner.enableControllerService(jsonWriter); - runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person.json")); + final Path person = Paths.get("src/test/resources/TestConvertRecord/input/person.json"); + runner.enqueue(person); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); runner.run(); runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) { @@ -232,13 +245,11 @@ public class TestConvertRecord { out.flush(); } - assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name())); + assertEquals(Files.readString(person), baos.toString(StandardCharsets.UTF_8)); } @Test public void testCSVFormattingWithEL() throws InitializationException { - TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - CSVReader csvReader = new CSVReader(); runner.addControllerService("csv-reader", csvReader); runner.setProperty(csvReader, CSVUtils.VALUE_SEPARATOR, "${csv.in.delimiter}"); @@ -257,9 +268,11 @@ public class TestConvertRecord { runner.setProperty(ConvertRecord.RECORD_READER, "csv-reader"); runner.setProperty(ConvertRecord.RECORD_WRITER, "csv-writer"); - String ffContent = "~ comment\n" + - "id|username|password\n" + - "123|'John'|^|^'^^\n"; + String ffContent = """ + ~ comment + id|username|password + 123|'John'|^|^'^^ + """; Map ffAttributes = new HashMap<>(); ffAttributes.put("csv.in.delimiter", "|"); @@ -274,38 +287,36 @@ public class TestConvertRecord { runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); - String expected = "`id`\t`username`\t`password`\n" + - "`123`\t`John`\t`|'^`\n"; + String expected = """ + `id`\t`username`\t`password` + `123`\t`John`\t`|'^` + """; assertEquals(expected, new String(flowFile.toByteArray())); } @Test public void testJSONLongToInt() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); final JsonTreeReader jsonReader = new JsonTreeReader(); - runner.addControllerService("reader", jsonReader); - - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + runner.addControllerService(READER_ID, jsonReader); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.enableControllerService(jsonReader); final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); - runner.addControllerService("writer", jsonWriter); + runner.addControllerService(WRITER_ID, jsonWriter); runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); - runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); + runner.setProperty(jsonWriter, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); runner.enableControllerService(jsonWriter); runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_long_id.json")); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); runner.run(); runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1); @@ -313,27 +324,23 @@ public class TestConvertRecord { @Test public void testEnumBadValue() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); final JsonTreeReader jsonReader = new JsonTreeReader(); - runner.addControllerService("reader", jsonReader); - - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + runner.addControllerService(READER_ID, jsonReader); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.enableControllerService(jsonReader); final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter(); - runner.addControllerService("writer", avroWriter); + runner.addControllerService(WRITER_ID, avroWriter); runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.enableControllerService(avroWriter); runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json")); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); runner.run(); @@ -342,27 +349,25 @@ public class TestConvertRecord { @Test public void testEnumUnionString() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); final JsonTreeReader jsonReader = new JsonTreeReader(); - runner.addControllerService("reader", jsonReader); + runner.addControllerService(READER_ID, jsonReader); - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc"))); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc"))); + final String personWithUnionEnumStringSchema = Files.readString(Paths.get("src/test/resources/TestConvertRecord/schema/person_with_union_enum_string.avsc")); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, personWithUnionEnumStringSchema); runner.enableControllerService(jsonReader); final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter(); - runner.addControllerService("writer", avroWriter); + runner.addControllerService(WRITER_ID, avroWriter); runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(avroWriter, SchemaAccessUtils.SCHEMA_TEXT, personWithUnionEnumStringSchema); runner.enableControllerService(avroWriter); runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_bad_enum.json")); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); runner.run(); @@ -374,8 +379,6 @@ public class TestConvertRecord { final String timezone = System.getProperty("user.timezone"); System.setProperty("user.timezone", "EST"); try { - TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); - JsonTreeReader jsonTreeReader = new JsonTreeReader(); runner.addControllerService("json-reader", jsonTreeReader); runner.setProperty(jsonTreeReader, DateTimeUtils.DATE_FORMAT, "yyyy-MM-dd"); @@ -394,7 +397,7 @@ public class TestConvertRecord { runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); DataFileStream avroStream = new DataFileStream<>(flowFile.getContentStream(), new NonCachingDatumReader<>()); assertTrue(avroStream.hasNext()); @@ -407,34 +410,30 @@ public class TestConvertRecord { } @Test - public void testJSONDroppingUnkownFields() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class); + public void testJSONDroppingUnknownFields() throws InitializationException, IOException { final JsonTreeReader jsonReader = new JsonTreeReader(); - runner.addControllerService("reader", jsonReader); - - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc"))); + runner.addControllerService(READER_ID, jsonReader); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.enableControllerService(jsonReader); final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); - runner.addControllerService("writer", jsonWriter); + runner.addControllerService(WRITER_ID, jsonWriter); runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); - runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, PERSON_SCHEMA); runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); runner.enableControllerService(jsonWriter); runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person_dropfield.json")); - runner.setProperty(ConvertRecord.RECORD_READER, "reader"); - runner.setProperty(ConvertRecord.RECORD_WRITER, "writer"); + runner.setProperty(ConvertRecord.RECORD_READER, READER_ID); + runner.setProperty(ConvertRecord.RECORD_WRITER, WRITER_ID); runner.run(); runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0); - assertFalse(new String(flowFile.toByteArray()).contains("fieldThatShouldBeRemoved")); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).getFirst(); + assertFalse(flowFile.getContent().contains("fieldThatShouldBeRemoved")); } }