diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index edbc03323a..5ebe524862 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -142,6 +142,33 @@ public class MergeContent extends BinFiles { public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); + + public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata", + "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."); + + public static final AllowableValue METADATA_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Metadata", "Keep Only Common Metadata", + "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values match those of the first FlowFile, any additional metadata " + + "will be dropped but the FlowFile will be merged. Any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); + + public static final AllowableValue METADATA_STRATEGY_IGNORE = new AllowableValue("Ignore Metadata", "Ignore Metadata", + "Ignores (does not transfer, compare, etc.) any metadata from a FlowFile whose content supports embedded metadata."); + + public static final AllowableValue METADATA_STRATEGY_DO_NOT_MERGE = new AllowableValue("Do Not Merge Uncommon Metadata", "Do Not Merge Uncommon Metadata", + "For any input format that supports metadata (Avro, e.g.), any FlowFile whose metadata values do not match those of the first FlowFile in the bin will not be merged."); + + public static final PropertyDescriptor METADATA_STRATEGY = new PropertyDescriptor.Builder() + .required(true) + .name("mergecontent-metadata-strategy") + .displayName("Metadata Strategy") + .description("For FlowFiles whose input format supports metadata (Avro, e.g.), this property determines which metadata should be added to the bundle. " + + "If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, " + + "only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to " + + "the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile " + + "will not be merged.") + .allowableValues(METADATA_STRATEGY_USE_FIRST, METADATA_STRATEGY_ALL_COMMON, METADATA_STRATEGY_DO_NOT_MERGE, METADATA_STRATEGY_IGNORE) + .defaultValue(METADATA_STRATEGY_DO_NOT_MERGE.getValue()) + .build(); + public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( "Bin-Packing Algorithm", "Bin-Packing Algorithm", @@ -307,6 +334,7 @@ public class MergeContent extends BinFiles { descriptors.add(MERGE_FORMAT); descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY); descriptors.add(CORRELATION_ATTRIBUTE_NAME); + descriptors.add(METADATA_STRATEGY); descriptors.add(MIN_ENTRIES); descriptors.add(MAX_ENTRIES); descriptors.add(MIN_SIZE); @@ -861,6 +889,7 @@ public class MergeContent extends BinFiles { final ProcessSession session = bin.getSession(); final List contents = bin.getContents(); + final String metadataStrategy = context.getProperty(METADATA_STRATEGY).getValue(); final Map metadata = new TreeMap<>(); final AtomicReference schema = new AtomicReference<>(null); final AtomicReference inputCodec = new AtomicReference<>(null); @@ -883,11 +912,13 @@ public class MergeContent extends BinFiles { // this is the first file - set up the writer, and store the // Schema & metadata we'll use. schema.set(reader.getSchema()); - for (String key : reader.getMetaKeys()) { - if (!DataFileWriter.isReservedMeta(key)) { - byte[] metadatum = reader.getMeta(key); - metadata.put(key, metadatum); - writer.setMeta(key, metadatum); + if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) { + for (String key : reader.getMetaKeys()) { + if (!DataFileWriter.isReservedMeta(key)) { + byte[] metadatum = reader.getMeta(key); + metadata.put(key, metadatum); + writer.setMeta(key, metadatum); + } } } inputCodec.set(reader.getMetaString(DataFileConstants.CODEC)); @@ -905,19 +936,25 @@ public class MergeContent extends BinFiles { unmerged.add(flowFile); } - // check that we're appending to the same metadata - for (String key : reader.getMetaKeys()) { - if (!DataFileWriter.isReservedMeta(key)) { - byte[] metadatum = reader.getMeta(key); - byte[] writersMetadatum = metadata.get(key); - if (!Arrays.equals(metadatum, writersMetadatum)) { - getLogger().debug("Input file {} has different non-reserved metadata, not merging", - new Object[]{flowFile.getId()}); - canMerge = false; - unmerged.add(flowFile); + if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy) + || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) { + // check that we're appending to the same metadata + for (String key : reader.getMetaKeys()) { + if (!DataFileWriter.isReservedMeta(key)) { + byte[] metadatum = reader.getMeta(key); + byte[] writersMetadatum = metadata.get(key); + if (!Arrays.equals(metadatum, writersMetadatum)) { + // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge + if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) { + getLogger().debug("Input file {} has different non-reserved metadata, not merging", + new Object[]{flowFile.getId()}); + canMerge = false; + unmerged.add(flowFile); + } + } } } - } + } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above // check that we're appending to the same codec String thisCodec = reader.getMetaString(DataFileConstants.CODEC); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index 4cd44604d8..d4617125d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -197,6 +197,251 @@ public class TestMergeContent { Assert.assertTrue(places.containsKey("Some Place")); } + @Test + public void testAvroConcatWithDifferentMetadataDoNotMerge() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + runner.setProperty(MergeContent.MIN_ENTRIES, "3"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO); + runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_DO_NOT_MERGE); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + final Map userMeta1 = new HashMap() {{ + put("test_metadata1", "Test 1"); + }}; + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "Ben"); + user2.put("favorite_number", 7); + user2.put("favorite_color", "red"); + final Map userMeta2 = new HashMap() {{ + put("test_metadata1", "Test 2"); // Test non-matching values + }}; + + final GenericRecord user3 = new GenericData.Record(schema); + user3.put("name", "John"); + user3.put("favorite_number", 5); + user3.put("favorite_color", "blue"); + final Map userMeta3 = new HashMap() {{ + put("test_metadata1", "Test 1"); + put("test_metadata2", "Test"); // Test unique + }}; + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1); + final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2); + final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3); + + runner.enqueue(out1.toByteArray()); + runner.enqueue(out2.toByteArray()); + runner.enqueue(out3.toByteArray()); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 2); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + + // create a reader for the merged content + byte[] data = runner.getContentAsByteArray(bundle); + final Map users = getGenericRecordMap(data, schema, "name"); + + Assert.assertEquals(1, users.size()); + Assert.assertTrue(users.containsKey("Alyssa")); + } + + @Test + public void testAvroConcatWithDifferentMetadataIgnore() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + runner.setProperty(MergeContent.MIN_ENTRIES, "3"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO); + runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_IGNORE); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + final Map userMeta1 = new HashMap() {{ + put("test_metadata1", "Test 1"); + }}; + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "Ben"); + user2.put("favorite_number", 7); + user2.put("favorite_color", "red"); + final Map userMeta2 = new HashMap() {{ + put("test_metadata1", "Test 2"); // Test non-matching values + }}; + + final GenericRecord user3 = new GenericData.Record(schema); + user3.put("name", "John"); + user3.put("favorite_number", 5); + user3.put("favorite_color", "blue"); + final Map userMeta3 = new HashMap() {{ + put("test_metadata1", "Test 1"); + put("test_metadata2", "Test"); // Test unique + }}; + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1); + final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2); + final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3); + + runner.enqueue(out1.toByteArray()); + runner.enqueue(out2.toByteArray()); + runner.enqueue(out3.toByteArray()); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + + // create a reader for the merged content + byte[] data = runner.getContentAsByteArray(bundle); + final Map users = getGenericRecordMap(data, schema, "name"); + + Assert.assertEquals(3, users.size()); + Assert.assertTrue(users.containsKey("Alyssa")); + Assert.assertTrue(users.containsKey("Ben")); + Assert.assertTrue(users.containsKey("John")); + } + + @Test + public void testAvroConcatWithDifferentMetadataUseFirst() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + runner.setProperty(MergeContent.MIN_ENTRIES, "3"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO); + runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + final Map userMeta1 = new HashMap() {{ + put("test_metadata1", "Test 1"); + }}; + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "Ben"); + user2.put("favorite_number", 7); + user2.put("favorite_color", "red"); + final Map userMeta2 = new HashMap() {{ + put("test_metadata1", "Test 2"); // Test non-matching values + }}; + + final GenericRecord user3 = new GenericData.Record(schema); + user3.put("name", "John"); + user3.put("favorite_number", 5); + user3.put("favorite_color", "blue"); + final Map userMeta3 = new HashMap() {{ + put("test_metadata1", "Test 1"); + put("test_metadata2", "Test"); // Test unique + }}; + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1); + final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2); + final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3); + + runner.enqueue(out1.toByteArray()); + runner.enqueue(out2.toByteArray()); + runner.enqueue(out3.toByteArray()); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + + // create a reader for the merged content + byte[] data = runner.getContentAsByteArray(bundle); + final Map users = getGenericRecordMap(data, schema, "name"); + + Assert.assertEquals(3, users.size()); + Assert.assertTrue(users.containsKey("Alyssa")); + Assert.assertTrue(users.containsKey("Ben")); + Assert.assertTrue(users.containsKey("John")); + } + + @Test + public void testAvroConcatWithDifferentMetadataKeepCommon() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + runner.setProperty(MergeContent.MIN_ENTRIES, "3"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO); + runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_ALL_COMMON); + + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc")); + + final GenericRecord user1 = new GenericData.Record(schema); + user1.put("name", "Alyssa"); + user1.put("favorite_number", 256); + final Map userMeta1 = new HashMap() {{ + put("test_metadata1", "Test 1"); + }}; + + final GenericRecord user2 = new GenericData.Record(schema); + user2.put("name", "Ben"); + user2.put("favorite_number", 7); + user2.put("favorite_color", "red"); + final Map userMeta2 = new HashMap() {{ + put("test_metadata1", "Test 2"); // Test non-matching values + }}; + + final GenericRecord user3 = new GenericData.Record(schema); + user3.put("name", "John"); + user3.put("favorite_number", 5); + user3.put("favorite_color", "blue"); + final Map userMeta3 = new HashMap() {{ + put("test_metadata1", "Test 1"); + put("test_metadata2", "Test"); // Test unique + }}; + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + final ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1); + final ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2); + final ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3); + + runner.enqueue(out1.toByteArray()); + runner.enqueue(out2.toByteArray()); + runner.enqueue(out3.toByteArray()); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 1); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary"); + + // create a reader for the merged content + byte[] data = runner.getContentAsByteArray(bundle); + final Map users = getGenericRecordMap(data, schema, "name"); + + Assert.assertEquals(2, users.size()); + Assert.assertTrue(users.containsKey("Alyssa")); + Assert.assertTrue(users.containsKey("John")); + } + private Map getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException { // create a reader for the merged contet DatumReader datumReader = new GenericDatumReader<>(schema); @@ -213,8 +458,15 @@ public class TestMergeContent { } private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter datumWriter) throws IOException { + return serializeAvroRecord(schema, user2, datumWriter, null); + } + + private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter datumWriter, Map metadata) throws IOException { ByteArrayOutputStream out2 = new ByteArrayOutputStream(); - DataFileWriter dataFileWriter2 = new DataFileWriter(datumWriter); + DataFileWriter dataFileWriter2 = new DataFileWriter<>(datumWriter); + if (metadata != null) { + metadata.forEach(dataFileWriter2::setMeta); + } dataFileWriter2.create(schema, out2); dataFileWriter2.append(user2); dataFileWriter2.close();