NIFI-821 Support Merging of Avro

NIFI-821 Changing error logging to debug, changing mime-type, adding a try-close for Avro reader, changing new ArrayList to Collections.emptyList()
Bryan Bende 2015-08-05 14:54:02 -05:00
parent bbacc3d8ca
commit b31c76bf30
5 changed files with 313 additions and 1 deletion
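
The "try-close" the message mentions appears below as a try-with-resources around the Avro DataFileStream reader (plus a try/finally around the DataFileWriter). A minimal standalone sketch of that reader pattern, not part of this commit, assuming only an arbitrary InputStream (AvroReadSketch and dumpRecords are illustrative names):

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroReadSketch {
    // Reads every record from an Avro container stream; the reader is
    // closed even if decoding fails part-way through.
    static void dumpRecords(final InputStream in) throws IOException {
        try (final DataFileStream<GenericRecord> reader =
                new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        }
    }
}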

@@ -214,6 +214,8 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestMergeContent/demarcate</exclude>
<exclude>src/test/resources/TestMergeContent/foot</exclude>
<exclude>src/test/resources/TestMergeContent/head</exclude>
<exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
<exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

@@ -23,6 +23,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -32,10 +33,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -149,6 +159,7 @@ public class MergeContent extends BinFiles {
public static final String MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = "FlowFile Stream, v2";
public static final String MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile Tar, v1";
public static final String MERGE_FORMAT_CONCAT_VALUE = "Binary Concatenation";
public static final String MERGE_FORMAT_AVRO_VALUE = "Avro";
public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
MERGE_FORMAT_TAR_VALUE,
@@ -179,6 +190,10 @@ public class MergeContent extends BinFiles {
MERGE_FORMAT_CONCAT_VALUE,
MERGE_FORMAT_CONCAT_VALUE,
"The contents of all FlowFiles will be concatenated together into a single FlowFile");
public static final AllowableValue MERGE_FORMAT_AVRO = new AllowableValue(
MERGE_FORMAT_AVRO_VALUE,
MERGE_FORMAT_AVRO_VALUE,
"The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes";
public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes";
@@ -200,7 +215,7 @@ public class MergeContent extends BinFiles {
.required(true)
.name("Merge Format")
.description("Determines the format that will be used to merge the content.")
.allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT)
.allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
.defaultValue(MERGE_FORMAT_CONCAT.getValue())
.build();
public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
@@ -400,6 +415,9 @@ public class MergeContent extends BinFiles {
case MERGE_FORMAT_CONCAT_VALUE:
merger = new BinaryConcatenationMerge();
break;
case MERGE_FORMAT_AVRO_VALUE:
merger = new AvroMerge();
break;
default:
throw new AssertionError();
}
@@ -451,6 +469,12 @@ public class MergeContent extends BinFiles {
getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
session.transfer(bundle, REL_MERGED);
for (final FlowFileSessionWrapper unmerged : merger.getUnmergedFlowFiles()) {
final ProcessSession unmergedSession = unmerged.getSession();
final FlowFile unmergedCopy = unmergedSession.clone(unmerged.getFlowFile());
unmergedSession.transfer(unmergedCopy, REL_FAILURE);
}
// We haven't committed anything, parent will take care of it
return false;
}
@@ -628,6 +652,11 @@ public class MergeContent extends BinFiles {
public String getMergedContentType() {
return mimeType;
}
@Override
public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
return Collections.emptyList();
}
}
private List<FlowFile> getFlowFiles(final List<FlowFileSessionWrapper> sessionWrappers) {
@@ -714,6 +743,11 @@ public class MergeContent extends BinFiles {
public String getMergedContentType() {
return "application/tar";
}
@Override
public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
return Collections.emptyList();
}
}
private class FlowFileStreamMerger implements MergeBin {
@@ -771,6 +805,11 @@ public class MergeContent extends BinFiles {
public String getMergedContentType() {
return mimeType;
}
@Override
public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
return Collections.emptyList();
}
}
private class ZipMerge implements MergeBin {
@@ -821,6 +860,120 @@ public class MergeContent extends BinFiles {
public String getMergedContentType() {
return "application/zip";
}
@Override
public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
return Collections.emptyList();
}
}
private class AvroMerge implements MergeBin {
private List<FlowFileSessionWrapper> unmerged = new ArrayList<>();
@Override
public FlowFile merge(ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
final Map<String, byte[]> metadata = new TreeMap<>();
final ObjectHolder<Schema> schema = new ObjectHolder<>(null);
final ObjectHolder<String> inputCodec = new ObjectHolder<>(null);
final DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
// we don't pass the parents to the #create method because the parents belong to different sessions
FlowFile bundle = session.create();
bundle = session.write(bundle, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
for (final FlowFileSessionWrapper wrapper : wrappers) {
final FlowFile flowFile = wrapper.getFlowFile();
wrapper.getSession().read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
boolean canMerge = true;
try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
new GenericDatumReader<GenericRecord>())) {
if (schema.get() == null) {
// this is the first file - set up the writer, and store the
// Schema & metadata we'll use.
schema.set(reader.getSchema());
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
metadata.put(key, metadatum);
writer.setMeta(key, metadatum);
}
}
inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
if (inputCodec.get() == null) {
inputCodec.set(DataFileConstants.NULL_CODEC);
}
writer.setCodec(CodecFactory.fromString(inputCodec.get()));
writer.create(schema.get(), out);
} else {
// check that we're appending to the same schema
if (!schema.get().equals(reader.getSchema())) {
getLogger().debug("Input file {} has different schema - {}, not merging",
new Object[]{flowFile.getId(), reader.getSchema().getName()});
canMerge = false;
unmerged.add(wrapper);
}
// check that we're appending to the same metadata
for (String key : reader.getMetaKeys()) {
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key);
byte[] writersMetadatum = metadata.get(key);
if (!Arrays.equals(metadatum, writersMetadatum)) {
getLogger().debug("Input file {} has different non-reserved metadata, not merging",
new Object[]{flowFile.getId()});
canMerge = false;
unmerged.add(wrapper);
}
}
}
// check that we're appending to the same codec
String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
if (thisCodec == null) {
thisCodec = DataFileConstants.NULL_CODEC;
}
if (!inputCodec.get().equals(thisCodec)) {
getLogger().debug("Input file {} has different codec, not merging",
new Object[]{flowFile.getId()});
canMerge = false;
unmerged.add(wrapper);
}
}
// write the Avro content from the current FlowFile to the merged OutputStream
if (canMerge) {
writer.appendAllFrom(reader, false);
}
}
}
});
}
writer.flush();
} finally {
writer.close();
}
}
});
session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
return bundle;
}
@Override
public String getMergedContentType() {
return "application/avro-binary";
}
@Override
public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
return unmerged;
}
}
private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
@@ -911,6 +1064,8 @@ public class MergeContent extends BinFiles {
FlowFile merge(ProcessContext context, ProcessSession session, List<FlowFileSessionWrapper> flowFiles);
String getMergedContentType();
List<FlowFileSessionWrapper> getUnmergedFlowFiles();
}
private interface AttributeStrategy {
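
The AvroMerge bin above leans on DataFileWriter#appendAllFrom(DataFileStream, boolean), which copies the input file's data blocks into the output container; with recompress=false the blocks are copied as-is, so the codecs must agree, which is why the codec check precedes the append. A standalone sketch of the same call, under assumptions not in the commit (part1.avro, part2.avro, and merged.avro are illustrative file names, and the two inputs are assumed to share a schema and codec):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroAppendSketch {
    public static void main(final String[] args) throws IOException {
        try (DataFileStream<GenericRecord> first = new DataFileStream<>(
                     new FileInputStream("part1.avro"), new GenericDatumReader<GenericRecord>());
             DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())) {
            // like AvroMerge, take the merged file's schema from the first input
            writer.create(first.getSchema(), new File("merged.avro"));
            writer.appendAllFrom(first, false);
            try (DataFileStream<GenericRecord> second = new DataFileStream<>(
                         new FileInputStream("part2.avro"), new GenericDatumReader<GenericRecord>())) {
                writer.appendAllFrom(second, false); // assumes matching schema and codec
            }
        }
    }
}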

@@ -32,6 +32,16 @@ import java.util.Map;
import java.util.Set;
import java.util.zip.ZipInputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
@@ -39,6 +49,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
@@ -55,6 +66,134 @@ public class TestMergeContent {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
}
@Test
public void testSimpleAvroConcat() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
final GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
final GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "John");
user3.put("favorite_number", 5);
user3.put("favorite_color", "blue");
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
// create a reader for the merged content
byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
Assert.assertEquals(3, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
Assert.assertTrue(users.containsKey("Ben"));
Assert.assertTrue(users.containsKey("John"));
}
@Test
public void testAvroConcatWithDifferentSchemas() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
runner.setProperty(MergeContent.MAX_ENTRIES, "3");
runner.setProperty(MergeContent.MIN_ENTRIES, "3");
runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
final Schema schema1 = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
final Schema schema2 = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/place.avsc"));
final GenericRecord record1 = new GenericData.Record(schema1);
record1.put("name", "Alyssa");
record1.put("favorite_number", 256);
final GenericRecord record2 = new GenericData.Record(schema2);
record2.put("name", "Some Place");
final GenericRecord record3 = new GenericData.Record(schema1);
record3.put("name", "John");
record3.put("favorite_number", 5);
record3.put("favorite_color", "blue");
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema1);
final ByteArrayOutputStream out1 = serializeAvroRecord(schema1, record1, datumWriter);
final ByteArrayOutputStream out2 = serializeAvroRecord(schema2, record2, datumWriter);
final ByteArrayOutputStream out3 = serializeAvroRecord(schema1, record3, datumWriter);
runner.enqueue(out1.toByteArray());
runner.enqueue(out2.toByteArray());
runner.enqueue(out3.toByteArray());
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(MergeContent.REL_MERGED, 1);
runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
final byte[] data = runner.getContentAsByteArray(bundle);
final Map<String, GenericRecord> users = getGenericRecordMap(data, schema1, "name");
Assert.assertEquals(2, users.size());
Assert.assertTrue(users.containsKey("Alyssa"));
Assert.assertTrue(users.containsKey("John"));
final MockFlowFile failure = runner.getFlowFilesForRelationship(MergeContent.REL_FAILURE).get(0);
final byte[] failureData = runner.getContentAsByteArray(failure);
final Map<String, GenericRecord> places = getGenericRecordMap(failureData, schema2, "name");
Assert.assertEquals(1, places.size());
Assert.assertTrue(places.containsKey("Some Place"));
}
private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
// create a reader for the merged content
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
SeekableByteArrayInput input = new SeekableByteArrayInput(data);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, datumReader);
// read all the records into a map to verify all the records are there
Map<String,GenericRecord> records = new HashMap<>();
while (dataFileReader.hasNext()) {
GenericRecord user = dataFileReader.next();
records.put(user.get(key).toString(), user);
}
return records;
}
private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord record, DatumWriter<GenericRecord> datumWriter) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, out);
dataFileWriter.append(record);
dataFileWriter.close();
return out;
}
@Test
public void testSimpleBinaryConcat() throws IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new MergeContent());

@@ -0,0 +1,7 @@
{"namespace": "example.avro",
"type": "record",
"name": "Place",
"fields": [
{"name": "name", "type": "string"}
]
}

@@ -0,0 +1,9 @@
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
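
Both schemas define a string "name" field, but Avro compares whole schemas, not field overlap: the writer schema is embedded in every container file's header, and it is that header schema AvroMerge reads back via reader.getSchema(). A small sketch of the round trip, assuming nothing beyond Avro itself (SchemaHeaderSketch is an illustrative name, and the place.avsc content is inlined for brevity):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SchemaHeaderSketch {
    public static void main(final String[] args) throws IOException {
        final Schema place = new Schema.Parser().parse(
                "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Place\","
                + " \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");

        final GenericRecord record = new GenericData.Record(place);
        record.put("name", "Some Place");

        // write one record into an in-memory container file
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(place))) {
            writer.create(place, out);
            writer.append(record);
        }

        // the container header hands the writer schema back to any reader
        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
                new ByteArrayInputStream(out.toByteArray()),
                new GenericDatumReader<GenericRecord>())) {
            System.out.println(place.equals(reader.getSchema())); // prints: true
        }
    }
}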