From b31c76bf30f847842e3ef1c6f928b2e93f517b34 Mon Sep 17 00:00:00 2001
From: Bryan Bende
Date: Wed, 5 Aug 2015 14:54:02 -0500
Subject: [PATCH] NIFI-821 Support Merging of Avro

NIFI-821 Changing error logging to debug, changing mime-type, adding a
try-close for Avro reader, changing new ArrayList to Collections.emptyList()
---
 .../nifi-standard-processors/pom.xml               |   2 +
 .../processors/standard/MergeContent.java          | 157 +++++++++++++++++-
 .../processors/standard/TestMergeContent.java      | 139 ++++++++++++++++
 .../resources/TestMergeContent/place.avsc          |   7 +
 .../test/resources/TestMergeContent/user.avsc      |   9 +
 5 files changed, 313 insertions(+), 1 deletion(-)
 create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
 create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc

diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 4d1e542d6e..994612255c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -214,6 +214,8 @@ language governing permissions and limitations under the License. -->
                         <exclude>src/test/resources/TestMergeContent/demarcate</exclude>
                         <exclude>src/test/resources/TestMergeContent/foot</exclude>
                         <exclude>src/test/resources/TestMergeContent/head</exclude>
+                        <exclude>src/test/resources/TestMergeContent/user.avsc</exclude>
+                        <exclude>src/test/resources/TestMergeContent/place.avsc</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 65f4124d55..c8f9bbe07e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,10 +33,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -149,6 +159,7 @@ public class MergeContent extends BinFiles {
     public static final String MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = "FlowFile Stream, v2";
     public static final String MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile Tar, v1";
     public static final String MERGE_FORMAT_CONCAT_VALUE = "Binary Concatenation";
+    public static final String MERGE_FORMAT_AVRO_VALUE = "Avro";
 
     public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
             MERGE_FORMAT_TAR_VALUE,
@@ -179,6 +190,10 @@ public class MergeContent extends BinFiles {
             MERGE_FORMAT_CONCAT_VALUE,
             MERGE_FORMAT_CONCAT_VALUE,
             "The contents of all FlowFiles will be concatenated together into a single FlowFile");
+    public static final AllowableValue MERGE_FORMAT_AVRO = new AllowableValue(
+            MERGE_FORMAT_AVRO_VALUE,
+            MERGE_FORMAT_AVRO_VALUE,
+            "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile");
 
     public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes";
     public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes";
@@ -200,7 +215,7 @@ public class MergeContent extends BinFiles {
             .required(true)
             .name("Merge Format")
             .description("Determines the format that will be used to merge the content.")
-            .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT)
+            .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO)
             .defaultValue(MERGE_FORMAT_CONCAT.getValue())
             .build();
     public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder()
@@ -400,6 +415,9 @@ public class MergeContent extends BinFiles {
             case MERGE_FORMAT_CONCAT_VALUE:
                 merger = new BinaryConcatenationMerge();
                 break;
+            case MERGE_FORMAT_AVRO_VALUE:
+                merger = new AvroMerge();
+                break;
             default:
                 throw new AssertionError();
         }
@@ -451,6 +469,12 @@ public class MergeContent extends BinFiles {
         getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle});
         session.transfer(bundle, REL_MERGED);
 
+        for (final FlowFileSessionWrapper unmerged : merger.getUnmergedFlowFiles()) {
+            final ProcessSession unmergedSession = unmerged.getSession();
+            final FlowFile unmergedCopy = unmergedSession.clone(unmerged.getFlowFile());
+            unmergedSession.transfer(unmergedCopy, REL_FAILURE);
+        }
+
         // We haven't committed anything, parent will take care of it
         return false;
     }
