diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index dfdd401033..cf0539ed71 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -25,6 +25,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @EventDriven @@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor { return; } - try { - final String segmentId = UUID.randomUUID().toString(); - final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); + final String segmentId = UUID.randomUUID().toString(); + final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); - final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - if (flowFile.getSize() <= segmentSize) { - flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); - flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); - flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1"); - flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); + if (flowFile.getSize() <= segmentSize) { + flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); + flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); + flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1"); + flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); - flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); - flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1"); - flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1"); + flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); + flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1"); + flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1"); - FlowFile clone = session.clone(flowFile); - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(clone, REL_SEGMENTS); - return; - } - - int totalSegments = (int) (flowFile.getSize() / segmentSize); - if (totalSegments * segmentSize < flowFile.getSize()) { - totalSegments++; - } - - final Map segmentAttributes = new HashMap<>(); - segmentAttributes.put(SEGMENT_ID, segmentId); - segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments)); - segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); - - segmentAttributes.put(FRAGMENT_ID, segmentId); - segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments)); - - final Set segmentSet = new HashSet<>(); - for (int i = 1; i <= totalSegments; i++) { - final long segmentOffset = segmentSize * (i - 1); - FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); - segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); - segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); - segment = session.putAllAttributes(segment, segmentAttributes); - segmentSet.add(segment); - } - - session.transfer(segmentSet, REL_SEGMENTS); + FlowFile clone = session.clone(flowFile); session.transfer(flowFile, REL_ORIGINAL); + session.transfer(clone, REL_SEGMENTS); + return; + } - if (totalSegments <= 10) { - getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); - } else { - getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); - } - } catch (final Exception e) { - throw new ProcessException(e); + int totalSegments = (int) (flowFile.getSize() / segmentSize); + if (totalSegments * segmentSize < flowFile.getSize()) { + totalSegments++; + } + + final Map segmentAttributes = new HashMap<>(); + segmentAttributes.put(SEGMENT_ID, segmentId); + segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments)); + segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); + + segmentAttributes.put(FRAGMENT_ID, segmentId); + segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments)); + + final Set segmentSet = new HashSet<>(); + for (int i = 1; i <= totalSegments; i++) { + final long segmentOffset = segmentSize * (i - 1); + FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); + segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); + segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); + segment = session.putAllAttributes(segment, segmentAttributes); + segmentSet.add(segment); + } + + session.transfer(segmentSet, REL_SEGMENTS); + session.transfer(flowFile, REL_ORIGINAL); + + if (totalSegments <= 10) { + getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); + } else { + getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); } } - }