diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java index 727f384e6a..68519f2356 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -96,6 +96,9 @@ import java.util.stream.Collectors; @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"), @WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged FlowFile that will be added to the original FlowFiles attributes"), + @WritesAttribute(attribute = MergeRecord.MERGE_COMPLETION_REASON, description = "This processor allows for several thresholds to be configured for merging FlowFiles. " + + " This attribute indicates which of the Thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values " + + " and their meanings, see the Processor's Usage / documentation and see the 'Additional Details' page."), @WritesAttribute(attribute = "", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.") }) @SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class}) @@ -171,6 +174,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor { public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid"; + public static final String MERGE_COMPLETION_REASON = "merge.completion.reason"; public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( "Bin-Packing Algorithm", diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java index ebb6bdca7f..8b0553fb63 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -398,6 +398,7 @@ public class RecordBin { attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType()); attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size())); attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge())); + attributes.put(MergeRecord.MERGE_COMPLETION_REASON, completionReason); merged = session.putAllAttributes(merged, attributes); flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key()))); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java index dea9990f24..656b4d5af2 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java @@ -79,6 +79,7 @@ public class TestMergeRecord { runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); + mff.assertAttributeEquals(MergeRecord.MERGE_COMPLETION_REASON, "Bin is full"); } @Test @@ -96,6 +97,7 @@ public class TestMergeRecord { runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); + mff.assertAttributeEquals(MergeRecord.MERGE_COMPLETION_REASON, "Bin is full enough"); } @Test