NIFI-4326 Fix NullPointerException and strict addressing

This uses parseHeader() instead of getFrom() and getRecipients() in order to avoid strict addressing.
It also checks for null to solve a null pointer exception.
By contract, this processor should grab information "if available". Which means it should not fail if the info is unavailable.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2111.
This commit is contained in:
btwood 2017-08-25 16:37:50 -04:00 committed by Pierre Villard
parent 2ee21b9255
commit 90ed08ec33
4 changed files with 181 additions and 37 deletions

View File

@ -121,21 +121,27 @@ public class ExtractEmailAttachments extends AbstractProcessor {
final List<FlowFile> invalidFlowFilesList = new ArrayList<>(); final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
final List<FlowFile> originalFlowFilesList = new ArrayList<>(); final List<FlowFile> originalFlowFilesList = new ArrayList<>();
final String requireStrictAddresses = "false";
session.read(originalFlowFile, new InputStreamCallback() { session.read(originalFlowFile, new InputStreamCallback() {
@Override @Override
public void process(final InputStream rawIn) throws IOException { public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) { try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties(); Properties props = new Properties();
Session mailSession = Session.getDefaultInstance(props, null); props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in); MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header // RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid // if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom(); Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
Date sentDate = originalMessage.getSentDate(); Date sentDate = originalMessage.getSentDate();
if (from == null || sentDate == null) { if (sentDate == null) {
// Throws MessageException due to lack of minimum required headers // Throws MessageException due to lack of minimum required headers
throw new MessagingException("Message failed RFC2822 validation"); throw new MessagingException("Message failed RFC2822 validation: No Sent Date");
} }
originalFlowFilesList.add(originalFlowFile); originalFlowFilesList.add(originalFlowFile);
if (parser.hasAttachments()) { if (parser.hasAttachments()) {
@ -209,4 +215,3 @@ public class ExtractEmailAttachments extends AbstractProcessor {
} }

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -49,7 +50,6 @@ import javax.mail.Session;
import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -67,7 +67,7 @@ import java.util.Set;
@SideEffectFree @SideEffectFree
@Tags({"split", "email"}) @Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Using the flowfile content as source of data, extract header from an RFC compliant email file adding the relevant attributes to the flowfile. " + @CapabilityDescription("Using the flowfile content as source of data, extract header from an RFC compliant email file adding the relevant attributes to the flowfile. " +
"This processor does not perform extensive RFC validation but still requires a bare minimum compliance with RFC 2822") "This processor does not perform extensive RFC validation but still requires a bare minimum compliance with RFC 2822")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "email.headers.bcc.*", description = "Each individual BCC recipient (if available)"), @WritesAttribute(attribute = "email.headers.bcc.*", description = "Each individual BCC recipient (if available)"),
@ -103,6 +103,24 @@ public class ExtractEmailHeaders extends AbstractProcessor {
.defaultValue("x-mailer") .defaultValue("x-mailer")
.build(); .build();
private static final AllowableValue STRICT_ADDRESSING = new AllowableValue("true", "Strict Address Parsing",
"Strict email address format will be enforced. FlowFiles will be transfered to the failure relationship if the email address is invalid.");
private static final AllowableValue NONSTRICT_ADDRESSING = new AllowableValue("false", "Non-Strict Address Parsing",
"Accept emails, even if the address is poorly formed and doesn't strictly comply with RFC Validation.");
public static final PropertyDescriptor STRICT_PARSING = new PropertyDescriptor.Builder()
.name("STRICT_ADDRESS_PARSING")
.displayName("Email Address Parsing")
.description("If \"strict\", strict address format parsing rules are applied to mailbox and mailbox list fields, " +
"such as \"to\" and \"from\" headers, and FlowFiles with poorly formed addresses will be routed " +
"to the failure relationship, similar to messages that fail RFC compliant format validation. " +
"If \"non-strict\", the processor will extract the contents of mailbox list headers as comma-separated " +
"values without attempting to parse each value as well-formed Internet mailbox addresses. " +
"This is optional and defaults to " + STRICT_ADDRESSING.getDisplayName())
.required(false)
.defaultValue(STRICT_ADDRESSING.getValue())
.allowableValues(STRICT_ADDRESSING, NONSTRICT_ADDRESSING)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("Extraction was successful") .description("Extraction was successful")
@ -125,6 +143,7 @@ public class ExtractEmailHeaders extends AbstractProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>(); final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CAPTURED_HEADERS); descriptors.add(CAPTURED_HEADERS);
descriptors.add(STRICT_PARSING);
this.descriptors = Collections.unmodifiableList(descriptors); this.descriptors = Collections.unmodifiableList(descriptors);
} }
@ -140,6 +159,7 @@ public class ExtractEmailHeaders extends AbstractProcessor {
return; return;
} }
final String requireStrictAddresses = context.getProperty(STRICT_PARSING).getValue();
final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":")); final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":"));
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
@ -148,16 +168,20 @@ public class ExtractEmailHeaders extends AbstractProcessor {
public void process(final InputStream rawIn) throws IOException { public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) { try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties(); Properties props = new Properties();
Session mailSession = Session.getDefaultInstance(props, null); props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in); MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header // RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid // if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom(); Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
Date sentDate = originalMessage.getSentDate(); Date sentDate = originalMessage.getSentDate();
if (from == null || sentDate == null ) { if (sentDate == null ) {
// Throws MessageException due to lack of minimum required headers // Throws MessageException due to lack of minimum required headers
throw new MessagingException("Message failed RFC2822 validation"); throw new MessagingException("Message failed RFC-2822 validation: No Sent Date");
} else if (capturedHeadersList.size() > 0){ } else if (capturedHeadersList.size() > 0){
Enumeration headers = originalMessage.getAllHeaders(); Enumeration headers = originalMessage.getAllHeaders();
while (headers.hasMoreElements()) { while (headers.hasMoreElements()) {
@ -168,21 +192,12 @@ public class ExtractEmailHeaders extends AbstractProcessor {
} }
} }
} }
if (Array.getLength(originalMessage.getAllRecipients()) > 0) {
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.TO)); toCount++) { putAddressListInAttributes(attributes, EMAIL_HEADER_TO, originalMessage.getRecipients(Message.RecipientType.TO));
attributes.put(EMAIL_HEADER_TO + "." + toCount, originalMessage.getRecipients(Message.RecipientType.TO)[toCount].toString()); putAddressListInAttributes(attributes, EMAIL_HEADER_CC, originalMessage.getRecipients(Message.RecipientType.CC));
} putAddressListInAttributes(attributes, EMAIL_HEADER_BCC, originalMessage.getRecipients(Message.RecipientType.BCC));
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.BCC)); toCount++) { putAddressListInAttributes(attributes, EMAIL_HEADER_FROM, originalMessage.getFrom()); // RFC-2822 specifies "From" as mailbox-list
attributes.put(EMAIL_HEADER_BCC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.BCC)[toCount].toString());
}
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.CC)); toCount++) {
attributes.put(EMAIL_HEADER_CC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.CC)[toCount].toString());
}
}
// Incredibly enough RFC-2822 specified From as a "mailbox-list" so an array I returned by getFrom
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getFrom()); toCount++) {
attributes.put(EMAIL_HEADER_FROM + "." + toCount, originalMessage.getFrom()[toCount].toString());
}
if (StringUtils.isNotEmpty(originalMessage.getMessageID())) { if (StringUtils.isNotEmpty(originalMessage.getMessageID())) {
attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID()); attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID());
} }
@ -231,5 +246,15 @@ public class ExtractEmailHeaders extends AbstractProcessor {
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors; return descriptors;
} }
}
private static void putAddressListInAttributes(
Map<String, String> attributes,
final String attributePrefix,
Address[] addresses) {
if (addresses != null) {
for (int count = 0; count < ArrayUtils.getLength(addresses); count++) {
attributes.put(attributePrefix + "." + count, addresses[count].toString());
}
}
}
}

