NIFI-3992: Added Original Headers to Include/Exclude

Some headers can cause problems with message parsing, specifically the 'Content-Type' header.
If an email contains attachments, ConsumeEWS may generate emails where the attachments cannot be extracted.

This closes #1867.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
patricker 2017-06-20 11:34:54 +08:00 committed by Koji Kawamura
parent 253ea2e73b
commit 7c27d4c588
1 changed files with 40 additions and 4 deletions

View File

@ -41,6 +41,7 @@ import microsoft.exchange.webservices.data.search.FindItemsResults;
import microsoft.exchange.webservices.data.search.FolderView;
import microsoft.exchange.webservices.data.search.ItemView;
import microsoft.exchange.webservices.data.search.filter.SearchFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.EmailAttachment;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.HtmlEmail;
@ -50,6 +51,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -175,6 +177,23 @@ public class ConsumeEWS extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor INCLUDE_EMAIL_HEADERS = new PropertyDescriptor.Builder()
.name("ews-include-headers")
.displayName("Original Headers to Include")
.description("Comma delimited list specifying which headers from the original message to include in the exported email message. Blank means copy all headers. " +
"Some headers can cause problems with message parsing, specifically the 'Content-Type' header.")
.defaultValue("")
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor EXCLUDE_EMAIL_HEADERS = new PropertyDescriptor.Builder()
.name("ews-exclude-headers")
.displayName("Original Headers to Exclude")
.description("Comma delimited list specifying which headers from the original message to exclude in the exported email message. Blank means don't exclude any headers.")
.defaultValue("")
.addValidator(Validator.VALID)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship")
@ -195,7 +214,6 @@ public class ConsumeEWS extends AbstractProcessor {
protected volatile boolean shouldSetDeleteFlag;
protected volatile String folderName;
protected volatile int fetchSize;
public ConsumeEWS(){
final Set<Relationship> relationshipSet = new HashSet<>();
@ -215,6 +233,8 @@ public class ConsumeEWS extends AbstractProcessor {
descriptors.add(EWS_URL);
descriptors.add(USE_AUTODISCOVER);
descriptors.add(SHOULD_MARK_READ);
descriptors.add(INCLUDE_EMAIL_HEADERS);
descriptors.add(EXCLUDE_EMAIL_HEADERS);
DESCRIPTORS = descriptors;
}
@ -304,6 +324,19 @@ public class ConsumeEWS extends AbstractProcessor {
ExchangeService service = this.initializeIfNecessary(context);
boolean deleteOnRead = context.getProperty(SHOULD_DELETE_MESSAGES).getValue().equals("true");
boolean markAsRead = context.getProperty(SHOULD_MARK_READ).getValue().equals("true");
String includeHeaders = context.getProperty(INCLUDE_EMAIL_HEADERS).getValue();
String excludeHeaders = context.getProperty(EXCLUDE_EMAIL_HEADERS).getValue();
List<String> includeHeadersList = null;
List<String> excludeHeadersList = null;
if (!StringUtils.isEmpty(includeHeaders)) {
includeHeadersList = Arrays.asList(includeHeaders.split(","));
}
if (!StringUtils.isEmpty(excludeHeaders)) {
excludeHeadersList = Arrays.asList(excludeHeaders.split(","));
}
try {
//Get Folder
@ -323,7 +356,7 @@ public class ConsumeEWS extends AbstractProcessor {
for (Item item : findResults) {
EmailMessage ewsMessage = (EmailMessage) item;
messageQueue.add(parseMessage(ewsMessage));
messageQueue.add(parseMessage(ewsMessage,includeHeadersList,excludeHeadersList));
if(deleteOnRead){
ewsMessage.delete(DeleteMode.HardDelete);
@ -367,7 +400,7 @@ public class ConsumeEWS extends AbstractProcessor {
return folder;
}
public MimeMessage parseMessage(EmailMessage item) throws Exception {
public MimeMessage parseMessage(EmailMessage item, List<String> hdrIncludeList, List<String> hdrExcludeList) throws Exception {
EmailMessage ewsMessage = item;
final String bodyText = ewsMessage.getBody().toString();
@ -403,7 +436,10 @@ public class ConsumeEWS extends AbstractProcessor {
//sent date
mm.setSentDate(ewsMessage.getDateTimeSent());
//add message headers
ewsMessage.getInternetMessageHeaders().forEach(x-> mm.addHeader(x.getName(), x.getValue()));
ewsMessage.getInternetMessageHeaders().getItems().stream()
.filter(x -> (hdrIncludeList == null || hdrIncludeList.isEmpty() || hdrIncludeList.contains(x.getName()))
&& (hdrExcludeList == null || hdrExcludeList.isEmpty() || !hdrExcludeList.contains(x.getName())))
.forEach(x-> mm.addHeader(x.getName(), x.getValue()));
//Any attachments
if(ewsMessage.getHasAttachments()){