Adding TLS to PutEmail and several other PropertyDescriptors that can be driven by EL for full control of outgoing emails. Also switched from Sun SMTPTransport to Java Transport for sending the email message

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Brian Ghigiarelli 2015-03-20 16:37:23 -04:00 committed by Mark Payne
parent e48ea72c30
commit b97396c0fa
2 changed files with 167 additions and 22 deletions

View File

@ -22,18 +22,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import javax.activation.DataHandler;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.URLName;
import javax.mail.Transport;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
@ -61,8 +65,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.sun.mail.smtp.SMTPTransport;
@SupportsBatching
@Tags({"email", "put", "notify", "smtp"})
@CapabilityDescription("Sends an e-mail to configured recipients for each incoming FlowFile")
@ -72,6 +74,7 @@ public class PutEmail extends AbstractProcessor {
.name("SMTP Hostname")
.description("The hostname of the SMTP host")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
@ -79,21 +82,56 @@ public class PutEmail extends AbstractProcessor {
.description("The Port used for SMTP communications")
.required(true)
.defaultValue("25")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
.name("SMTP Username")
.description("Username for the SMTP account")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
.name("SMTP Password")
.description("Password for the SMTP account")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
.name("SMTP Auth")
.description("Flag indicating whether authentication should be used")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("true")
.build();
public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder()
.name("SMTP TLS")
.description("Flag indicating whether TLS should be enabled")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder()
.name("SMTP Socket Factory")
.description("Socket Factory to use for SMTP Connection")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("javax.net.ssl.SSLSocketFactory")
.build();
public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
.name("SMTP X-Mailer Header")
.description("X-Mailer used in the header of the outgoing email")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NiFi")
.build();
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.name("From")
.description("Specifies the Email address to use as the sender")
@ -152,12 +190,27 @@ public class PutEmail extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully sent will be routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles that fail to send will be routed to this relationship").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
/**
* Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime
*/
private static Map<String, PropertyDescriptor> propertyToContext = new HashMap<String, PropertyDescriptor>();
static {
propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME);
propertyToContext.put("mail.smtp.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY);
propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS);
propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
}
@Override
protected void init(final ProcessorInitializationContext context) {
@ -166,6 +219,10 @@ public class PutEmail extends AbstractProcessor {
properties.add(SMTP_PORT);
properties.add(SMTP_USERNAME);
properties.add(SMTP_PASSWORD);
properties.add(SMTP_AUTH);
properties.add(SMTP_TLS);
properties.add(SMTP_SOCKET_FACTORY);
properties.add(HEADER_XMAILER);
properties.add(FROM);
properties.add(TO);
properties.add(CC);
@ -214,9 +271,10 @@ public class PutEmail extends AbstractProcessor {
return;
}
final Properties properties = new Properties();
properties.setProperty("smtp.mail.host", context.getProperty(SMTP_HOSTNAME).getValue());
final Session mailSession = Session.getInstance(properties);
final Properties properties = this.getMailPropertiesFromFlowFile(context, flowFile);
final Session mailSession = this.createMailSession(properties);
final Message message = new MimeMessage(mailSession);
final ProcessorLog logger = getLogger();
@ -232,7 +290,7 @@ public class PutEmail extends AbstractProcessor {
final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions(flowFile).getValue());
message.setRecipients(RecipientType.BCC, bccAddresses);
message.setHeader("X-Mailer", "NiFi");
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue());
message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue());
String messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
@ -264,18 +322,8 @@ public class PutEmail extends AbstractProcessor {
multipart.addBodyPart(mimeFile);
message.setContent(multipart);
}
final String smtpHost = context.getProperty(SMTP_HOSTNAME).getValue();
final SMTPTransport transport = new SMTPTransport(mailSession, new URLName(smtpHost));
try {
final int smtpPort = context.getProperty(SMTP_PORT).asInteger();
final String smtpUsername = context.getProperty(SMTP_USERNAME).getValue();
final String smtpPassword = context.getProperty(SMTP_PASSWORD).getValue();
transport.connect(smtpHost, smtpPort, smtpUsername, smtpPassword);
transport.sendMessage(message, message.getAllRecipients());
} finally {
transport.close();
}
Transport.send(message);
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
@ -287,7 +335,66 @@ public class PutEmail extends AbstractProcessor {
}
}
public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n";
/**
* Based on the input properties, determine whether an authenticate or unauthenticated session
* should be used. If authenticated, creates a Password Authenticator for use in sending the email.
*
* @param properties
* @return
*/
private Session createMailSession(final Properties properties) {
String authValue = properties.getProperty("mail.smtp.auth");
Boolean auth = Boolean.valueOf(authValue);
/*
* Conditionally create a password authenticator if the 'auth' parameter is set.
*/
final Session mailSession = auth ? Session.getInstance(properties, new Authenticator() {
@Override
public PasswordAuthentication getPasswordAuthentication() {
String username = properties.getProperty("mail.smtp.user"),
password = properties.getProperty("mail.smtp.password");
return new PasswordAuthentication(username, password);
}
}) : Session.getInstance(properties); // without auth
return mailSession;
}
/**
* Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to build
* the required Properties object to be used for sending this email
*
* @param context
* @param flowFile
* @return
*/
private Properties getMailPropertiesFromFlowFile(final ProcessContext context, final FlowFile flowFile) {
final Properties properties = new Properties();
final ProcessorLog logger = this.getLogger();
for(Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
// Evaluate the property descriptor against the flow file
String flowFileValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions(flowFile).getValue();
String property = entry.getKey();
logger.debug("Evaluated Mail Property: {} with Value: {}", new Object[]{property, flowFileValue});
// Nullable values are not allowed, so filter out
if(null != flowFileValue) {
properties.setProperty(property, flowFileValue);
}
}
return properties;
}
public static final String BODY_SEPARATOR = "\n\n--------------------------------------------------\n";
private static String formatAttributes(final FlowFile flowFile, final String messagePrepend) {
StringBuilder message = new StringBuilder(messagePrepend);

View File

@ -19,14 +19,19 @@ package org.apache.nifi.processors.standard;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestPutEmail {
@Test
public void testHotNotFound() {
public void testHostNotFound() {
// verifies that files are routed to failure when the SMTP host doesn't exist
final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
runner.setProperty(PutEmail.SMTP_HOSTNAME, "host-doesnt-exist123");
@ -42,4 +47,37 @@ public class TestPutEmail {
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
}
@Test
public void testEmailPropertyFormatters() {
// verifies that files are routed to failure when the SMTP host doesn't exist
final TestRunner runner = TestRunners.newTestRunner(new PutEmail());
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
runner.setProperty(PutEmail.SMTP_SOCKET_FACTORY, "${dynamicSocketFactory}");
runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
ProcessSession session = runner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, "dynamicSocketFactory", "testingSocketFactory");
ProcessContext context = runner.getProcessContext();
String xmailer = context.getProperty(PutEmail.HEADER_XMAILER).evaluateAttributeExpressions(ff).getValue();
assertEquals("X-Mailer Header", "TestingNiFi", xmailer);
String socketFactory = context.getProperty(PutEmail.SMTP_SOCKET_FACTORY).evaluateAttributeExpressions(ff).getValue();
assertEquals("Socket Factory", "testingSocketFactory", socketFactory);
final Map<String, String> attributes = new HashMap<>();
runner.enqueue("Some Text".getBytes(), attributes);
runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(PutEmail.REL_FAILURE);
}
}