NIFI-1909 Adding ability to process schemaless Avro records to ConvertAvrotoJson.

- Made suggested changes and removed unused imports found by checkstyle
 - This closes #459.
This commit is contained in:
Ryan Persaud 2016-05-20 15:27:39 -04:00 committed by Mark Payne
parent 8593bd771f
commit 274dc0902e
2 changed files with 192 additions and 33 deletions

View File

@ -28,10 +28,14 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -49,6 +53,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@SideEffectFree @SideEffectFree
@SupportsBatching @SupportsBatching
@ -81,6 +86,12 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.defaultValue("false") .defaultValue("false")
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.name("Avro schema")
.description("If the Avro records do not contain the schema (datum only), it must be specified here.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -92,6 +103,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.build(); .build();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private volatile Schema schema = null;
@Override @Override
protected void init(ProcessorInitializationContext context) { protected void init(ProcessorInitializationContext context) {
@ -100,6 +112,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONTAINER_OPTIONS); properties.add(CONTAINER_OPTIONS);
properties.add(WRAP_SINGLE_RECORD); properties.add(WRAP_SINGLE_RECORD);
properties.add(SCHEMA);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -128,16 +141,43 @@ public class ConvertAvroToJSON extends AbstractProcessor {
// Wrap a single record (inclusive of no records) only when a container is being used // Wrap a single record (inclusive of no records) only when a container is being used
final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer; final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
final String stringSchema = context.getProperty(SCHEMA).getValue();
final boolean schemaLess = stringSchema != null;
try { try {
flowFile = session.write(flowFile, new StreamCallback() { flowFile = session.write(flowFile, new StreamCallback() {
@Override @Override
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
final GenericData genericData = GenericData.get();
if (schemaLess) {
if (schema == null) {
schema = new Schema.Parser().parse(stringSchema);
}
try (final InputStream in = new BufferedInputStream(rawIn);
final OutputStream out = new BufferedOutputStream(rawOut)) {
final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
final GenericRecord record = reader.read(null, decoder);
// Schemaless records are singletons, so both useContainer and wrapSingleRecord
// need to be true before we wrap it with an array
if (useContainer && wrapSingleRecord) {
out.write('[');
}
final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
out.write(outputBytes);
if (useContainer && wrapSingleRecord) {
out.write(']');
}
}
} else {
try (final InputStream in = new BufferedInputStream(rawIn); try (final InputStream in = new BufferedInputStream(rawIn);
final OutputStream out = new BufferedOutputStream(rawOut); final OutputStream out = new BufferedOutputStream(rawOut);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final GenericData genericData = GenericData.get();
int recordCount = 0; int recordCount = 0;
GenericRecord currRecord = null; GenericRecord currRecord = null;
if (reader.hasNext()) { if (reader.hasNext()) {
@ -174,6 +214,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
} }
} }
} }
}
}); });
} catch (final ProcessException pe) { } catch (final ProcessException pe) {
getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe}); getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe});

View File

@ -24,7 +24,9 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -119,6 +121,122 @@ public class TestConvertAvroToJSON {
out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}"); out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
} }
@Test
public void testSingleSchemalessAvroMessage() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
String stringSchema = schema.toString();
runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
datumWriter.write(user1, encoder);
encoder.flush();
out1.flush();
byte[] test = out1.toByteArray();
runner.enqueue(test);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
}
@Test
public void testSingleSchemalessAvroMessage_noContainer() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
String stringSchema = schema.toString();
runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
datumWriter.write(user1, encoder);
encoder.flush();
out1.flush();
byte[] test = out1.toByteArray();
runner.enqueue(test);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
}
@Test
public void testSingleSchemalessAvroMessage_wrapSingleMessage() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
String stringSchema = schema.toString();
runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
datumWriter.write(user1, encoder);
encoder.flush();
out1.flush();
byte[] test = out1.toByteArray();
runner.enqueue(test);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
}
@Test
public void testSingleSchemalessAvroMessage_wrapSingleMessage_noContainer() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
String stringSchema = schema.toString();
runner.setProperty(ConvertAvroToJSON.SCHEMA, stringSchema);
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final ByteArrayOutputStream out1 = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out1, null);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
datumWriter.write(user1, encoder);
encoder.flush();
out1.flush();
byte[] test = out1.toByteArray();
runner.enqueue(test);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
}
@Test @Test
public void testMultipleAvroMessages() throws IOException { public void testMultipleAvroMessages() throws IOException {