diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index 8faaa4fd66..d9fa4ffca2 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -57,13 +57,14 @@ import org.apache.nifi.processor.io.StreamCallback; @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such " + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " - + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects") + + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects. If an incoming FlowFile does " + + "not contain any records, an empty JSON object is the output. 
Empty/Single Avro record FlowFile inputs are optionally wrapped in a container as dictated by 'Wrap Single Record'") @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") public class ConvertAvroToJSON extends AbstractProcessor { protected static final String CONTAINER_ARRAY = "array"; protected static final String CONTAINER_NONE = "none"; - private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8); + private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8); static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder() .name("JSON container options") @@ -73,6 +74,13 @@ public class ConvertAvroToJSON extends AbstractProcessor { .required(true) .defaultValue(CONTAINER_ARRAY) .build(); + static final PropertyDescriptor WRAP_SINGLE_RECORD = new PropertyDescriptor.Builder() + .name("Wrap Single Record") + .description("Determines if the resulting output for empty records or a single record should be wrapped in a container array as specified by '" + CONTAINER_OPTIONS.getName() + "'") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -83,7 +91,6 @@ public class ConvertAvroToJSON extends AbstractProcessor { .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") .build(); - private List properties; @Override @@ -92,6 +99,7 @@ public class ConvertAvroToJSON extends AbstractProcessor { final List properties = new ArrayList<>(); properties.add(CONTAINER_OPTIONS); + properties.add(WRAP_SINGLE_RECORD); this.properties = Collections.unmodifiableList(properties); } @@ -116,51 +124,59 @@ public class ConvertAvroToJSON extends AbstractProcessor { } final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue(); + final boolean 
useContainer = containerOption.equals(CONTAINER_ARRAY); + // Wrap a single record (inclusive of no records) only when a container is being used + final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn); - - final OutputStream out = new BufferedOutputStream(rawOut); - final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + final OutputStream out = new BufferedOutputStream(rawOut); + final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { final GenericData genericData = GenericData.get(); - if (reader.hasNext() == false ) { - out.write(EMPTY_JSON_OBJECT); - return; + int recordCount = 0; + GenericRecord currRecord = null; + if (reader.hasNext()) { + currRecord = reader.next(); + recordCount++; } - int recordCount = 1; - GenericRecord reuse = reader.next(); - // Only open container if more than one record - if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){ + + // Open container if desired output is an array format and there are multiple records or + // if configured to wrap single record + if (reader.hasNext() && useContainer || wrapSingleRecord) { out.write('['); } - out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); + + // Determine the initial output record, inclusive if we should have an empty set of Avro records + final byte[] outputBytes = (currRecord == null) ? 
EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8); + out.write(outputBytes); while (reader.hasNext()) { - if (containerOption.equals(CONTAINER_ARRAY)) { + if (useContainer) { out.write(','); } else { out.write('\n'); } - reuse = reader.next(reuse); - out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); + currRecord = reader.next(currRecord); + out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8)); recordCount++; } - // Only close container if more than one record - if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) { + // Close container if desired output is an array format and there are multiple records or if + // configured to wrap a single record + if (recordCount > 1 && useContainer || wrapSingleRecord) { out.write(']'); } } } }); } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe}); + getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe}); session.transfer(flowFile, REL_FAILURE); return; } diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 3535156fc0..856677a759 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -53,6 +53,73 @@ public class TestConvertAvroToJSON { out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); } + @Test + public void testSingleAvroMessage_noContainer() throws IOException { + 
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } + + @Test + public void testSingleAvroMessage_wrapSingleMessage() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]"); + } + + @Test 
+ public void testSingleAvroMessage_wrapSingleMessage_noContainer() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + // Verify we do not wrap output for a single record if not configured to use a container + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } + + @Test public void testMultipleAvroMessages() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); @@ -155,4 +222,23 @@ public class TestConvertAvroToJSON { out.assertContentEquals("{}"); } + + @Test + public void testZeroRecords_wrapSingleRecord() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true)); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + 
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("[{}]"); + + } }