This commit is contained in:
Mark Payne 2015-10-23 09:52:33 -04:00
commit 5d90c9be07
2 changed files with 89 additions and 7 deletions

View File

@ -22,7 +22,10 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileStream;
@ -34,11 +37,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.io.StreamCallback;
@ -49,9 +54,20 @@ import org.apache.nifi.processor.io.StreamCallback;
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such " @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
+ "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records.") + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class ConvertAvroToJSON extends AbstractProcessor { public class ConvertAvroToJSON extends AbstractProcessor {
protected static final String CONTAINER_ARRAY = "array";
protected static final String CONTAINER_NONE = "none";
static final PropertyDescriptor CONTAINER_OPTIONS
= new PropertyDescriptor.Builder()
.name("JSON container options")
.description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
.allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
.required(true)
.defaultValue(CONTAINER_ARRAY)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -62,6 +78,23 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason") .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
.build(); .build();
private List<PropertyDescriptor> properties;
@Override
protected void init(ProcessorInitializationContext context) {
super.init(context);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONTAINER_OPTIONS);
this.properties = Collections.unmodifiableList(properties);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>(); final Set<Relationship> rels = new HashSet<>();
@ -77,11 +110,14 @@ public class ConvertAvroToJSON extends AbstractProcessor {
return; return;
} }
final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
try { try {
flowFile = session.write(flowFile, new StreamCallback() { flowFile = session.write(flowFile, new StreamCallback() {
@Override @Override
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn); try (final InputStream in = new BufferedInputStream(rawIn);
final OutputStream out = new BufferedOutputStream(rawOut); final OutputStream out = new BufferedOutputStream(rawOut);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
@ -90,7 +126,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
final String json = genericData.toString(record); final String json = genericData.toString(record);
int recordCount = 0; int recordCount = 0;
if (reader.hasNext()) { if (reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)) {
out.write('['); out.write('[');
} }
@ -98,13 +134,18 @@ public class ConvertAvroToJSON extends AbstractProcessor {
recordCount++; recordCount++;
while (reader.hasNext()) { while (reader.hasNext()) {
out.write(','); if (containerOption.equals(CONTAINER_ARRAY)) {
out.write(',');
} else {
out.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
}
final GenericRecord nextRecord = reader.next(record); final GenericRecord nextRecord = reader.next(record);
out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8)); out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
recordCount++; recordCount++;
} }
if (recordCount > 1) { if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
out.write(']'); out.write(']');
} }
} }

View File

@ -16,7 +16,11 @@
*/ */
package org.apache.nifi.processors.avro; package org.apache.nifi.processors.avro;
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@ -27,9 +31,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class TestConvertAvroToJSON { public class TestConvertAvroToJSON {
@Test @Test
@ -57,6 +58,8 @@ public class TestConvertAvroToJSON {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON()); final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
final GenericRecord user1 = new GenericData.Record(schema); final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa"); user1.put("name", "Alyssa");
user1.put("favorite_number", 256); user1.put("favorite_number", 256);
@ -85,4 +88,42 @@ public class TestConvertAvroToJSON {
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
} }
private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, out);
for (final GenericRecord user : users) {
dataFileWriter.append(user);
}
dataFileWriter.close();
return out;
}
@Test
public void testMultipleAvroMessagesContainerNone() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "George");
user2.put("favorite_number", 1024);
user2.put("favorite_color", "red");
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter, user1, user2);
runner.enqueue(out1.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}");
}
} }