NIFI-333 - catching ProcessException instead of Exception

when appropriate catching ProcessException in processors instead of
exception
This commit is contained in:
danbress 2015-02-14 12:20:18 -05:00
parent b8ade5b129
commit 0f8d00d5ff
6 changed files with 35 additions and 31 deletions

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -33,7 +34,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
import org.apache.hadoop.io.SequenceFile.CompressionType;
/**
* <p>
@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
} catch (Exception e) {
} catch (ProcessException e) {
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
}

View File

@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.util.StopWatch;
@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor {
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
} catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor {
compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
} catch (final ProcessException e) {
logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e});
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -28,23 +28,23 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor {
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
} catch (final ProcessException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -32,6 +32,7 @@ import java.util.Set;
import javax.activation.DataHandler;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.URLName;
import javax.mail.internet.AddressException;
@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.codec.binary.Base64;
import com.sun.mail.smtp.SMTPTransport;
@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
} catch (final Exception e) {
} catch (final ProcessException | MessagingException | IOException e) {
context.yield();
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);

View File

@ -51,6 +51,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@ -152,7 +153,7 @@ public class TransformXml extends AbstractProcessor {
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
} catch (Exception e) {
} catch (ProcessException e) {
logger.error("Unable to transform {} due to {}", new Object[]{original, e});
session.transfer(original, REL_FAILURE);
}