NIFI-1234 Augmenting container handling functionality for single Avro records and adjusting formatting in ConvertAvroToJSON.

Reviewed by Tony Kurc (tkurc@apache.org). This closes #136.
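For orientation, a minimal sketch (not part of this commit) of how the new 'Wrap Single Record' property is meant to be used, mirroring the nifi-mock TestRunner calls in the test changes further down; the imports and package name are assumptions based on the usual NiFi module layout.

    import org.apache.nifi.processors.avro.ConvertAvroToJSON; // assumed package for this processor
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class WrapSingleRecordExample {
        public static void main(final String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
            // Keep the default "array" container, but also wrap a lone record (or an empty input) in it.
            runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
            runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, "true");
            // With this configuration a single-record FlowFile is emitted as [{...}] and an empty one as [{}];
            // with the default of false they remain a bare {...} and {}, matching the tests below.
        }
    }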
Authored by Aldrin Piri on 2015-12-01 17:38:46 -05:00; committed by Tony Kurc
parent bde270a911
commit ecc240b918
2 changed files with 122 additions and 20 deletions

ConvertAvroToJSON.java

@@ -57,13 +57,14 @@ import org.apache.nifi.processor.io.StreamCallback;
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
-    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects")
+    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects. If an incoming FlowFile does "
+    + "not contain any records, an empty JSON object is the output. Empty/Single Avro record FlowFile inputs are optionally wrapped in a container as dictated by 'Wrap Single Record'")
 @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
 public class ConvertAvroToJSON extends AbstractProcessor {
 
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
-    private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
 
     static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
         .name("JSON container options")
@@ -73,6 +74,13 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .required(true)
         .defaultValue(CONTAINER_ARRAY)
         .build();
+    static final PropertyDescriptor WRAP_SINGLE_RECORD = new PropertyDescriptor.Builder()
+        .name("Wrap Single Record")
+        .description("Determines if the resulting output for empty records or a single record should be wrapped in a container array as specified by '" + CONTAINER_OPTIONS.getName() + "'")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -83,7 +91,6 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
         .build();
 
     private List<PropertyDescriptor> properties;
-
 
     @Override
@@ -92,6 +99,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
+        properties.add(WRAP_SINGLE_RECORD);
         this.properties = Collections.unmodifiableList(properties);
     }
@@ -116,51 +124,59 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         }
 
         final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
+        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
+        // Wrap a single record (inclusive of no records) only when a container is being used
+        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
 
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
                          final OutputStream out = new BufferedOutputStream(rawOut);
                          final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
                         final GenericData genericData = GenericData.get();
 
-                        if (reader.hasNext() == false ) {
-                            out.write(EMPTY_JSON_OBJECT);
-                            return;
+                        int recordCount = 0;
+                        GenericRecord currRecord = null;
+                        if (reader.hasNext()) {
+                            currRecord = reader.next();
+                            recordCount++;
                         }
 
-                        int recordCount = 1;
-                        GenericRecord reuse = reader.next();
-                        // Only open container if more than one record
-                        if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){
+                        // Open container if desired output is an array format and there are are multiple records or
+                        // if configured to wrap single record
+                        if (reader.hasNext() && useContainer || wrapSingleRecord) {
                             out.write('[');
                         }
 
-                        out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+                        // Determine the initial output record, inclusive if we should have an empty set of Avro records
+                        final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
+                        out.write(outputBytes);
 
                         while (reader.hasNext()) {
-                            if (containerOption.equals(CONTAINER_ARRAY)) {
+                            if (useContainer) {
                                 out.write(',');
                             } else {
                                 out.write('\n');
                             }
 
-                            reuse = reader.next(reuse);
-                            out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+                            currRecord = reader.next(currRecord);
+                            out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                             recordCount++;
                         }
 
-                        // Only close container if more than one record
-                        if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
+                        // Close container if desired output is an array format and there are multiple records or if
+                        // configured to wrap a single record
+                        if (recordCount > 1 && useContainer || wrapSingleRecord) {
                             out.write(']');
                         }
                     }
                 }
             });
         } catch (final ProcessException pe) {
-            getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe});
+            getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
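A side note on the two conditions introduced above (an illustration, not part of the commit; the helper names below are mine): Java's && binds tighter than ||, so the checks group as shown in this sketch, and because wrapSingleRecord is computed as the property value ANDed with useContainer, no bracket is ever written when the container option is 'none'.

    // Fully parenthesized restatement of the committed conditions, for readability only.
    static boolean shouldOpenContainer(final boolean hasMoreRecords, final boolean useContainer, final boolean wrapSingleRecord) {
        return (hasMoreRecords && useContainer) || wrapSingleRecord;
    }

    static boolean shouldCloseContainer(final int recordCount, final boolean useContainer, final boolean wrapSingleRecord) {
        return (recordCount > 1 && useContainer) || wrapSingleRecord;
    }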

TestConvertAvroToJSON.java

@@ -53,6 +53,73 @@ public class TestConvertAvroToJSON {
         out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
     }
 
+    @Test
+    public void testSingleAvroMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
+    @Test
+    public void testSingleAvroMessage_wrapSingleMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
+    }
+
+    @Test
+    public void testSingleAvroMessage_wrapSingleMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        // Verify we do not wrap output for a single record if not configured to use a container
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
     @Test
     public void testMultipleAvroMessages() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
@@ -155,4 +222,23 @@ public class TestConvertAvroToJSON {
         out.assertContentEquals("{}");
     }
 
+    @Test
+    public void testZeroRecords_wrapSingleRecord() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
+        runner.enqueue(out1.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{}]");
+    }
+
 }
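The tests above lean on an AvroTestUtil.serializeAvroRecord helper (and a local serializeAvroRecord overload) that this diff does not show. A plausible sketch of such a helper, built on Avro's standard DataFileWriter API, is given below; the exact signature and class name in the real utility may differ.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;

    public final class AvroSerializationSketch {
        // Writes zero or more records into an in-memory Avro data file; an empty varargs call
        // still produces a valid (schema-only) container, which is what the zero-record tests feed in.
        public static ByteArrayOutputStream serializeAvroRecord(final Schema schema,
                final DatumWriter<GenericRecord> datumWriter, final GenericRecord... records) throws IOException {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (final DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
                writer.create(schema, out);
                for (final GenericRecord record : records) {
                    writer.append(record);
                }
            }
            return out;
        }
    }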