mirror of https://github.com/apache/nifi.git
NIFI-1434 Prevent array index exception in PutEmail
Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
parent
2f80cd504b
commit
8ef337b65e
|
@ -298,16 +298,10 @@ public class PutEmail extends AbstractProcessor {
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions(flowFile).getValue())[0]);
|
message.addFrom(toInetAddresses(context, flowFile, FROM));
|
||||||
|
message.setRecipients(RecipientType.TO, toInetAddresses(context, flowFile, TO));
|
||||||
final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions(flowFile).getValue());
|
message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC));
|
||||||
message.setRecipients(RecipientType.TO, toAddresses);
|
message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC));
|
||||||
|
|
||||||
final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions(flowFile).getValue());
|
|
||||||
message.setRecipients(RecipientType.CC, ccAddresses);
|
|
||||||
|
|
||||||
final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue());
|
|
||||||
message.setRecipients(RecipientType.BCC, bccAddresses);
|
|
||||||
|
|
||||||
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue());
|
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue());
|
message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
|
@ -344,14 +338,14 @@ public class PutEmail extends AbstractProcessor {
|
||||||
message.setContent(multipart);
|
message.setContent(multipart);
|
||||||
}
|
}
|
||||||
|
|
||||||
Transport.send(message);
|
send(message);
|
||||||
|
|
||||||
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
|
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
|
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
|
||||||
} catch (final ProcessException | MessagingException | IOException e) {
|
} catch (final ProcessException | MessagingException | IOException e) {
|
||||||
context.yield();
|
context.yield();
|
||||||
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e});
|
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e.getMessage()}, e);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -418,7 +412,7 @@ public class PutEmail extends AbstractProcessor {
|
||||||
StringBuilder message = new StringBuilder(messagePrepend);
|
StringBuilder message = new StringBuilder(messagePrepend);
|
||||||
message.append(BODY_SEPARATOR);
|
message.append(BODY_SEPARATOR);
|
||||||
message.append("\nStandard FlowFile Metadata:");
|
message.append("\nStandard FlowFile Metadata:");
|
||||||
message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getId()));
|
message.append(String.format("\n\t%1$s = '%2$s'", "id", flowFile.getAttribute(CoreAttributes.UUID.key())));
|
||||||
message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
|
message.append(String.format("\n\t%1$s = '%2$s'", "entryDate", new Date(flowFile.getEntryDate())));
|
||||||
message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize()));
|
message.append(String.format("\n\t%1$s = '%2$s'", "fileSize", flowFile.getSize()));
|
||||||
message.append("\nFlowFile Attributes:");
|
message.append("\nFlowFile Attributes:");
|
||||||
|
@ -429,11 +423,43 @@ public class PutEmail extends AbstractProcessor {
|
||||||
return message.toString();
|
return message.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static InternetAddress[] toInetAddresses(final String val) throws AddressException {
|
/**
|
||||||
if (val == null) {
|
* @param context the current context
|
||||||
return new InternetAddress[0];
|
* @param flowFile the current flow file
|
||||||
|
* @param propertyDescriptor the property to evaluate
|
||||||
|
* @return an InternetAddress[] parsed from the supplied property
|
||||||
|
* @throws AddressException if the property cannot be parsed to a valid InternetAddress[]
|
||||||
|
*/
|
||||||
|
private InternetAddress[] toInetAddresses(final ProcessContext context, final FlowFile flowFile,
|
||||||
|
PropertyDescriptor propertyDescriptor) throws AddressException {
|
||||||
|
InternetAddress[] parse;
|
||||||
|
String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
if (value == null || value.isEmpty()){
|
||||||
|
if (propertyDescriptor.isRequired()) {
|
||||||
|
final String exceptionMsg = "Required property '" + propertyDescriptor.getDisplayName() + "' evaluates to an empty string.";
|
||||||
|
throw new AddressException(exceptionMsg);
|
||||||
|
} else {
|
||||||
|
parse = new InternetAddress[0];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
parse = InternetAddress.parse(value);
|
||||||
|
} catch (AddressException e) {
|
||||||
|
final String exceptionMsg = "Unable to parse a valid address for property '" + propertyDescriptor.getDisplayName() + "' with value '"+ value +"'";
|
||||||
|
throw new AddressException(exceptionMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return InternetAddress.parse(val);
|
return parse;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrapper for static method {@link Transport#send(Message)} to add testability of this class.
|
||||||
|
*
|
||||||
|
* @param msg the message to send
|
||||||
|
* @throws MessagingException on error
|
||||||
|
*/
|
||||||
|
protected void send(final Message msg) throws MessagingException {
|
||||||
|
Transport.send(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,28 +16,79 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import javax.mail.Message;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import javax.mail.MessagingException;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import javax.mail.internet.MimeMessage.RecipientType;
|
||||||
|
|
||||||
|
import org.apache.nifi.util.LogMessage;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import static org.junit.Assert.assertEquals;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestPutEmail {
|
public class TestPutEmail {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extension to PutEmail that stubs out the calls to
|
||||||
|
* Transport.sendMessage().
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* All sent messages are records in a list available via the
|
||||||
|
* {@link #getMessages()} method.</p>
|
||||||
|
* <p> Calling
|
||||||
|
* {@link #setException(MessagingException)} will cause the supplied exception to be
|
||||||
|
* thrown when sendMessage is invoked.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
private static final class PutEmailExtension extends PutEmail {
|
||||||
|
private MessagingException e;
|
||||||
|
private ArrayList<Message> messages = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void send(Message msg) throws MessagingException {
|
||||||
|
messages.add(msg);
|
||||||
|
if (this.e != null) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void setException(final MessagingException e) {
|
||||||
|
this.e = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Message> getMessages() {
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PutEmailExtension processor;
|
||||||
|
TestRunner runner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
processor = new PutEmailExtension();
|
||||||
|
runner = TestRunners.newTestRunner(processor);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHostNotFound() {
|
public void testExceptionWhenSending() {
|
||||||
// verifies that files are routed to failure when the SMTP host doesn't exist
|
// verifies that files are routed to failure when Transport.send() throws a MessagingException
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
|
|
||||||
runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
|
runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
|
||||||
runner.setProperty(PutEmail.FROM, "test@apache.org");
|
runner.setProperty(PutEmail.FROM, "test@apache.org");
|
||||||
runner.setProperty(PutEmail.TO, "test@apache.org");
|
runner.setProperty(PutEmail.TO, "test@apache.org");
|
||||||
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
||||||
|
|
||||||
|
processor.setException(new MessagingException("Forced failure from send()"));
|
||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
runner.enqueue("Some Text".getBytes(), attributes);
|
runner.enqueue("Some Text".getBytes(), attributes);
|
||||||
|
|
||||||
|
@ -45,38 +96,112 @@ public class TestPutEmail {
|
||||||
|
|
||||||
runner.assertQueueEmpty();
|
runner.assertQueueEmpty();
|
||||||
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
|
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
|
||||||
|
assertEquals("Expected an attempt to send a single message", 1, processor.getMessages().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmailPropertyFormatters() {
|
public void testOutgoingMessage() throws Exception {
|
||||||
// verifies that files are routed to failure when the SMTP host doesn't exist
|
// verifies that are set on the outgoing Message correctly
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
|
|
||||||
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
|
||||||
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
|
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
|
||||||
runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}");
|
|
||||||
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
||||||
runner.setProperty(PutEmail.FROM, "test@apache.org");
|
runner.setProperty(PutEmail.FROM, "test@apache.org");
|
||||||
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
||||||
runner.setProperty(PutEmail.TO, "recipient@apache.org");
|
runner.setProperty(PutEmail.TO, "recipient@apache.org");
|
||||||
|
|
||||||
ProcessSession session = runner.getProcessSessionFactory().createSession();
|
runner.enqueue("Some Text".getBytes());
|
||||||
FlowFile ff = session.create();
|
|
||||||
ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory");
|
|
||||||
ProcessContext context = runner.getProcessContext();
|
|
||||||
|
|
||||||
String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue();
|
runner.run();
|
||||||
assertEquals("X-Mailer Header", "TestingNiFi", xmailer);
|
|
||||||
|
|
||||||
String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue();
|
runner.assertQueueEmpty();
|
||||||
assertEquals("Socket Factory", "testingSocketFactory", socketFactory);
|
runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS);
|
||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
// Verify that the Message was populated correctly
|
||||||
|
assertEquals("Expected a single message to be sent", 1, processor.getMessages().size());
|
||||||
|
Message message = processor.getMessages().get(0);
|
||||||
|
assertEquals("test@apache.org", message.getFrom()[0].toString());
|
||||||
|
assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]);
|
||||||
|
assertEquals("Message Body", message.getContent());
|
||||||
|
assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
|
||||||
|
assertNull(message.getRecipients(RecipientType.BCC));
|
||||||
|
assertNull(message.getRecipients(RecipientType.CC));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOutgoingMessageWithOptionalProperties() throws Exception {
|
||||||
|
// verifies that optional attributes are set on the outgoing Message correctly
|
||||||
|
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
|
||||||
|
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
||||||
|
runner.setProperty(PutEmail.FROM, "${from}");
|
||||||
|
runner.setProperty(PutEmail.MESSAGE, "${message}");
|
||||||
|
runner.setProperty(PutEmail.TO, "${to}");
|
||||||
|
runner.setProperty(PutEmail.BCC, "${bcc}");
|
||||||
|
runner.setProperty(PutEmail.CC, "${cc}");
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put("from", "test@apache.org <NiFi>");
|
||||||
|
attributes.put("message", "the message body");
|
||||||
|
attributes.put("to", "to@apache.org");
|
||||||
|
attributes.put("bcc", "bcc@apache.org");
|
||||||
|
attributes.put("cc", "cc@apache.org");
|
||||||
runner.enqueue("Some Text".getBytes(), attributes);
|
runner.enqueue("Some Text".getBytes(), attributes);
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertQueueEmpty();
|
||||||
|
runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS);
|
||||||
|
|
||||||
|
// Verify that the Message was populated correctly
|
||||||
|
assertEquals("Expected a single message to be sent", 1, processor.getMessages().size());
|
||||||
|
Message message = processor.getMessages().get(0);
|
||||||
|
assertEquals("\"test@apache.org\" <NiFi>", message.getFrom()[0].toString());
|
||||||
|
assertEquals("X-Mailer Header", "TestingNiFi", message.getHeader("X-Mailer")[0]);
|
||||||
|
assertEquals("the message body", message.getContent());
|
||||||
|
assertEquals(1, message.getRecipients(RecipientType.TO).length);
|
||||||
|
assertEquals("to@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
|
||||||
|
assertEquals(1, message.getRecipients(RecipientType.BCC).length);
|
||||||
|
assertEquals("bcc@apache.org", message.getRecipients(RecipientType.BCC)[0].toString());
|
||||||
|
assertEquals(1, message.getRecipients(RecipientType.CC).length);
|
||||||
|
assertEquals("cc@apache.org",message.getRecipients(RecipientType.CC)[0].toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidAddress() throws Exception {
|
||||||
|
// verifies that unparsable addresses lead to the flow file being routed to failure
|
||||||
|
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
|
||||||
|
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
||||||
|
runner.setProperty(PutEmail.FROM, "test@apache.org <invalid");
|
||||||
|
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
||||||
|
runner.setProperty(PutEmail.TO, "recipient@apache.org");
|
||||||
|
|
||||||
|
runner.enqueue("Some Text".getBytes());
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
runner.assertQueueEmpty();
|
runner.assertQueueEmpty();
|
||||||
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
|
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
|
||||||
|
|
||||||
|
assertEquals("Expected no messages to be sent", 0, processor.getMessages().size());
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testEmptyFrom() throws Exception {
|
||||||
|
// verifies that if the FROM property evaluates to an empty string at
|
||||||
|
// runtime the flow file is transferred to failure.
|
||||||
|
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
|
||||||
|
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
|
||||||
|
runner.setProperty(PutEmail.FROM, "${MISSING_PROPERTY}");
|
||||||
|
runner.setProperty(PutEmail.MESSAGE, "Message Body");
|
||||||
|
runner.setProperty(PutEmail.TO, "recipient@apache.org");
|
||||||
|
|
||||||
|
runner.enqueue("Some Text".getBytes());
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertQueueEmpty();
|
||||||
|
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
|
||||||
|
|
||||||
|
assertEquals("Expected no messages to be sent", 0, processor.getMessages().size());
|
||||||
|
final LogMessage logMessage = runner.getLogger().getErrorMessages().get(0);
|
||||||
|
assertTrue(((String)logMessage.getArgs()[2]).contains("Required property 'From' evaluates to an empty string"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue