NIFI-13674 Added merge.competion.reason Attribute for MergeRecord (#9363)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
dan-s1 2024-10-11 18:00:34 -04:00 committed by GitHub
parent 1bfdf3c107
commit f7d5bd7774
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 7 additions and 0 deletions

View File

@ -96,6 +96,9 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
+ "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"), + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
@WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged FlowFile that will be added to the original FlowFiles attributes"), @WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged FlowFile that will be added to the original FlowFiles attributes"),
@WritesAttribute(attribute = MergeRecord.MERGE_COMPLETION_REASON, description = "This processor allows for several thresholds to be configured for merging FlowFiles. "
+ " This attribute indicates which of the Thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values "
+ " and their meanings, see the Processor's Usage / documentation and see the 'Additional Details' page."),
@WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.") @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
}) })
@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class}) @SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
@ -171,6 +174,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid"; public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";
public static final String MERGE_COMPLETION_REASON = "merge.completion.reason";
public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
"Bin-Packing Algorithm", "Bin-Packing Algorithm",

View File

@ -398,6 +398,7 @@ public class RecordBin {
attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType()); attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size())); attributes.put(MergeRecord.MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size()));
attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge())); attributes.put(MergeRecord.MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
attributes.put(MergeRecord.MERGE_COMPLETION_REASON, completionReason);
merged = session.putAllAttributes(merged, attributes); merged = session.putAllAttributes(merged, attributes);
flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key()))); flowFiles.forEach(ff -> session.putAttribute(ff, MergeRecord.MERGE_UUID_ATTRIBUTE, merged.getAttribute(CoreAttributes.UUID.key())));

View File

@ -79,6 +79,7 @@ public class TestMergeRecord {
runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
mff.assertAttributeEquals(MergeRecord.MERGE_COMPLETION_REASON, "Bin is full");
} }
@Test @Test
@ -96,6 +97,7 @@ public class TestMergeRecord {
runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach( runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(
ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE))); ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
mff.assertAttributeEquals(MergeRecord.MERGE_COMPLETION_REASON, "Bin is full enough");
} }
@Test @Test