NIFI-333 - Removing exception handling in SegmentContent

From Mark - 'Theres no exception that could get thrown in there unless
theres something weird - in which case the framework should catch it and
handle it'
This commit is contained in:
danbress 2015-02-17 11:17:46 -05:00
parent 0fa1b16c83
commit c8810c04d8
1 changed files with 50 additions and 56 deletions

View File

@ -25,6 +25,11 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor {
return;
}
try {
final String segmentId = UUID.randomUUID().toString();
final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
final String segmentId = UUID.randomUUID().toString();
final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
if (flowFile.getSize() <= segmentSize) {
flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
if (flowFile.getSize() <= segmentSize) {
flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
FlowFile clone = session.clone(flowFile);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(clone, REL_SEGMENTS);
return;
}
int totalSegments = (int) (flowFile.getSize() / segmentSize);
if (totalSegments * segmentSize < flowFile.getSize()) {
totalSegments++;
}
final Map<String, String> segmentAttributes = new HashMap<>();
segmentAttributes.put(SEGMENT_ID, segmentId);
segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
segmentAttributes.put(FRAGMENT_ID, segmentId);
segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
final Set<FlowFile> segmentSet = new HashSet<>();
for (int i = 1; i <= totalSegments; i++) {
final long segmentOffset = segmentSize * (i - 1);
FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
segment = session.putAllAttributes(segment, segmentAttributes);
segmentSet.add(segment);
}
session.transfer(segmentSet, REL_SEGMENTS);
FlowFile clone = session.clone(flowFile);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(clone, REL_SEGMENTS);
return;
}
if (totalSegments <= 10) {
getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
} else {
getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
}
} catch (final Exception e) {
throw new ProcessException(e);
int totalSegments = (int) (flowFile.getSize() / segmentSize);
if (totalSegments * segmentSize < flowFile.getSize()) {
totalSegments++;
}
final Map<String, String> segmentAttributes = new HashMap<>();
segmentAttributes.put(SEGMENT_ID, segmentId);
segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
segmentAttributes.put(FRAGMENT_ID, segmentId);
segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments));
final Set<FlowFile> segmentSet = new HashSet<>();
for (int i = 1; i <= totalSegments; i++) {
final long segmentOffset = segmentSize * (i - 1);
FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset));
segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
segment = session.putAllAttributes(segment, segmentAttributes);
segmentSet.add(segment);
}
session.transfer(segmentSet, REL_SEGMENTS);
session.transfer(flowFile, REL_ORIGINAL);
if (totalSegments <= 10) {
getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet});
} else {
getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments});
}
}
}