From 0f8d00d5fff947647926570ccfa4b83ac9411e4b Mon Sep 17 00:00:00 2001 From: danbress Date: Sat, 14 Feb 2015 12:20:18 -0500 Subject: [PATCH 1/4] NIFI-333 - catching ProcessException instead of Exception when appropriate catching ProcessException in processors instead of exception --- .../hadoop/CreateHadoopSequenceFile.java | 4 +-- .../standard/Base64EncodeContent.java | 13 ++++----- .../processors/standard/CompressContent.java | 27 ++++++++++--------- .../nifi/processors/standard/HashContent.java | 14 +++++----- .../nifi/processors/standard/PutEmail.java | 5 ++-- .../processors/standard/TransformXml.java | 3 ++- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index 1422a7b599..a031923bcd 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -33,7 +34,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; -import org.apache.hadoop.io.SequenceFile.CompressionType; /** *

@@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType); session.transfer(flowFile, RELATIONSHIP_SUCCESS); getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS}); - } catch (Exception e) { + } catch (ProcessException e) { getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e); session.transfer(flowFile, RELATIONSHIP_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index d9175e06ca..cd272ff4e2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.codec.binary.Base64InputStream; import org.apache.commons.codec.binary.Base64OutputStream; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; @@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import 
org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream; import org.apache.nifi.util.StopWatch; @@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor { logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { + } catch (ProcessException e) { logger.error("Failed to {} {} due to {}", new Object[]{encode ? 
"encode" : "decode", flowFile, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index cf20f16d40..e631cd0e62 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit; import lzma.sdk.lzma.Decoder; import lzma.streams.LzmaInputStream; import lzma.streams.LzmaOutputStream; + +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.GZIPOutputStream; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import 
org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.GZIPOutputStream; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; - -import org.apache.commons.compress.compressors.CompressorStreamFactory; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.tukaani.xz.LZMA2Options; import org.tukaani.xz.XZInputStream; import org.tukaani.xz.XZOutputStream; @@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor { compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (final Exception e) { + } catch (final ProcessException e) { logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java index 827653bec0..9f8a16c4d9 100644 --- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java @@ -28,23 +28,23 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.NullOutputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.ObjectHolder; @EventDriven @@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor { logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()}); session.getProvenanceReporter().modifyAttributes(flowFile); 
session.transfer(flowFile, REL_SUCCESS); - } catch (final Exception e) { + } catch (final ProcessException e) { logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 2fa71c8994..eb6b1cca63 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -32,6 +32,7 @@ import java.util.Set; import javax.activation.DataHandler; import javax.mail.Message; import javax.mail.Message.RecipientType; +import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.URLName; import javax.mail.internet.AddressException; @@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; - import org.apache.commons.codec.binary.Base64; import com.sun.mail.smtp.SMTPTransport; @@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor { session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); session.transfer(flowFile, REL_SUCCESS); logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); - } catch (final Exception e) { + } catch (final ProcessException 
| MessagingException | IOException e) { context.yield(); logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java index 5e251f6c1a..8a2feb837d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java @@ -51,6 +51,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; @@ -152,7 +153,7 @@ public class TransformXml extends AbstractProcessor { session.transfer(transformed, REL_SUCCESS); session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); logger.info("Transformed {}", new Object[]{original}); - } catch (Exception e) { + } catch (ProcessException e) { logger.error("Unable to transform {} due to {}", new Object[]{original, e}); session.transfer(original, REL_FAILURE); } From 361ac1f1e5c8ee943220b337c92cd49616a02194 Mon Sep 17 00:00:00 2001 From: danbress Date: Sun, 15 Feb 2015 09:08:22 -0500 Subject: [PATCH 2/4] NIFI-333 - Adding failure test for DecompressContent --- .../standard/TestCompressContent.java | 18 ++++++++++++++++++ 1 file changed, 18 
insertions(+) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java index 71c85831d3..df1d506678 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java @@ -109,4 +109,22 @@ public class TestCompressContent { flowFile.assertAttributeEquals("filename", "SampleFile.txt.gz"); } + + @Test + public void testDecompressFailure() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, "decompress"); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, "gzip"); + + byte[] data = new byte[]{1,2,3,4,5,6,7,8,9,10}; + runner.enqueue(data); + + + assertTrue(runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid()); + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(CompressContent.REL_FAILURE, 1); + + runner.getFlowFilesForRelationship(CompressContent.REL_FAILURE).get(0).assertContentEquals(data); + } } From 0fa1b16c831c413a2e31e768398da0602ca3758d Mon Sep 17 00:00:00 2001 From: danbress Date: Sun, 15 Feb 2015 09:24:51 -0500 Subject: [PATCH 3/4] NIFI-333 - Adding failure test for PutEmail --- .../processors/standard/TestPutEmail.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java new file mode 100644 index 0000000000..b737ed6173 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestPutEmail { + + @Test + public void testHostNotFound() { + // verifies that files are routed to failure when the SMTP host doesn't exist + final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); + runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123"); + runner.setProperty(PutEmail.FROM, "test@apache.org"); + runner.setProperty(PutEmail.TO, "test@apache.org"); + runner.setProperty(PutEmail.MESSAGE, "Message Body"); + + final Map<String, String> attributes = new HashMap<>(); + runner.enqueue("Some Text".getBytes(), attributes); + + runner.run(); + + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + } +} From c8810c04d8dcb6823c7e59e85158a0c3cb18675f Mon Sep 17 00:00:00 2001 From: danbress Date: Tue, 17 Feb 2015 11:17:46 -0500 Subject: [PATCH 4/4] NIFI-333 - Removing exception handling in SegmentContent From Mark - 'There's no exception that could get thrown in there unless there's something weird - in which case the framework should catch it and handle it' --- .../processors/standard/SegmentContent.java | 106 +++++++++--------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index dfdd401033..cf0539ed71 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -25,6
+25,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @EventDriven @@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor { return; } - try { - final String segmentId = UUID.randomUUID().toString(); - final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); + final String segmentId = UUID.randomUUID().toString(); + final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); - final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); - if (flowFile.getSize() <= segmentSize) { - flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); - flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); - flowFile = session.putAttribute(flowFile, 
SEGMENT_COUNT, "1"); - flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); + if (flowFile.getSize() <= segmentSize) { + flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId); + flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1"); + flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1"); + flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName); - flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); - flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1"); - flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1"); + flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId); + flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1"); + flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1"); - FlowFile clone = session.clone(flowFile); - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(clone, REL_SEGMENTS); - return; - } - - int totalSegments = (int) (flowFile.getSize() / segmentSize); - if (totalSegments * segmentSize < flowFile.getSize()) { - totalSegments++; - } - - final Map<String, String> segmentAttributes = new HashMap<>(); - segmentAttributes.put(SEGMENT_ID, segmentId); - segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments)); - segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); - - segmentAttributes.put(FRAGMENT_ID, segmentId); - segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments)); - - final Set<FlowFile> segmentSet = new HashSet<>(); - for (int i = 1; i <= totalSegments; i++) { - final long segmentOffset = segmentSize * (i - 1); - FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); - segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); - segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); - segment = session.putAllAttributes(segment, segmentAttributes); - segmentSet.add(segment); - } - -
session.transfer(segmentSet, REL_SEGMENTS); + FlowFile clone = session.clone(flowFile); session.transfer(flowFile, REL_ORIGINAL); + session.transfer(clone, REL_SEGMENTS); + return; + } - if (totalSegments <= 10) { - getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); - } else { - getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); - } - } catch (final Exception e) { - throw new ProcessException(e); + int totalSegments = (int) (flowFile.getSize() / segmentSize); + if (totalSegments * segmentSize < flowFile.getSize()) { + totalSegments++; + } + + final Map<String, String> segmentAttributes = new HashMap<>(); + segmentAttributes.put(SEGMENT_ID, segmentId); + segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments)); + segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); + + segmentAttributes.put(FRAGMENT_ID, segmentId); + segmentAttributes.put(FRAGMENT_COUNT, String.valueOf(totalSegments)); + + final Set<FlowFile> segmentSet = new HashSet<>(); + for (int i = 1; i <= totalSegments; i++) { + final long segmentOffset = segmentSize * (i - 1); + FlowFile segment = session.clone(flowFile, segmentOffset, Math.min(segmentSize, flowFile.getSize() - segmentOffset)); + segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i)); + segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i)); + segment = session.putAllAttributes(segment, segmentAttributes); + segmentSet.add(segment); + } + + session.transfer(segmentSet, REL_SEGMENTS); + session.transfer(flowFile, REL_ORIGINAL); + + if (totalSegments <= 10) { + getLogger().info("Segmented {} into {} segments: {}", new Object[]{flowFile, totalSegments, segmentSet}); + } else { + getLogger().info("Segmented {} into {} segments", new Object[]{flowFile, totalSegments}); } } - }