From 8382b0a7a3a2735e9047db660df4033c9976c911 Mon Sep 17 00:00:00 2001 From: Jeyassri Balchandran Date: Fri, 26 May 2023 19:13:46 +0530 Subject: [PATCH] NIFI-11540 Removed legacy segment attributes from MergeContent This closes #7305 Co-authored-by: David Handermann Signed-off-by: David Handermann --- .../processors/standard/MergeContent.java | 23 ++----------------- .../processors/standard/SegmentContent.java | 20 ---------------- .../processors/standard/TestMergeContent.java | 12 +++++----- 3 files changed, 8 insertions(+), 47 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index c0ce6ade8a..5b4afdca81 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -154,14 +154,8 @@ public class MergeContent extends BinFiles { public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key(); - - // old style attributes - public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier"; - public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index"; - public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); - public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata", "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."); @@ -184,8 +178,7 @@ public class MergeContent extends BinFiles { "Defragment", "Defragment", "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " - + "have the attributes , , and or alternatively (for backward compatibility " - + "purposes) , , and . All FlowFiles with the same value for \"fragment.identifier\" " + + "have the attributes , , and . All FlowFiles with the same value for \"fragment.identifier\" " + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles " + "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute."); @@ -430,19 +423,7 @@ public class MergeContent extends BinFiles { @Override protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { - FlowFile processed = flowFile; - // handle backward compatibility with old segment attributes - if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) { - processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE)); - } - if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) { - processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE)); - } - if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) { - processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE)); - } - - return processed; + return flowFile; } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index d29bff812d..8dba735ce1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -57,17 +57,6 @@ import org.apache.nifi.processor.util.StandardValidators; + "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the " + "MergeContent processor in order to reconstitute the original FlowFile") @WritesAttributes({ - @WritesAttribute(attribute = "segment.identifier", - description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this " - + "attribute. This attribute is added to maintain backward compatibility, but the fragment.identifier is preferred, as " - + "it is designed to work in conjunction with the MergeContent Processor"), - @WritesAttribute(attribute = "segment.index", - description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. " - + "This attribute is added to maintain backward compatibility, but the fragment.index is preferred, as it is designed " - + "to work in conjunction with the MergeContent Processor"), - @WritesAttribute(attribute = "segment.count", - description = "The number of segments generated from the parent FlowFile. This attribute is added to maintain backward compatibility, " - + "but the fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"), @WritesAttribute(attribute = "fragment.identifier", description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @WritesAttribute(attribute = "fragment.index", @@ -79,9 +68,6 @@ import org.apache.nifi.processor.util.StandardValidators; @SeeAlso(MergeContent.class) public class SegmentContent extends AbstractProcessor { - public static final String SEGMENT_ID = "segment.identifier"; - public static final String SEGMENT_INDEX = "segment.index"; - public static final String SEGMENT_COUNT = "segment.count"; public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); @@ -143,9 +129,6 @@ public class SegmentContent extends AbstractProcessor { final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); if (flowFile.getSize() <= segmentSize) { - flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); - flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); - flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1"); flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); @@ -164,8 +147,6 @@ public class SegmentContent extends AbstractProcessor { } final Map segmentAttributes = new HashMap<>(); - segmentAttributes.put(SEGMENT_ID, segmentId); - segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments)); segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); segmentAttributes.put(FRAGMENT_ID, segmentId); @@ -175,7 +156,6 @@ public class SegmentContent extends AbstractProcessor { for (int i = 1; i <= totalSegments; i++) { final long segmentOffset = segmentSize * (i - 1); FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); - segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); segment = session.putAllAttributes(segment, segmentAttributes); segmentSet.add(segment); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index 4b2984b6b4..a77f054215 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -1129,17 +1129,17 @@ public class TestMergeContent { runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); final Map attributes = new HashMap<>(); - attributes.put("segment.identifier", "1"); - attributes.put("segment.count", "4"); - attributes.put("segment.index", "1"); + attributes.put("fragment.identifier", "1"); + attributes.put("fragment.count", "4"); + attributes.put("fragment.index", "1"); attributes.put("segment.original.filename", "originalfilename"); runner.enqueue("A Man ".getBytes("UTF-8"), attributes); - attributes.put("segment.index", "2"); + attributes.put("fragment.index", "2"); runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); - attributes.put("segment.index", "3"); + attributes.put("fragment.index", "3"); runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); - attributes.put("segment.index", "4"); + attributes.put("fragment.index", "4"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); runner.run();