View File

@ -44,6 +44,20 @@ public class GenerateAttachment {
} }
public byte[] SimpleEmail() { public byte[] SimpleEmail() {
MimeMessage mimeMessage = SimpleEmailMimeMessage();
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
}
return output.toByteArray();
}
public MimeMessage SimpleEmailMimeMessage() {
Email email = new SimpleEmail(); Email email = new SimpleEmail();
try { try {
email.setFrom(from); email.setFrom(from);
@ -56,19 +70,10 @@ public class GenerateAttachment {
e.printStackTrace(); e.printStackTrace();
} }
ByteArrayOutputStream output = new ByteArrayOutputStream(); return email.getMimeMessage();
MimeMessage mimeMessage = email.getMimeMessage();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
}
return output.toByteArray();
} }
public byte[] WithAttachments(int amount) { public byte[] WithAttachments(int amount) {
MultiPartEmail email = new MultiPartEmail(); MultiPartEmail email = new MultiPartEmail();
try { try {

View File

@ -17,11 +17,15 @@
package org.apache.nifi.processors.email; package org.apache.nifi.processors.email;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.List; import java.util.List;
public class TestExtractEmailHeaders { public class TestExtractEmailHeaders {
@ -79,6 +83,111 @@ public class TestExtractEmailHeaders {
splits.get(0).assertAttributeExists("email.headers.mime-version"); splits.get(0).assertAttributeExists("email.headers.mime-version");
} }
/**
* Test case added for NIFI-4326 for a potential NPE bug
* if the email message contains no recipient header fields, ie,
* TO, CC, BCC.
*/
@Test
public void testValidEmailWithNoRecipients() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
simpleEmailMimeMessage.removeHeader("To");
simpleEmailMimeMessage.removeHeader("Cc");
simpleEmailMimeMessage.removeHeader("Bcc");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS);
splits.get(0).assertAttributeEquals("email.headers.from.0", from);
splits.get(0).assertAttributeExists("email.headers.mime-version");
splits.get(0).assertAttributeNotExists("email.headers.to");
splits.get(0).assertAttributeNotExists("email.headers.cc");
splits.get(0).assertAttributeNotExists("email.headers.bcc");
}
/**
* NIFI-4326 adds a new feature to disable strict address parsing for
* mailbox list header fields. This is a test case that asserts that
* lax address parsing passes (when set to "strict=false") for malformed
* addresses.
*/
@Test
public void testNonStrictParsingPassesForInvalidAddresses() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "false");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
simpleEmailMimeMessage.setHeader("From", "<bad_email>");
simpleEmailMimeMessage.setHeader("To", "<>, Joe, \"\" <>");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS);
splits.get(0).assertAttributeEquals("email.headers.from.0", "bad_email");
splits.get(0).assertAttributeEquals("email.headers.to.0", "");
splits.get(0).assertAttributeEquals("email.headers.to.1", "Joe");
splits.get(0).assertAttributeEquals("email.headers.to.2", "");
}
/**
* NIFI-4326 adds a new feature to disable strict address parsing for
* mailbox list header fields. This is a test case that asserts that
* strict address parsing fails (when set to "strict=true") for malformed
* addresses.
*/
@Test
public void testStrictParsingFailsForInvalidAddresses() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "true");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
simpleEmailMimeMessage.setHeader("From", "<bad_email>");
simpleEmailMimeMessage.setHeader("To", "<>, Joe, <invalid>");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 1);
}
@Test @Test
public void testInvalidEmail() throws Exception { public void testInvalidEmail() throws Exception {