NIFI-11540 Removed legacy segment attributes from MergeContent

This closes #7305

Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Jeyassri Balchandran 2023-05-26 19:13:46 +05:30 committed by exceptionfactory
parent ca6c7f7041
commit 8382b0a7a3
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 8 additions and 47 deletions

View File

@ -154,14 +154,8 @@ public class MergeContent extends BinFiles {
public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key(); public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
// old style attributes
public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata", public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new AllowableValue("Use First Metadata", "Use First Metadata",
"For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile."); "For any input format that supports metadata (Avro, e.g.), the metadata for the first FlowFile in the bin will be set on the output FlowFile.");
@ -184,8 +178,7 @@ public class MergeContent extends BinFiles {
"Defragment", "Defragment",
"Defragment", "Defragment",
"Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must "
+ "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or alternatively (for backward compatibility " + "have the attributes <fragment.identifier>, <fragment.count>, and <fragment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "purposes) <segment.identifier>, <segment.count>, and <segment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles " + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles "
+ "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute."); + "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute.");
@ -430,19 +423,7 @@ public class MergeContent extends BinFiles {
@Override @Override
protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { protected FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
FlowFile processed = flowFile; return flowFile;
// handle backward compatibility with old segment attributes
if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
processed = session.putAttribute(processed, FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
}
if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
processed = session.putAttribute(processed, FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
}
if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE, processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
}
return processed;
} }
@Override @Override

View File

@ -57,17 +57,6 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the " + "fragment.identifier, fragment.index, fragment.count, segment.original.filename; these attributes can then be used by the "
+ "MergeContent processor in order to reconstitute the original FlowFile") + "MergeContent processor in order to reconstitute the original FlowFile")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "segment.identifier",
description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this "
+ "attribute. This attribute is added to maintain backward compatibility, but the fragment.identifier is preferred, as "
+ "it is designed to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "segment.index",
description = "A one-up number that indicates the ordering of the segments that were created from a single parent FlowFile. "
+ "This attribute is added to maintain backward compatibility, but the fragment.index is preferred, as it is designed "
+ "to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "segment.count",
description = "The number of segments generated from the parent FlowFile. This attribute is added to maintain backward compatibility, "
+ "but the fragment.count is preferred, as it is designed to work in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "fragment.identifier", @WritesAttribute(attribute = "fragment.identifier",
description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), description = "All segments produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", @WritesAttribute(attribute = "fragment.index",
@ -79,9 +68,6 @@ import org.apache.nifi.processor.util.StandardValidators;
@SeeAlso(MergeContent.class) @SeeAlso(MergeContent.class)
public class SegmentContent extends AbstractProcessor { public class SegmentContent extends AbstractProcessor {
public static final String SEGMENT_ID = "segment.identifier";
public static final String SEGMENT_INDEX = "segment.index";
public static final String SEGMENT_COUNT = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
@ -143,9 +129,6 @@ public class SegmentContent extends AbstractProcessor {
final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
if (flowFile.getSize() <= segmentSize) { if (flowFile.getSize() <= segmentSize) {
flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
@ -164,8 +147,6 @@ public class SegmentContent extends AbstractProcessor {
} }
final Map<String, String> segmentAttributes = new HashMap<>(); final Map<String, String> segmentAttributes = new HashMap<>();
segmentAttributes.put(SEGMENT_ID, segmentId);
segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
segmentAttributes.put(FRAGMENT_ID, segmentId); segmentAttributes.put(FRAGMENT_ID, segmentId);
@ -175,7 +156,6 @@ public class SegmentContent extends AbstractProcessor {
for (int i = 1; i <= totalSegments; i++) { for (int i = 1; i <= totalSegments; i++) {
final long segmentOffset = segmentSize * (i - 1); final long segmentOffset = segmentSize * (i - 1);
FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
segment = session.putAllAttributes(segment, segmentAttributes); segment = session.putAllAttributes(segment, segmentAttributes);
segmentSet.add(segment); segmentSet.add(segment);

View File

@ -1129,17 +1129,17 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("segment.identifier", "1"); attributes.put("fragment.identifier", "1");
attributes.put("segment.count", "4"); attributes.put("fragment.count", "4");
attributes.put("segment.index", "1"); attributes.put("fragment.index", "1");
attributes.put("segment.original.filename", "originalfilename"); attributes.put("segment.original.filename", "originalfilename");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes); runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
attributes.put("segment.index", "2"); attributes.put("fragment.index", "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
attributes.put("segment.index", "3"); attributes.put("fragment.index", "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
attributes.put("segment.index", "4"); attributes.put("fragment.index", "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes); runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run(); runner.run();