diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java index 5605b8d7ea..e71f5aa46a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java @@ -298,16 +298,10 @@ public class PutEmail extends AbstractProcessor { final ProcessorLog logger = getLogger(); try { - message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions(flowFile).getValue())[0]); - - final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.TO, toAddresses); - - final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.CC, ccAddresses); - - final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue()); - message.setRecipients(RecipientType.BCC, bccAddresses); + message.addFrom(toInetAddresses(context, flowFile, FROM)); + message.setRecipients(RecipientType.TO, toInetAddresses(context, flowFile, TO)); + message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC)); + message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC)); message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue()); message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue()); @@ -344,14 +338,14 @@ public class PutEmail extends AbstractProcessor { message.setContent(multipart); } - Transport.send(message); + send(message); session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString()); session.transfer(flowFile, REL_SUCCESS); logger.info("Sent email as a result of receiving {}", new Object[]{flowFile}); } catch (final ProcessException | MessagingException | IOException e) { context.yield(); - logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e}); + logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e.getMessage()}, e); session.transfer(flowFile, REL_FAILURE); } } @@ -418,7 +412,7 @@ public class PutEmail extends AbstractProcessor { StringBuilder message = new StringBuilder(messagePrepend); message.append(BODY_SEPARATOR); message.append("\nStandard FlowFile Metadata:"); - message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getId())); + message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getAttribute(CoreAttributes.UUID.key()))); message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate()))); message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize())); message.append("\nFlowFile Attributes:"); @@ -429,11 +423,43 @@ public class PutEmail extends AbstractProcessor { return message.toString(); } - private static InternetAddress[] toInetAddresses(final String val) throws AddressException { - if (val == null) { - return new InternetAddress[0]; + /** + * @param context the current context + * @param flowFile the current flow file + * @param propertyDescriptor the property to evaluate + * @return an InternetAddress[] parsed from the supplied property + * @throws AddressException if the property cannot be parsed to a valid InternetAddress[] + */ + private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile, + PropertyDescriptor propertyDescriptor) throws AddressException { + InternetAddress[] parse; + String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue(); + if (value == null || value.isEmpty()){ + if (propertyDescriptor.isRequired()) { + final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string."; + throw new AddressException(exceptionMsg); + } else { + parse = new InternetAddress[0]; + } + } else { + try { + parse = InternetAddress.parse(value); + } catch (AddressException e) { + final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '"+ value +"'"; + throw new AddressException(exceptionMsg); + } } - return InternetAddress.parse(val); + return parse; + } + + /** + * Wrapper for static method {@link Transport#send(Message)} to add testability of this class. + * + * @param msg the message to send + * @throws MessagingException on error + */ + protected void send(final Message msg) throws MessagingException { + Transport.send(msg); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java index 727e8e9f85..fb11d8f60c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutEmail.java @@ -16,28 +16,79 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage.RecipientType; + +import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import static org.junit.Assert.assertEquals; +import org.junit.Before; import org.junit.Test; public class TestPutEmail { + /** + * Extension to PutEmail that stubs out the calls to + * Transport.sendMessage(). + * + *

+ * All sent messages are records in a list available via the + * {@link #getMessages()} method.

+ *

Calling + * {@link #setException(MessagingException)} will cause the supplied exception to be + * thrown when sendMessage is invoked. + *

+ */ + private static final class PutEmailExtension extends PutEmail { + private MessagingException e; + private ArrayList messages = new ArrayList<>(); + + @Override + protected void send(Message msg) throws MessagingException { + messages.add(msg); + if (this.e != null) { + throw e; + } + } + + void setException(final MessagingException e) { + this.e = e; + } + + List getMessages() { + return messages; + } + } + + PutEmailExtension processor; + TestRunner runner; + + @Before + public void setup() { + processor = new PutEmailExtension(); + runner = TestRunners.newTestRunner(processor); + } + @Test - public void testHostNotFound() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); + public void testExceptionWhenSending() { + // verifies that files are routed to failure when Transport.send() throws a MessagingException runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123"); runner.setProperty(PutEmail.FROM, "test@apache.org"); runner.setProperty(PutEmail.TO, "test@apache.org"); runner.setProperty(PutEmail.MESSAGE, "Message Body"); + processor.setException(new MessagingException("Forced failure from send()")); + final Map attributes = new HashMap<>(); runner.enqueue("Some Text".getBytes(), attributes); @@ -45,38 +96,112 @@ public class TestPutEmail { runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE); + assertEquals("Expected an attempt to send a single message", 1, processor.getMessages().size()); } @Test - public void testEmailPropertyFormatters() { - // verifies that files are routed to failure when the SMTP host doesn't exist - final TestRunner runner = TestRunners.newTestRunner(new PutEmail()); - runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + public void testOutgoingMessage() throws Exception { + // verifies that are set on the outgoing Message correctly runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); - runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}"); runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); runner.setProperty(PutEmail.FROM, "test@apache.org"); runner.setProperty(PutEmail.MESSAGE, "Message Body"); runner.setProperty(PutEmail.TO, "recipient@apache.org"); - ProcessSession session = runner.getProcessSessionFactory().createSession(); - FlowFile ff = session.create(); - ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory"); - ProcessContext context = runner.getProcessContext(); + runner.enqueue("Some Text".getBytes()); - String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue(); - assertEquals("X-Mailer Header", "TestingNiFi", xmailer); + runner.run(); - String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue(); - assertEquals("Socket Factory", "testingSocketFactory", socketFactory); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS); - final Map attributes = new HashMap<>(); + // Verify that the Message was populated correctly + assertEquals("Expected a single message to be sent", 1, processor.getMessages().size()); + Message message = processor.getMessages().get(0); + assertEquals("test@apache.org", message.getFrom()[0].toString()); + assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]); + assertEquals("Message Body", message.getContent()); + assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString()); + assertNull(message.getRecipients(RecipientType.BCC)); + assertNull(message.getRecipients(RecipientType.CC)); + } + + @Test + public void testOutgoingMessageWithOptionalProperties() throws Exception { + // verifies that optional attributes are set on the outgoing Message correctly + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "${from}"); + runner.setProperty(PutEmail.MESSAGE, "${message}"); + runner.setProperty(PutEmail.TO, "${to}"); + runner.setProperty(PutEmail.BCC, "${bcc}"); + runner.setProperty(PutEmail.CC, "${cc}"); + + Map attributes = new HashMap<>(); + attributes.put("from", "test@apache.org "); + attributes.put("message", "the message body"); + attributes.put("to", "to@apache.org"); + attributes.put("bcc", "bcc@apache.org"); + attributes.put("cc", "cc@apache.org"); runner.enqueue("Some Text".getBytes(), attributes); runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS); + + // Verify that the Message was populated correctly + assertEquals("Expected a single message to be sent", 1, processor.getMessages().size()); + Message message = processor.getMessages().get(0); + assertEquals("\"test@apache.org\" ", message.getFrom()[0].toString()); + assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]); + assertEquals("the message body", message.getContent()); + assertEquals(1, message.getRecipients(RecipientType.TO).length); + assertEquals("to@apache.org", message.getRecipients(RecipientType.TO)[0].toString()); + assertEquals(1, message.getRecipients(RecipientType.BCC).length); + assertEquals("bcc@apache.org", message.getRecipients(RecipientType.BCC)[0].toString()); + assertEquals(1, message.getRecipients(RecipientType.CC).length); + assertEquals("cc@apache.org",message.getRecipients(RecipientType.CC)[0].toString()); + } + + @Test + public void testInvalidAddress() throws Exception { + // verifies that unparsable addresses lead to the flow file being routed to failure + runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host"); + runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi"); + runner.setProperty(PutEmail.FROM, "test@apache.org