@@ -628,6 +652,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return mimeType;
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private List<FlowFile> getFlowFiles(final List<FlowFileSessionWrapper> sessionWrappers) {
@@ -714,6 +743,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return "application/tar";
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private class FlowFileStreamMerger implements MergeBin {
@@ -771,6 +805,11 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return mimeType;
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
 
     private class ZipMerge implements MergeBin {
@@ -821,6 +860,120 @@ public class MergeContent extends BinFiles {
         public String getMergedContentType() {
             return "application/zip";
         }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return Collections.emptyList();
+        }
     }
+
+    private class AvroMerge implements MergeBin {
+
+        private List<FlowFileSessionWrapper> unmerged = new ArrayList<>();
+
+        @Override
+        public FlowFile merge(ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
+
+            final Map<String, byte[]> metadata = new TreeMap<>();
+            final ObjectHolder<Schema> schema = new ObjectHolder<>(null);
+            final ObjectHolder<String> inputCodec = new ObjectHolder<>(null);
+            final DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+
+            // we don't pass the parents to the #create method because the parents belong to different sessions
+            FlowFile bundle = session.create();
+            bundle = session.write(bundle, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws IOException {
+                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
+                        for (final FlowFileSessionWrapper wrapper : wrappers) {
+                            final FlowFile flowFile = wrapper.getFlowFile();
+                            wrapper.getSession().read(flowFile, new InputStreamCallback() {
+                                @Override
+                                public void process(InputStream in) throws IOException {
+                                    boolean canMerge = true;
+                                    try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in,
+                                            new GenericDatumReader<GenericRecord>())) {
+                                        if (schema.get() == null) {
+                                            // this is the first file - set up the writer, and store the
+                                            // Schema & metadata we'll use.
+                                            schema.set(reader.getSchema());
+                                            for (String key : reader.getMetaKeys()) {
+                                                if (!DataFileWriter.isReservedMeta(key)) {
+                                                    byte[] metadatum = reader.getMeta(key);
+                                                    metadata.put(key, metadatum);
+                                                    writer.setMeta(key, metadatum);
+                                                }
+                                            }
+                                            inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
+                                            if (inputCodec.get() == null) {
+                                                inputCodec.set(DataFileConstants.NULL_CODEC);
+                                            }
+                                            writer.setCodec(CodecFactory.fromString(inputCodec.get()));
+                                            writer.create(schema.get(), out);
+                                        } else {
+                                            // check that we're appending to the same schema
+                                            if (!schema.get().equals(reader.getSchema())) {
+                                                getLogger().debug("Input file {} has different schema - {}, not merging",
+                                                        new Object[]{flowFile.getId(), reader.getSchema().getName()});
+                                                canMerge = false;
+                                                unmerged.add(wrapper);
+                                            }
+
+                                            // check that we're appending to the same metadata
+                                            for (String key : reader.getMetaKeys()) {
+                                                if (!DataFileWriter.isReservedMeta(key)) {
+                                                    byte[] metadatum = reader.getMeta(key);
+                                                    byte[] writersMetadatum = metadata.get(key);
+                                                    if (!Arrays.equals(metadatum, writersMetadatum)) {
+                                                        getLogger().debug("Input file {} has different non-reserved metadata, not merging",
+                                                                new Object[]{flowFile.getId()});
+                                                        canMerge = false;
+                                                        unmerged.add(wrapper);
+                                                    }
+                                                }
+                                            }
+
+                                            // check that we're appending to the same codec
+                                            String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
+                                            if (thisCodec == null) {
+                                                thisCodec = DataFileConstants.NULL_CODEC;
+                                            }
+                                            if (!inputCodec.get().equals(thisCodec)) {
+                                                getLogger().debug("Input file {} has different codec, not merging",
+                                                        new Object[]{flowFile.getId()});
+                                                canMerge = false;
+                                                unmerged.add(wrapper);
+                                            }
+                                        }
+
+                                        // write the Avro content from the current FlowFile to the merged OutputStream
+                                        if (canMerge) {
+                                            writer.appendAllFrom(reader, false);
+                                        }
+                                    }
+                                }
+                            });
+                        }
+                        writer.flush();
+                    } finally {
+                        writer.close();
+                    }
+                }
+            });
+
+            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
+            return bundle;
+        }
+
+        @Override
+        public String getMergedContentType() {
+            return "application/avro-binary";
+        }
+
+        @Override
+        public List<FlowFileSessionWrapper> getUnmergedFlowFiles() {
+            return unmerged;
+        }
+    }
 
     private static class KeepUniqueAttributeStrategy implements AttributeStrategy {
@@ -911,6 +1064,8 @@ public class MergeContent extends BinFiles {
         FlowFile merge(ProcessContext context, ProcessSession session, List<FlowFileSessionWrapper> flowFiles);
 
         String getMergedContentType();
+
+        List<FlowFileSessionWrapper> getUnmergedFlowFiles();
     }
 
     private interface AttributeStrategy {
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index daad455cdc..c53c488bb9 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -32,6 +32,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.zip.ZipInputStream;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.io.IOUtils;
@@ -39,6 +49,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -55,6 +66,134 @@ public class TestMergeContent {
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "DEBUG");
     }
 
+    @Test
+    public void testSimpleAvroConcat() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "Ben");
+        user2.put("favorite_number", 7);
+        user2.put("favorite_color", "red");
+
+        final GenericRecord user3 = new GenericData.Record(schema);
+        user3.put("name", "John");
+        user3.put("favorite_number", 5);
+        user3.put("favorite_color", "blue");
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        // create a reader for the merged content
+        byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
+
+        Assert.assertEquals(3, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("Ben"));
+        Assert.assertTrue(users.containsKey("John"));
+    }
+
+    @Test
+    public void testAvroConcatWithDifferentSchemas() throws IOException, InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
+        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
+        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
+
+        final Schema schema1 = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
+        final Schema schema2 = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/place.avsc"));
+
+        final GenericRecord record1 = new GenericData.Record(schema1);
+        record1.put("name", "Alyssa");
+        record1.put("favorite_number", 256);
+
+        final GenericRecord record2 = new GenericData.Record(schema2);
+        record2.put("name", "Some Place");
+
+        final GenericRecord record3 = new GenericData.Record(schema1);
+        record3.put("name", "John");
+        record3.put("favorite_number", 5);
+        record3.put("favorite_color", "blue");
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema1);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema1, record1, datumWriter);
+        final ByteArrayOutputStream out2 = serializeAvroRecord(schema2, record2, datumWriter);
+        final ByteArrayOutputStream out3 = serializeAvroRecord(schema1, record3, datumWriter);
+
+        runner.enqueue(out1.toByteArray());
+        runner.enqueue(out2.toByteArray());
+        runner.enqueue(out3.toByteArray());
+
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+
+        final byte[] data = runner.getContentAsByteArray(bundle);
+        final Map<String, GenericRecord> users = getGenericRecordMap(data, schema1, "name");
+        Assert.assertEquals(2, users.size());
+        Assert.assertTrue(users.containsKey("Alyssa"));
+        Assert.assertTrue(users.containsKey("John"));
+
+        final MockFlowFile failure = runner.getFlowFilesForRelationship(MergeContent.REL_FAILURE).get(0);
+        final byte[] failureData = runner.getContentAsByteArray(failure);
+        final Map<String, GenericRecord> places = getGenericRecordMap(failureData, schema2, "name");
+        Assert.assertEquals(1, places.size());
+        Assert.assertTrue(places.containsKey("Some Place"));
+    }
+
+    private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
+        // create a reader for the merged content
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+        SeekableByteArrayInput input = new SeekableByteArrayInput(data);
+        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, datumReader);
+
+        // read all the records into a map to verify all the records are there
+        Map<String, GenericRecord> records = new HashMap<>();
+        while (dataFileReader.hasNext()) {
+            GenericRecord record = dataFileReader.next();
+            records.put(record.get(key).toString(), record);
+        }
+        return records;
+    }
+
+    private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord record, DatumWriter<GenericRecord> datumWriter) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+        dataFileWriter.create(schema, out);
+        dataFileWriter.append(record);
+        dataFileWriter.close();
+        return out;
+    }
+
     @Test
     public void testSimpleBinaryConcat() throws IOException, InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
new file mode 100644
index 0000000000..a39daa9989
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/place.avsc
@@ -0,0 +1,7 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "Place",
+ "fields": [
+     {"name": "name", "type": "string"}
+ ]
+}
\ No newline at end of file
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
new file mode 100644
index 0000000000..117ea70ee0
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestMergeContent/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number", "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}