From 9756e9f1269efc05d1f51ab420e0468d5e2ee769 Mon Sep 17 00:00:00 2001 From: Tony Kurc Date: Sat, 7 Nov 2015 15:35:03 -0500 Subject: [PATCH] NIFI-1117 Added unit tests and default behavior for avro files with no records. Also fixed a findbugs warning in SplitAvro (made inner classes static) --- .../processors/avro/ConvertAvroToJSON.java | 23 +++++++++------ .../nifi/processors/avro/SplitAvro.java | 6 ++-- .../avro/TestConvertAvroToJSON.java | 29 +++++++++++++++++++ 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index 33951d7c8a..d320e781ef 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -61,6 +61,8 @@ public class ConvertAvroToJSON extends AbstractProcessor { protected static final String CONTAINER_ARRAY = "array"; protected static final String CONTAINER_NONE = "none"; + private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8); + static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder() .name("JSON container options") .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE @@ -123,16 +125,18 @@ public class ConvertAvroToJSON extends AbstractProcessor { final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { final GenericData genericData = GenericData.get(); - GenericRecord record = reader.next(); - final String json = genericData.toString(record); - int recordCount = 0; - if (reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)) { + if (reader.hasNext() == false ) { + out.write(EMPTY_JSON_OBJECT); + return; + } + int recordCount = 1; + GenericRecord reuse = reader.next(); + // Only open container if more than one record + if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){ out.write('['); } - - out.write(json.getBytes(StandardCharsets.UTF_8)); - recordCount++; + out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); while (reader.hasNext()) { if (containerOption.equals(CONTAINER_ARRAY)) { @@ -141,11 +145,12 @@ public class ConvertAvroToJSON extends AbstractProcessor { out.write('\n'); } - final GenericRecord nextRecord = reader.next(record); - out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8)); + reuse = reader.next(reuse); + out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); recordCount++; } + // Only close container if more than one record if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) { out.write(']'); } diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java index dbf577836f..38e3a0dfc0 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java @@ -218,7 +218,7 @@ public class SplitAvro extends AbstractProcessor { /** * Splits the incoming Avro datafile into batches of records by reading and de-serializing each record. */ - private class RecordSplitter implements Splitter { + static private class RecordSplitter implements Splitter { private final int splitSize; private final boolean transferMetadata; @@ -300,7 +300,7 @@ public class SplitAvro extends AbstractProcessor { /** * Writes a binary Avro Datafile to the OutputStream. */ - private class DatafileSplitWriter implements SplitWriter { + static private class DatafileSplitWriter implements SplitWriter { private final boolean transferMetadata; private DataFileWriter writer; @@ -344,7 +344,7 @@ public class SplitAvro extends AbstractProcessor { /** * Writes bare Avro records to the OutputStream. */ - private class BareRecordSplitWriter implements SplitWriter { + static private class BareRecordSplitWriter implements SplitWriter { private Encoder encoder; private DatumWriter writer; diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 302528e7df..94868262d1 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -126,4 +126,33 @@ public class TestConvertAvroToJSON { final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}"); } + + @Test + public void testEmptyFlowFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + + runner.enqueue(new byte[]{}); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); + } + + @Test + public void testZeroRecords() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{}"); + + } }