From fe5ef39ff2f342bd59c843b411b1fe4b1cf597ba Mon Sep 17 00:00:00 2001
From: dan-s1
Date: Fri, 19 Apr 2024 16:42:10 +0000
Subject: [PATCH] NIFI-13069 Removed ConvertAvroToJSON

Signed-off-by: Matt Burgess

This closes #8675
---
 .../processors/avro/ConvertAvroToJSON.java   | 228 -----------
 .../org.apache.nifi.processor.Processor     |   5 +-
 .../avro/TestConvertAvroToJSON.java          | 362 ------------------
 3 files changed, 2 insertions(+), 593 deletions(-)
 delete mode 100644 nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
 delete mode 100644 nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java

diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
deleted file mode 100644
index 2ddf66e4d1..0000000000
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.avro;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-
-@SideEffectFree
-@SupportsBatching
-@Tags({"avro", "convert", "json"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
-        + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
-        + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
-        + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects. If an incoming FlowFile does "
-        + "not contain any records, an empty JSON object is the output. Empty/Single Avro record FlowFile inputs are optionally wrapped in a container as dictated by 'Wrap Single Record'")
-@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
-public class ConvertAvroToJSON extends AbstractProcessor {
-    protected static final String CONTAINER_ARRAY = "array";
-    protected static final String CONTAINER_NONE = "none";
-
-    private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
-
-    static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
-            .name("JSON container options")
-            .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
-                    + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
-            .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
-            .required(true)
-            .defaultValue(CONTAINER_ARRAY)
-            .build();
-    static final PropertyDescriptor WRAP_SINGLE_RECORD = new PropertyDescriptor.Builder()
-            .name("Wrap Single Record")
-            .description("Determines if the resulting output for empty records or a single record should be wrapped in a container array as specified by '" + CONTAINER_OPTIONS.getName() + "'")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
-            .name("Avro schema")
-            .description("If the Avro records do not contain the schema (datum only), it must be specified here.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .build();
-
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("A FlowFile is routed to this relationship after it has been converted to JSON")
-            .build();
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
-            .build();
-
-    private List<PropertyDescriptor> properties;
-    private volatile Schema schema = null;
-
-    @Override
-    protected void init(ProcessorInitializationContext context) {
-        super.init(context);
-
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(CONTAINER_OPTIONS);
-        properties.add(WRAP_SINGLE_RECORD);
-        properties.add(SCHEMA);
-        this.properties = Collections.unmodifiableList(properties);
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_SUCCESS);
-        rels.add(REL_FAILURE);
-        return rels;
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
-        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
-        // Wrap a single record (inclusive of no records) only when a container is being used
-        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
-
-        final String stringSchema = context.getProperty(SCHEMA).getValue();
-        final boolean schemaLess = stringSchema != null;
-
-        try {
-            flowFile = session.write(flowFile, new StreamCallback() {
-                @Override
-                public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
-                    final GenericData genericData = GenericData.get();
-
-                    if (schemaLess) {
-                        if (schema == null) {
-                            schema = new Schema.Parser().parse(stringSchema);
-                        }
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut)) {
-                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
-                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
-                            final GenericRecord record = reader.read(null, decoder);
-
-                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
-                            // need to be true before we wrap it with an array
-                            if (useContainer && wrapSingleRecord) {
-                                out.write('[');
-                            }
-
-                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
-                            out.write(outputBytes);
-
-                            if (useContainer && wrapSingleRecord) {
-                                out.write(']');
-                            }
-                        }
-                    } else {
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut);
-                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
-
-                            int recordCount = 0;
-                            GenericRecord currRecord = null;
-                            if (reader.hasNext()) {
-                                currRecord = reader.next();
-                                recordCount++;
-                            }
-
-                            // Open container if desired output is an array format and there are are multiple records or
-                            // if configured to wrap single record
-                            if (reader.hasNext() && useContainer || wrapSingleRecord) {
-                                out.write('[');
-                            }
-
-                            // Determine the initial output record, inclusive if we should have an empty set of Avro records
-                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
-                            out.write(outputBytes);
-
-                            while (reader.hasNext()) {
-                                if (useContainer) {
-                                    out.write(',');
-                                } else {
-                                    out.write('\n');
-                                }
-
-                                currRecord = reader.next(currRecord);
-                                out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
-                                recordCount++;
-                            }
-
-                            // Close container if desired output is an array format and there are multiple records or if
-                            // configured to wrap a single record
-                            if (recordCount > 1 && useContainer || wrapSingleRecord) {
-                                out.write(']');
-                            }
-                        }
-                    }
-                }
-            });
-        } catch (final ProcessException pe) {
-            getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
-        session.transfer(flowFile, REL_SUCCESS);
-    }
-}
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 7ab13fa694..be454c22fc 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.avro.ConvertAvroToJSON
-org.apache.nifi.processors.avro.ExtractAvroMetadata
-org.apache.nifi.processors.avro.SplitAvro
+org.apache.nifi.processors.avro.ExtractAvroMetadata
+org.apache.nifi.processors.avro.SplitAvro
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
deleted file mode 100644
index 57ba39d197..0000000000
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-
-public class TestConvertAvroToJSON {
-
-    @Test
-    public void testSingleAvroMessage() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testSingleAvroMessage_noContainer() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testSingleAvroMessage_wrapSingleMessage() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
-        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
-    }
-
-    @Test
-    public void testSingleAvroMessage_wrapSingleMessage_noContainer() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        // Verify we do not wrap output for a single record if not configured to use a container
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
-        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testSingleSchemalessAvroMessage() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-        String stringSchema = schema.toString();
-        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
-        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        datumWriter.write(user1, encoder);
-
-        encoder.flush();
-        out1.flush();
-        byte[] test = out1.toByteArray();
-        runner.enqueue(test);
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testSingleSchemalessAvroMessage_noContainer() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
-        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-        String stringSchema = schema.toString();
-        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
-        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        datumWriter.write(user1, encoder);
-
-        encoder.flush();
-        out1.flush();
-        byte[] test = out1.toByteArray();
-        runner.enqueue(test);
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testSingleSchemalessAvroMessage_wrapSingleMessage() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
-        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
-        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-        String stringSchema = schema.toString();
-        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
-        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        datumWriter.write(user1, encoder);
-
-        encoder.flush();
-        out1.flush();
-        byte[] test = out1.toByteArray();
-        runner.enqueue(test);
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
-    }
-
-    @Test
-    public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
-        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
-        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-        String stringSchema = schema.toString();
-        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
-        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        datumWriter.write(user1, encoder);
-
-        encoder.flush();
-        out1.flush();
-        byte[] test = out1.toByteArray();
-        runner.enqueue(test);
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
-    }
-
-    @Test
-    public void testMultipleAvroMessages() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final GenericRecord user2 = new GenericData.Record(schema);
-        user2.put("name", "George");
-        user2.put("favorite_number", 1024);
-        user2.put("favorite_color", "red");
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1, user2);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null},{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}]");
-    }
-
-    @Test
-    public void testNonJsonHandledProperly() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.enqueue("hello".getBytes());
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
-    }
-
-    private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
-        dataFileWriter.create(schema, out);
-        for (final GenericRecord user : users) {
-            dataFileWriter.append(user);
-        }
-
-        dataFileWriter.close();
-        return out;
-    }
-
-    @Test
-    public void testMultipleAvroMessagesContainerNone() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
-
-        final GenericRecord user1 = new GenericData.Record(schema);
-        user1.put("name", "Alyssa");
-        user1.put("favorite_number", 256);
-
-        final GenericRecord user2 = new GenericData.Record(schema);
-        user2.put("name", "George");
-        user2.put("favorite_number", 1024);
-        user2.put("favorite_color", "red");
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}");
-    }
-
-    @Test
-    public void testEmptyFlowFile() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-
-        runner.enqueue(new byte[]{});
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
-    }
-
-    @Test
-    public void testZeroRecords() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("{}");
-
-    }
-
-    @Test
-    public void testZeroRecords_wrapSingleRecord() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
-        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
-        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
-
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
-        runner.enqueue(out1.toByteArray());
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
-        out.assertContentEquals("[{}]");
-
-    }
-}
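
For reference, the conversion behavior that goes away with this patch can be approximated with Avro's own APIs, independent of NiFi. The following is a minimal, hypothetical sketch (the class name AvroToJsonSketch and the input path "users.avro" are placeholders, not part of NiFi or of this patch); it reads binary Avro records from a data file with an embedded schema and renders them as a JSON array via GenericData.toString(), the same rendering the deleted processor used.

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroToJsonSketch {
    public static void main(String[] args) throws IOException {
        // Assumed input: an Avro data file with an embedded schema; the path is a placeholder
        final File avroFile = new File("users.avro");
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
            final StringBuilder json = new StringBuilder("[");
            GenericRecord record = null;
            boolean first = true;
            while (reader.hasNext()) {
                record = reader.next(record);
                if (!first) {
                    json.append(',');
                }
                // GenericData.toString() renders a record as JSON text
                json.append(GenericData.get().toString(record));
                first = false;
            }
            json.append(']');
            System.out.println(json);
        }
    }
}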