From 0f8d00d5fff947647926570ccfa4b83ac9411e4b Mon Sep 17 00:00:00 2001 From: danbress Date: Sat, 14 Feb 2015 12:20:18 -0500 Subject: [PATCH] NIFI-333 - catching ProcessException instead of Exception when appropriate catching ProcessException in processors instead of exception --- .../hadoop/CreateHadoopSequenceFile.java | 4 +-- .../standard/Base64EncodeContent.java | 13 ++++----- .../processors/standard/CompressContent.java | 27 ++++++++++--------- .../nifi/processors/standard/HashContent.java | 14 +++++----- .../nifi/processors/standard/PutEmail.java | 5 ++-- .../processors/standard/TransformXml.java | 3 ++- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index 1422a7b599..a031923bcd 100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -33,7 +34,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; -import org.apache.hadoop.io.SequenceFile.CompressionType; /** *

@@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType); session.transfer(flowFile, RELATIONSHIP_SUCCESS); getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS}); - } catch (Exception e) { + } catch (ProcessException e) { getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e); session.transfer(flowFile, RELATIONSHIP_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index d9175e06ca..cd272ff4e2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.codec.binary.Base64InputStream; import org.apache.commons.codec.binary.Base64OutputStream; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; @@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream; import org.apache.nifi.util.StopWatch; @@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor { logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { + } catch (ProcessException e) { logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index cf20f16d40..e631cd0e62 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit; import lzma.sdk.lzma.Decoder; import lzma.streams.LzmaInputStream; import lzma.streams.LzmaOutputStream; + +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.GZIPOutputStream; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.GZIPOutputStream; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; - -import org.apache.commons.compress.compressors.CompressorStreamFactory; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.tukaani.xz.LZMA2Options; import org.tukaani.xz.XZInputStream; import org.tukaani.xz.XZOutputStream; @@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor { compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (final Exception e) { + } catch (final ProcessException e) { logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java index 827653bec0..9f8a16c4d9 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java @@ -28,23 +28,23 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.NullOutputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.behavior.EventDriven; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.ObjectHolder; @EventDriven @@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor { logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()}); session.getProvenanceReporter().modifyAttributes(flowFile); session.transfer(flowFile, REL_SUCCESS); - } catch (final Exception e) { + } catch (final ProcessException e) { logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 2fa71c8994..eb6b1cca63 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -32,6 +32,7 @@ import java.util.Set; import javax.activation.DataHandler; import javax.mail.Message; import javax.mail.Message.RecipientType; +import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.URLName; import javax.mail.internet.AddressException; @@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; - import org.apache.commons.codec.binary.Base64; import com.sun.mail.smtp.SMTPTransport; @@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor { session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); session.transfer(flowFile, REL_SUCCESS); logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); - } catch (final Exception e) { + } catch (final ProcessException | MessagingException | IOException e) { context.yield(); logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java index 5e251f6c1a..8a2feb837d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java @@ -51,6 +51,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; @@ -152,7 +153,7 @@ public class TransformXml extends AbstractProcessor { session.transfer(transformed, REL_SUCCESS); session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); logger.info("Transformed {}", new Object[]{original}); - } catch (Exception e) { + } catch (ProcessException e) { logger.error("Unable to transform {} due to {}", new Object[]{original, e}); session.transfer(original, REL_FAILURE); }