diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index d320e781ef..ec5c320a4a 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@ -1,169 +1,169 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.avro; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; - -@SideEffectFree -@SupportsBatching -@InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such " - + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " - + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " - + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects") -@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") -public class ConvertAvroToJSON extends AbstractProcessor { - protected static final String CONTAINER_ARRAY = "array"; - protected static final String CONTAINER_NONE = "none"; - - private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8); - - static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder() - .name("JSON container options") - .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE - + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").") - .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY) - .required(true) - .defaultValue(CONTAINER_ARRAY) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after it has been converted to JSON") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") - .build(); - - - private List properties; - - @Override - protected void init(ProcessorInitializationContext context) { - super.init(context); - - final List properties = new ArrayList<>(); - properties.add(CONTAINER_OPTIONS); - this.properties = Collections.unmodifiableList(properties); - } - - @Override - protected List getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public Set getRelationships() { - final Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - return rels; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue(); - - try { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn); - - final OutputStream out = new BufferedOutputStream(rawOut); - final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { - - final GenericData genericData = GenericData.get(); - - if (reader.hasNext() == false ) { - out.write(EMPTY_JSON_OBJECT); - return; - } - int recordCount = 1; - GenericRecord reuse = reader.next(); - // Only open container if more than one record - if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){ - out.write('['); - } - out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); - - while (reader.hasNext()) { - if (containerOption.equals(CONTAINER_ARRAY)) { - out.write(','); - } else { - out.write('\n'); - } - - reuse = reader.next(reuse); - out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); - recordCount++; - } - - // Only close container if more than one record - if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) { - out.write(']'); - } - } - } - }); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.transfer(flowFile, REL_SUCCESS); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.avro; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; + +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such " + + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " + + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " + + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects") +@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") +public class ConvertAvroToJSON extends AbstractProcessor { + protected static final String CONTAINER_ARRAY = "array"; + protected static final String CONTAINER_NONE = "none"; + + private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8); + + static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder() + .name("JSON container options") + .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").") + .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY) + .required(true) + .defaultValue(CONTAINER_ARRAY) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after it has been converted to JSON") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") + .build(); + + + private List properties; + + @Override + protected void init(ProcessorInitializationContext context) { + super.init(context); + + final List properties = new ArrayList<>(); + properties.add(CONTAINER_OPTIONS); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + return rels; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue(); + + try { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn); + + final OutputStream out = new BufferedOutputStream(rawOut); + final DataFileStream reader = new DataFileStream<>(in, new GenericDatumReader())) { + + final GenericData genericData = GenericData.get(); + + if (reader.hasNext() == false ) { + out.write(EMPTY_JSON_OBJECT); + return; + } + int recordCount = 1; + GenericRecord reuse = reader.next(); + // Only open container if more than one record + if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){ + out.write('['); + } + out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); + + while (reader.hasNext()) { + if (containerOption.equals(CONTAINER_ARRAY)) { + out.write(','); + } else { + out.write('\n'); + } + + reuse = reader.next(reuse); + out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8)); + recordCount++; + } + + // Only close container if more than one record + if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) { + out.write(']'); + } + } + } + }); + } catch (final ProcessException pe) { + getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); + session.transfer(flowFile, REL_SUCCESS); + } +} diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java index 94868262d1..3535156fc0 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java @@ -1,158 +1,158 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.avro; - -import java.io.File; -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; - -public class TestConvertAvroToJSON { - - @Test - public void testSingleAvroMessage() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); - - final GenericRecord user1 = new GenericData.Record(schema); - user1.put("name", "Alyssa"); - user1.put("favorite_number", 256); - - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); - runner.enqueue(out1.toByteArray()); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); - out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); - } - - @Test - public void testMultipleAvroMessages() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); - - runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); - - final GenericRecord user1 = new GenericData.Record(schema); - user1.put("name", "Alyssa"); - user1.put("favorite_number", 256); - - final GenericRecord user2 = new GenericData.Record(schema); - user2.put("name", "George"); - user2.put("favorite_number", 1024); - user2.put("favorite_color", "red"); - - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2); - runner.enqueue(out1.toByteArray()); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); - out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null},{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}]"); - } - - @Test - public void testNonJsonHandledProperly() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - runner.enqueue("hello".getBytes()); - runner.run(); - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); - } - - private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter datumWriter, final GenericRecord... users) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, out); - for (final GenericRecord user : users) { - dataFileWriter.append(user); - } - - dataFileWriter.close(); - return out; - } - - @Test - public void testMultipleAvroMessagesContainerNone() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); - - runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); - - final GenericRecord user1 = new GenericData.Record(schema); - user1.put("name", "Alyssa"); - user1.put("favorite_number", 256); - - final GenericRecord user2 = new GenericData.Record(schema); - user2.put("name", "George"); - user2.put("favorite_number", 1024); - user2.put("favorite_color", "red"); - - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2); - runner.enqueue(out1.toByteArray()); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); - out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}"); - } - - @Test - public void testEmptyFlowFile() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - - runner.enqueue(new byte[]{}); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); - } - - @Test - public void testZeroRecords() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); - final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); - - - final DatumWriter datumWriter = new GenericDatumWriter<>(schema); - final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter); - runner.enqueue(out1.toByteArray()); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); - out.assertContentEquals("{}"); - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.avro; + +import java.io.File; +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestConvertAvroToJSON { + + @Test + public void testSingleAvroMessage() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); + } + + @Test + public void testMultipleAvroMessages() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "George"); + user2.put("favorite_number", 1024); + user2.put("favorite_color", "red"); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null},{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}]"); + } + + @Test + public void testNonJsonHandledProperly() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + runner.enqueue("hello".getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); + } + + private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter datumWriter, final GenericRecord... users) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, out); + for (final GenericRecord user : users) { + dataFileWriter.append(user); + } + + dataFileWriter.close(); + return out; + } + + @Test + public void testMultipleAvroMessagesContainerNone() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "George"); + user2.put("favorite_number", 1024); + user2.put("favorite_color", "red"); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}"); + } + + @Test + public void testEmptyFlowFile() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + + runner.enqueue(new byte[]{}); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); + } + + @Test + public void testZeroRecords() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter); + runner.enqueue(out1.toByteArray()); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0); + out.assertContentEquals("{}"); + + } +}