commit 6c510fae80
Author: Mark Payne
Date:   2015-11-11 08:16:38 -05:00

3 changed files with 330 additions and 296 deletions

ConvertAvroToJSON.java

@@ -61,6 +61,8 @@ public class ConvertAvroToJSON extends AbstractProcessor {
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
 
+    private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
+
     static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
         .name("JSON container options")
         .description("Determines how stream of records is exposed: either as a sequence of single Objects (" + CONTAINER_NONE
@@ -123,16 +125,18 @@ public class ConvertAvroToJSON extends AbstractProcessor {
     final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
     final GenericData genericData = GenericData.get();
 
-    GenericRecord record = reader.next();
-    final String json = genericData.toString(record);
-
-    int recordCount = 0;
-    if (reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)) {
+    if (reader.hasNext() == false) {
+        out.write(EMPTY_JSON_OBJECT);
+        return;
+    }
+    int recordCount = 1;
+    GenericRecord reuse = reader.next();
+    // Only open container if more than one record
+    if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){
         out.write('[');
     }
 
-    out.write(json.getBytes(StandardCharsets.UTF_8));
-    recordCount++;
+    out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
 
     while (reader.hasNext()) {
         if (containerOption.equals(CONTAINER_ARRAY)) {
@@ -141,11 +145,12 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             out.write('\n');
         }
 
-        final GenericRecord nextRecord = reader.next(record);
-        out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
+        reuse = reader.next(reuse);
+        out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
         recordCount++;
     }
 
+    // Only close container if more than one record
     if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
         out.write(']');
     }
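
Note: the heart of the change above is the Avro record-reuse idiom. DataFileStream.next(reuse) deserializes into an existing GenericRecord instead of allocating a new one per record, and the hasNext() guard up front writes "{}" for a datafile that contains a schema but no records, where the old code would have failed on the unconditional reader.next(). A minimal standalone sketch of the same pattern, with the container option collapsed to the array case for brevity (the AvroToJsonSketch class and avroToJson method are names invented for this note, not part of the commit):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class AvroToJsonSketch {

        private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);

        // Converts an Avro datafile on 'in' to JSON on 'out', wrapping multiple
        // records in a JSON array and emitting {} for an empty datafile.
        static void avroToJson(final InputStream in, final OutputStream out) throws IOException {
            try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
                final GenericData genericData = GenericData.get();

                // Zero records: write an empty JSON object instead of calling next() on an empty stream.
                if (!reader.hasNext()) {
                    out.write(EMPTY_JSON_OBJECT);
                    return;
                }

                int recordCount = 1;
                GenericRecord reuse = reader.next();

                // Only open the array if there is more than one record.
                if (reader.hasNext()) {
                    out.write('[');
                }
                out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));

                while (reader.hasNext()) {
                    out.write(',');
                    // next(reuse) re-populates the same GenericRecord rather than allocating a new one.
                    reuse = reader.next(reuse);
                    out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
                    recordCount++;
                }

                // Only close the array if it was opened.
                if (recordCount > 1) {
                    out.write(']');
                }
            }
        }
    }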

SplitAvro.java

@@ -218,7 +218,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Splits the incoming Avro datafile into batches of records by reading and de-serializing each record.
      */
-    private class RecordSplitter implements Splitter {
+    static private class RecordSplitter implements Splitter {
 
         private final int splitSize;
         private final boolean transferMetadata;
@@ -300,7 +300,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Writes a binary Avro Datafile to the OutputStream.
      */
-    private class DatafileSplitWriter implements SplitWriter {
+    static private class DatafileSplitWriter implements SplitWriter {
 
         private final boolean transferMetadata;
         private DataFileWriter<GenericRecord> writer;
@@ -344,7 +344,7 @@ public class SplitAvro extends AbstractProcessor {
     /**
      * Writes bare Avro records to the OutputStream.
      */
-    private class BareRecordSplitWriter implements SplitWriter {
+    static private class BareRecordSplitWriter implements SplitWriter {
 
         private Encoder encoder;
         private DatumWriter<GenericRecord> writer;
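
Note: the static keyword on these nested classes is the substance of all three hunks. A non-static inner class carries an implicit reference to its enclosing SplitAvro instance, which none of these splitters use; making them static removes that hidden field. A compact illustration of the difference (Outer, Inner, and Nested are invented names for this example):

    public class Outer {

        // Inner class: every instance holds a hidden Outer.this reference and
        // can only be instantiated through an enclosing Outer instance.
        private class Inner { }

        // Static nested class: no implicit reference to Outer; it behaves like
        // a top-level class namespaced inside Outer.
        private static class Nested { }

        void demo() {
            final Inner inner = new Inner();    // implicitly this.new Inner()
            final Nested nested = new Nested(); // no enclosing instance needed
        }
    }

(The commit writes "static private"; that is equivalent to the conventional modifier order "private static".)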

TestConvertAvroToJSON.java

@@ -126,4 +126,33 @@ public class TestConvertAvroToJSON {
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
         out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 1024, \"favorite_color\": \"red\"}");
     }
+
+    @Test
+    public void testEmptyFlowFile() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+
+        runner.enqueue(new byte[]{});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testZeroRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
+
+        runner.enqueue(out1.toByteArray());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{}");
+    }
+
 }
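
Note: testZeroRecords calls a serializeAvroRecord(schema, datumWriter) helper that already exists elsewhere in the test class and is not shown in this diff. For the "{}" assertion to hold, the helper must produce a valid Avro datafile containing a schema but no records. A plausible reconstruction using Avro's DataFileWriter (the body below is a guess at the helper for illustration, not the committed code):

    // Assumes org.apache.avro.file.DataFileWriter and java.io.ByteArrayOutputStream are imported.
    private ByteArrayOutputStream serializeAvroRecord(final Schema schema, final DatumWriter<GenericRecord> datumWriter) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
            fileWriter.create(schema, out); // writes the header (schema + sync marker); no records appended
        }
        return out;
    }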