NIFI-5252 - support arbitrary headers in PutEmail processor

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2787.
This commit is contained in:
Dustin Rodrigues 2018-06-11 22:00:28 -04:00 committed by Pierre Villard
parent 0886dcb0b4
commit 2277b9411e
2 changed files with 38 additions and 0 deletions

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -30,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern;
import javax.activation.DataHandler; import javax.activation.DataHandler;
import javax.mail.Authenticator; import javax.mail.Authenticator;
@ -44,6 +46,7 @@ import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart; import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart; import javax.mail.internet.MimeMultipart;
import javax.mail.internet.MimeUtility;
import javax.mail.internet.PreencodedMimeBodyPart; import javax.mail.internet.PreencodedMimeBodyPart;
import javax.mail.util.ByteArrayDataSource; import javax.mail.util.ByteArrayDataSource;
@ -55,6 +58,7 @@ import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -142,6 +146,15 @@ public class PutEmail extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NiFi") .defaultValue("NiFi")
.build(); .build();
public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
.name("attribute-name-regex")
.displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Email messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type") .name("Content Type")
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html") .description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
@ -256,6 +269,7 @@ public class PutEmail extends AbstractProcessor {
properties.add(SMTP_TLS); properties.add(SMTP_TLS);
properties.add(SMTP_SOCKET_FACTORY); properties.add(SMTP_SOCKET_FACTORY);
properties.add(HEADER_XMAILER); properties.add(HEADER_XMAILER);
properties.add(ATTRIBUTE_NAME_REGEX);
properties.add(CONTENT_TYPE); properties.add(CONTENT_TYPE);
properties.add(FROM); properties.add(FROM);
properties.add(TO); properties.add(TO);
@ -299,6 +313,13 @@ public class PutEmail extends AbstractProcessor {
return errors; return errors;
} }
private volatile Pattern attributeNamePattern = null;
@OnScheduled
public void onScheduled(final ProcessContext context) {
final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
this.attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) { public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get(); final FlowFile flowFile = session.get();
@ -319,6 +340,17 @@ public class PutEmail extends AbstractProcessor {
message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC)); message.setRecipients(RecipientType.CC, toInetAddresses(context, flowFile, CC));
message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC)); message.setRecipients(RecipientType.BCC, toInetAddresses(context, flowFile, BCC));
if (attributeNamePattern != null) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (attributeNamePattern.matcher(entry.getKey()).matches()) {
try {
message.setHeader(entry.getKey(), MimeUtility.encodeText(entry.getValue()));
} catch (UnsupportedEncodingException e){
logger.warn("Unable to add header value {} due to encoding exception", new Object[]{entry.getValue()});
}
}
}
}
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue()); message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions(flowFile).getValue());
message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue()); message.setSubject(context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue());

View File

@ -31,6 +31,7 @@ import javax.mail.Message;
import javax.mail.MessagingException; import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage.RecipientType; import javax.mail.internet.MimeMessage.RecipientType;
import javax.mail.internet.MimeMultipart; import javax.mail.internet.MimeMultipart;
import javax.mail.internet.MimeUtility;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.codec.binary.StringUtils;
@ -142,6 +143,7 @@ public class TestPutEmail {
runner.setProperty(PutEmail.TO, "${to}"); runner.setProperty(PutEmail.TO, "${to}");
runner.setProperty(PutEmail.BCC, "${bcc}"); runner.setProperty(PutEmail.BCC, "${bcc}");
runner.setProperty(PutEmail.CC, "${cc}"); runner.setProperty(PutEmail.CC, "${cc}");
runner.setProperty(PutEmail.ATTRIBUTE_NAME_REGEX, "Precedence.*");
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put("from", "test@apache.org <NiFi>"); attributes.put("from", "test@apache.org <NiFi>");
@ -149,6 +151,8 @@ public class TestPutEmail {
attributes.put("to", "to@apache.org"); attributes.put("to", "to@apache.org");
attributes.put("bcc", "bcc@apache.org"); attributes.put("bcc", "bcc@apache.org");
attributes.put("cc", "cc@apache.org"); attributes.put("cc", "cc@apache.org");
attributes.put("Precedence", "bulk");
attributes.put("PrecedenceEncodeDecodeTest", "búlk");
runner.enqueue("Some Text".getBytes(), attributes); runner.enqueue("Some Text".getBytes(), attributes);
runner.run(); runner.run();
@ -168,6 +172,8 @@ public class TestPutEmail {
assertEquals("bcc@apache.org", message.getRecipients(RecipientType.BCC)[0].toString()); assertEquals("bcc@apache.org", message.getRecipients(RecipientType.BCC)[0].toString());
assertEquals(1, message.getRecipients(RecipientType.CC).length); assertEquals(1, message.getRecipients(RecipientType.CC).length);
assertEquals("cc@apache.org",message.getRecipients(RecipientType.CC)[0].toString()); assertEquals("cc@apache.org",message.getRecipients(RecipientType.CC)[0].toString());
assertEquals("bulk", MimeUtility.decodeText(message.getHeader("Precedence")[0]));
assertEquals("búlk", MimeUtility.decodeText(message.getHeader("PrecedenceEncodeDecodeTest")[0]));
} }
@Test @Test