From 274dc0902efaebf98aeb46a96bccba9c9218e25c Mon Sep 17 00:00:00 2001
From: Ryan Persaud
Date: Fri, 20 May 2016 15:27:39 -0400
Subject: [PATCH] NIFI-1909 Adding ability to process schemaless Avro records
 to ConvertAvroToJSON.

- Made suggested changes and removed unused imports found by checkstyle
- This closes #459.
---
 .../processors/avro/ConvertAvroToJSON.java      | 107 +++++++++++-----
 .../avro/TestConvertAvroToJSON.java             | 118 ++++++++++++++++++
 2 files changed, 192 insertions(+), 33 deletions(-)

diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index d9fa4ffca2..2ddf66e4d1 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -28,10 +28,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;

+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -49,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;

 @SideEffectFree
 @SupportsBatching
@@ -81,6 +86,12 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             .defaultValue("false")
             .required(true)
             .build();
+    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+            .name("Avro schema")
+            .description("If the Avro records do not contain the schema (datum only), it must be specified here.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();

     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -92,6 +103,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             .build();

     private List<PropertyDescriptor> properties;
+    private volatile Schema schema = null;

     @Override
     protected void init(ProcessorInitializationContext context) {
@@ -100,6 +112,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
         properties.add(WRAP_SINGLE_RECORD);
+        properties.add(SCHEMA);
         this.properties = Collections.unmodifiableList(properties);
     }

@@ -128,49 +141,77 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         // Wrap a single record (inclusive of no records) only when a container is being used
         final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;

+        final String stringSchema = context.getProperty(SCHEMA).getValue();
+        final boolean schemaLess = stringSchema != null;
+
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
-                    try (final InputStream in = new BufferedInputStream(rawIn);
-                         final OutputStream out = new BufferedOutputStream(rawOut);
-                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                    final GenericData genericData = GenericData.get();

-                        final GenericData genericData = GenericData.get();
-
-                        int recordCount = 0;
-                        GenericRecord currRecord = null;
-                        if (reader.hasNext()) {
-                            currRecord = reader.next();
-                            recordCount++;
+                    if (schemaLess) {
+                        if (schema == null) {
+                            schema = new Schema.Parser().parse(stringSchema);
                         }
+                        try (final InputStream in = new BufferedInputStream(rawIn);
+                             final OutputStream out = new BufferedOutputStream(rawOut)) {
+                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+                            final GenericRecord record = reader.read(null, decoder);

-                        // Open container if desired output is an array format and there are are multiple records or
-                        // if configured to wrap single record
-                        if (reader.hasNext() && useContainer || wrapSingleRecord) {
-                            out.write('[');
-                        }
-
-                        // Determine the initial output record, inclusive if we should have an empty set of Avro records
-                        final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
-                        out.write(outputBytes);
-
-                        while (reader.hasNext()) {
-                            if (useContainer) {
-                                out.write(',');
-                            } else {
-                                out.write('\n');
+                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
+                            // need to be true before we wrap it with an array
+                            if (useContainer && wrapSingleRecord) {
+                                out.write('[');
                             }

-                            currRecord = reader.next(currRecord);
-                            out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
-                            recordCount++;
-                        }
+                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
+                            out.write(outputBytes);

-                        // Close container if desired output is an array format and there are multiple records or if
-                        // configured to wrap a single record
-                        if (recordCount > 1 && useContainer || wrapSingleRecord) {
-                            out.write(']');
+                            if (useContainer && wrapSingleRecord) {
+                                out.write(']');
+                            }
+                        }
+                    } else {
+                        try (final InputStream in = new BufferedInputStream(rawIn);
+                             final OutputStream out = new BufferedOutputStream(rawOut);
+                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                            int recordCount = 0;
+                            GenericRecord currRecord = null;
+                            if (reader.hasNext()) {
+                                currRecord = reader.next();
+                                recordCount++;
+                            }
+
+                            // Open container if desired output is an array format and there are multiple records or
+                            // if configured to wrap single record
+                            if (reader.hasNext() && useContainer || wrapSingleRecord) {
+                                out.write('[');
+                            }
+
+                            // Determine the initial output record, inclusive if we should have an empty set of Avro records
+                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
+                            out.write(outputBytes);
+
+                            while (reader.hasNext()) {
+                                if (useContainer) {
+                                    out.write(',');
+                                } else {
+                                    out.write('\n');
+                                }
+
+                                currRecord = reader.next(currRecord);
+                                out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
+                                recordCount++;
+                            }
+
+                            // Close container if desired output is an array format and there are multiple records or if
+                            // configured to wrap a single record
+                            if (recordCount > 1 && useContainer || wrapSingleRecord) {
+                                out.write(']');
+                            }
                         }
                     }
                 }
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 856677a759..0884eb311b 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -24,7 +24,9 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -119,6 +121,122 @@ public class TestConvertAvroToJSON {
         out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
     }

+    @Test
+    public void testSingleSchemalessAvroMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_wrapSingleMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
+    }
+
+    @Test
+    public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        String stringSchema = schema.toString();
+        runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
+        final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        datumWriter.write(user1, encoder);
+
+        encoder.flush();
+        out1.flush();
+        byte[] test = out1.toByteArray();
+        runner.enqueue(test);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
     @Test
     public void testMultipleAvroMessages() throws IOException {