From 4f672832c0efb95e324e8f48c995be6d50a1b67c Mon Sep 17 00:00:00 2001 From: Andre F de Miranda Date: Sat, 18 Jun 2016 22:23:55 +1000 Subject: [PATCH] NIFI-1899 - Introduce ExtractEmailAttachments and ExtractEmailHeaders processors - Introduce ListenSMTP (allows NiFi to receive data via email) - Addresses @ijokarumawak and @@JPercivall PR comments Signed-off-by: jpercivall --- nifi-assembly/NOTICE | 9 + nifi-assembly/pom.xml | 5 + .../nifi-email-bundle/nifi-email-nar/pom.xml | 44 ++ .../nifi-email-processors/pom.xml | 73 +++ .../email/ExtractEmailAttachments.java | 212 +++++++++ .../processors/email/ExtractEmailHeaders.java | 235 +++++++++ .../nifi/processors/email/ListenSMTP.java | 446 ++++++++++++++++++ .../email/smtp/event/SmtpEvent.java | 134 ++++++ .../handler/SMTPMessageHandlerFactory.java | 182 +++++++ .../email/smtp/handler/SMTPResultCode.java | 83 ++++ .../org.apache.nifi.processor.Processor | 17 + .../processors/email/GenerateAttachment.java | 110 +++++ .../email/TestExtractEmailAttachments.java | 113 +++++ .../email/TestExtractEmailHeaders.java | 93 ++++ .../nifi/processors/email/TestListenSMTP.java | 302 ++++++++++++ .../src/test/resources/localhost-ks.jks | Bin 0 -> 3512 bytes .../src/test/resources/localhost-ts.jks | Bin 0 -> 1816 bytes nifi-nar-bundles/nifi-email-bundle/pom.xml | 42 ++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 20 files changed, 2107 insertions(+) create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java create mode 100755 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/resources/localhost-ks.jks create mode 100755 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/resources/localhost-ts.jks create mode 100644 nifi-nar-bundles/nifi-email-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 7ccfc829fc..e6c9f15b94 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -829,6 +829,15 @@ The following binary components are provided under the Apache Software License v The code for the t-digest was originally authored by Ted Dunning A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz) + (ASLv2) subethasmtp + The following NOTICE information applies: + + Copyright (C) 2006-2007 SubEthaMail.org + + (ASLv2) Apache Commons Email + The following NOTICE information applies: + Apache Commons Email + Copyright 2002-2015 The Apache Software Foundation This includes derived works from the Apache Software License V2 library python-evtx (https://github.com/williballenthin/python-evtx) Copyright 2012, 2013 Willi Ballenthin william.ballenthin@mandiant.com diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 1564313444..fb6bf87a8b 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -296,6 +296,11 @@ language governing permissions and limitations under the License. --> nifi-lumberjack-nar nar + + org.apache.nifi + nifi-email-nar + nar + org.apache.nifi nifi-amqp-nar diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml new file mode 100644 index 0000000000..e5fe4fcf04 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-email-bundle + 1.0.0-SNAPSHOT + + + nifi-email-nar + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-email-processors + + + + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml new file mode 100644 index 0000000000..655ac8661d --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -0,0 +1,73 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-email-bundle + 1.0.0-SNAPSHOT + + + nifi-email-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + javax.mail + mail + + + org.apache.commons + commons-email + 1.4 + + + org.subethamail + subethasmtp + 3.1.7 + + + org.apache.nifi + nifi-ssl-context-service-api + + + org.apache.nifi + nifi-ssl-context-service + test + + + org.apache.nifi + nifi-mock + test + + + + + + org.apache.rat + apache-rat-plugin + + + + + + + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java new file mode 100644 index 0000000000..18c74e960e --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailAttachments.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Date; + +import javax.activation.DataSource; +import javax.mail.Address; +import javax.mail.MessagingException; +import javax.mail.Session; +import javax.mail.internet.MimeMessage; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.mail.util.MimeMessageParser; + + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileHandlingException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.stream.io.BufferedInputStream; + + + +@SupportsBatching +@EventDriven +@SideEffectFree +@Tags({"split", "email"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Extract attachments from a mime formatted email file, splitting them into individual flowfiles.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename ", description = "The filename of the attachment"), + @WritesAttribute(attribute = "email.attachment.parent.filename ", description = "The filename of the parent FlowFile"), + @WritesAttribute(attribute = "email.attachment.parent.uuid", description = "The UUID of the original FlowFile."), + @WritesAttribute(attribute = "mime.type", description = "The mime type of the attachment.")}) + +public class ExtractEmailAttachments extends AbstractProcessor { + public static final String ATTACHMENT_ORIGINAL_FILENAME = "email.attachment.parent.filename"; + public static final String ATTACHMENT_ORIGINAL_UUID = "email.attachment.parent.uuid"; + + public static final Relationship REL_ATTACHMENTS = new Relationship.Builder() + .name("attachments") + .description("Each individual attachment will be routed to the attachments relationship") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original file") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Flowfiles that could not be parsed") + .build(); + private Set relationships; + private List descriptors; + + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set relationships = new HashSet<>(); + relationships.add(REL_ATTACHMENTS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List descriptors = new ArrayList<>(); + + this.descriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final ComponentLog logger = getLogger(); + final FlowFile originalFlowFile = session.get(); + if (originalFlowFile == null) { + return; + } + final List attachmentsList = new ArrayList<>(); + final List invalidFlowFilesList = new ArrayList<>(); + final List originalFlowFilesList = new ArrayList<>(); + + session.read(originalFlowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + Properties props = new Properties(); + Session mailSession = Session.getDefaultInstance(props, null); + MimeMessage originalMessage = new MimeMessage(mailSession, in); + MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); + // RFC-2822 determines that a message must have a "From:" header + // if a message lacks the field, it is flagged as invalid + Address[] from = originalMessage.getFrom(); + Date sentDate = originalMessage.getSentDate(); + if (from == null || sentDate == null) { + // Throws MessageException due to lack of minimum required headers + throw new MessagingException("Message failed RFC2822 validation"); + } + originalFlowFilesList.add(originalFlowFile); + if (parser.hasAttachments()) { + final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); + try { + for (final DataSource data : parser.getAttachmentList()) { + FlowFile split = session.create(originalFlowFile); + final Map attributes = new HashMap<>(); + if (StringUtils.isNotBlank(data.getName())) { + attributes.put(CoreAttributes.FILENAME.key(), data.getName()); + } + if (StringUtils.isNotBlank(data.getContentType())) { + attributes.put(CoreAttributes.MIME_TYPE.key(), data.getContentType()); + } + String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key()); + attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid); + attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName); + split = session.append(split, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + IOUtils.copy(data.getInputStream(), out); + } + }); + split = session.putAllAttributes(split, attributes); + attachmentsList.add(split); + } + } catch (FlowFileHandlingException e) { + // Something went wrong + // Removing splits that may have been created + session.remove(attachmentsList); + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e}); + invalidFlowFilesList.add(originalFlowFile); + } + } + } catch (Exception e) { + // Another error hit... + // Removing the original flow from its list + originalFlowFilesList.remove(originalFlowFile); + logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e}); + // Message is invalid or triggered an error during parsing + invalidFlowFilesList.add(originalFlowFile); + } + } + }); + + session.transfer(attachmentsList, REL_ATTACHMENTS); + + // As per above code, originalFlowfile may be routed to invalid or + // original depending on RFC2822 compliance. + session.transfer(invalidFlowFilesList, REL_FAILURE); + session.transfer(originalFlowFilesList, REL_ORIGINAL); + + if (attachmentsList.size() > 10) { + logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()}); + } else if (attachmentsList.size() > 1){ + logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList}); + } + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + +} + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java new file mode 100644 index 0000000000..8aa45074c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ExtractEmailHeaders.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.mail.util.MimeMessageParser; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedInputStream; + +import javax.mail.Address; +import javax.mail.Header; +import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.Session; +import javax.mail.internet.MimeMessage; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +@SupportsBatching +@EventDriven +@SideEffectFree +@Tags({"split", "email"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Using the flowfile content as source of data, extract header from an RFC compliant email file adding the relevant attributes to the flowfile. " + + "This processor does not perform extensive RFC validation but still requires a bare minimum compliance with RFC 2822") +@WritesAttributes({ + @WritesAttribute(attribute = "email.headers.bcc.*", description = "Each individual BCC recipient (if available)"), + @WritesAttribute(attribute = "email.headers.cc.*", description = "Each individual CC recipient (if available)"), + @WritesAttribute(attribute = "email.headers.from.*", description = "Each individual mailbox contained in the From of the Email (array as per RFC-2822)"), + @WritesAttribute(attribute = "email.headers.message-id", description = "The value of the Message-ID header (if available)"), + @WritesAttribute(attribute = "email.headers.received_date", description = "The Received-Date of the message (if available)"), + @WritesAttribute(attribute = "email.headers.sent_date", description = "Date the message was sent"), + @WritesAttribute(attribute = "email.headers.subject", description = "Subject of the message (if available)"), + @WritesAttribute(attribute = "email.headers.to.*", description = "Each individual TO recipient (if available)"), + @WritesAttribute(attribute = "email.attachment_count", description = "Number of attachments of the message" )}) + +public class ExtractEmailHeaders extends AbstractProcessor { + public static final String EMAIL_HEADER_BCC = "email.headers.bcc"; + public static final String EMAIL_HEADER_CC = "email.headers.cc"; + public static final String EMAIL_HEADER_FROM = "email.headers.from"; + public static final String EMAIL_HEADER_MESSAGE_ID = "email.headers.message-id"; + public static final String EMAIL_HEADER_RECV_DATE = "email.headers.received_date"; + public static final String EMAIL_HEADER_SENT_DATE = "email.headers.sent_date"; + public static final String EMAIL_HEADER_SUBJECT = "email.headers.subject"; + public static final String EMAIL_HEADER_TO = "email.headers.to"; + public static final String EMAIL_ATTACHMENT_COUNT = "email.attachment_count"; + + public static final PropertyDescriptor CAPTURED_HEADERS = new PropertyDescriptor.Builder() + .name("CAPTURED_HEADERS") + .displayName("Additional Header List") + .description("COLON separated list of additional headers to be extracted from the flowfile content." + + "NOTE the header key is case insensitive and will be matched as lower-case." + + " Values will respect email contents.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("x-mailer") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Extraction was successful") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Flowfiles that could not be parsed as a RFC-2822 compliant message") + .build(); + + private Set relationships; + private List descriptors; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + final List descriptors = new ArrayList<>(); + + descriptors.add(CAPTURED_HEADERS); + this.descriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final ComponentLog logger = getLogger(); + + final List invalidFlowFilesList = new ArrayList<>(); + final List processedFlowFilesList = new ArrayList<>(); + + final FlowFile originalFlowFile = session.get(); + if (originalFlowFile == null) { + return; + } + + final List capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":")); + + final Map attributes = new HashMap<>(); + session.read(originalFlowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + Properties props = new Properties(); + Session mailSession = Session.getDefaultInstance(props, null); + MimeMessage originalMessage = new MimeMessage(mailSession, in); + MimeMessageParser parser = new MimeMessageParser(originalMessage).parse(); + // RFC-2822 determines that a message must have a "From:" header + // if a message lacks the field, it is flagged as invalid + Address[] from = originalMessage.getFrom(); + Date sentDate = originalMessage.getSentDate(); + if (from == null || sentDate == null ) { + // Throws MessageException due to lack of minimum required headers + throw new MessagingException("Message failed RFC2822 validation"); + } else if (capturedHeadersList.size() > 0){ + Enumeration headers = originalMessage.getAllHeaders(); + while (headers.hasMoreElements()) { + Header header = (Header) headers.nextElement(); + if (StringUtils.isNotEmpty(header.getValue()) + && capturedHeadersList.contains(header.getName().toLowerCase())) { + attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue()); + } + } + } + if (Array.getLength(originalMessage.getAllRecipients()) > 0) { + for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.TO)); toCount++) { + attributes.put(EMAIL_HEADER_TO + "." + toCount, originalMessage.getRecipients(Message.RecipientType.TO)[toCount].toString()); + } + for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.BCC)); toCount++) { + attributes.put(EMAIL_HEADER_BCC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.BCC)[toCount].toString()); + } + for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.CC)); toCount++) { + attributes.put(EMAIL_HEADER_CC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.CC)[toCount].toString()); + } + } + // Incredibly enough RFC-2822 specified From as a "mailbox-list" so an array I returned by getFrom + for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getFrom()); toCount++) { + attributes.put(EMAIL_HEADER_FROM + "." + toCount, originalMessage.getFrom()[toCount].toString()); + } + if (StringUtils.isNotEmpty(originalMessage.getMessageID())) { + attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID()); + } + if (originalMessage.getReceivedDate() != null) { + attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString()); + } + if (originalMessage.getSentDate() != null) { + attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString()); + } + if (StringUtils.isNotEmpty(originalMessage.getSubject())) { + attributes.put(EMAIL_HEADER_SUBJECT, originalMessage.getSubject()); + } + // Zeroes EMAIL_ATTACHMENT_COUNT + attributes.put(EMAIL_ATTACHMENT_COUNT, "0"); + // But insert correct value if attachments are present + if (parser.hasAttachments()) { + attributes.put(EMAIL_ATTACHMENT_COUNT, String.valueOf(parser.getAttachmentList().size())); + } + + } catch (Exception e) { + // Message is invalid or triggered an error during parsing + attributes.clear(); + logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e}); + invalidFlowFilesList.add(originalFlowFile); + } + } + }); + + if (attributes.size() > 0) { + FlowFile updatedFlowFile = session.putAllAttributes(originalFlowFile, attributes); + logger.info("Extracted {} into {} files", new Object[]{attributes.size(), updatedFlowFile}); + processedFlowFilesList.add(updatedFlowFile); + } + + session.transfer(processedFlowFilesList, REL_SUCCESS); + session.transfer(invalidFlowFilesList, REL_FAILURE); + + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } +} + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java new file mode 100644 index 0000000000..51b1d2d3fb --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.nifi.processors.email; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.StringUtils; + +import org.subethamail.smtp.server.SMTPServer; + + +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.ssl.SSLContextService; + +import org.apache.nifi.processors.email.smtp.event.SmtpEvent; +import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode; +import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory; + +@Tags({"listen", "email", "smtp"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + + "allowing nifi to listen for incoming email. " + + "" + + "Note this server does not perform any email validation. If direct exposure to the internet is sought," + + "it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "The value used during HELO"), + @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), + @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), + @WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"), + @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")}) + +public class ListenSMTP extends AbstractProcessor { + public static final String SMTP_HELO = "smtp.helo"; + public static final String SMTP_FROM = "smtp.from"; + public static final String SMTP_TO = "smtp.to"; + public static final String MIME_TYPE = "message/rfc822"; + public static final String SMTP_SRC_IP = "smtp.src"; + + + protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() + .name("SMTP_PORT") + .displayName("Listening Port") + .description("The TCP port the ListenSMTP processor will bind to." + + "NOTE that on Unix derivative operating systems this port must " + + "be higher than 1024 unless NiFi is running as with root user permissions.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() + .name("SMTP_HOSTNAME") + .displayName("SMTP hostname") + .description("The hostname to be embedded into the banner displayed when an " + + "SMTP client connects to the processor TCP port .") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() + .name("SMTP_MAXIMUM_CONNECTIONS") + .displayName("Maximum number of SMTP connection") + .description("The maximum number of simultaneous SMTP connections.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() + .name("SMTP_TIMEOUT") + .displayName("SMTP connection timeout") + .description("The maximum time to wait for an action of SMTP client.") + .defaultValue("60 seconds") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() + .name("SMTP_MAXIMUM_MSG_SIZE") + .displayName("SMTP Maximum Message Size") + .description("The maximum number of bytes the server will accept.") + .required(true) + .defaultValue("20MB") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder() + .name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE") + .displayName("SMTP message buffer length") + .description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " + + "Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large" + + "queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("1024") + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL_CONTEXT_SERVICE") + .displayName("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + + "messages will be received over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("CLIENT_AUTH") + .displayName("Client Auth") + .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) + .build(); + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(); + + final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); + final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (sslContextService != null && StringUtils.isBlank(clientAuth)) { + results.add(new ValidationResult.Builder() + .explanation("Client Auth must be provided when using TLS/SSL") + .valid(false).subject("Client Auth").build()); + } + + return results; + + } + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Extraction was successful") + .build(); + + private Set relationships; + private List propertyDescriptors; + private volatile LinkedBlockingQueue incomingMessages; + + private volatile SMTPServer server; + private AtomicBoolean initialized = new AtomicBoolean(false); + private AtomicBoolean stopping = new AtomicBoolean(false); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List props = new ArrayList<>(); + props.add(SMTP_PORT); + props.add(SMTP_HOSTNAME); + props.add(SMTP_MAXIMUM_CONNECTIONS); + props.add(SMTP_TIMEOUT); + props.add(SMTP_MAXIMUM_MSG_SIZE); + props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); + props.add(SSL_CONTEXT_SERVICE); + props.add(CLIENT_AUTH); + this.propertyDescriptors = Collections.unmodifiableList(props); + + } + + // Upon Schedule, reset the initialized state to false + @OnScheduled + public void onScheduled(ProcessContext context) { + initialized.set(false); + stopping.set(false); + } + + protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { + + // check if we are already running or if it is stopping + if (initialized.get() && server.isRunning() || stopping.get() ) { + return; + } + + incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); + + String clientAuth = null; + + // If an SSLContextService was provided then create an SSLContext to pass down to the server + SSLContext sslContext = null; + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + } + + final SSLContext finalSslContext = sslContext; + + SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); + final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { + + @Override + public SSLSocket createSSLSocket(Socket socket) throws IOException { + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + + SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); + + SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); + + s.setUseClientMode(false); + + + // For some reason the createSSLContext above is not enough to enforce + // client side auth + // If client auth is required... + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { + s.setNeedClientAuth(true); + } + + + return s; + } + }; + + // Set some parameters to our server + server.setSoftwareName("Apache NiFi"); + + + // Set the Server options based on properties + server.setPort(context.getProperty(SMTP_PORT).asInteger()); + server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); + server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); + server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); + server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + + + // Check if TLS should be enabled + if (sslContextService != null) { + server.setEnableTLS(true); + } else { + server.setHideTLS(true); + } + + // Set TLS to required in case CLIENT_AUTH = required + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { + server.setRequireTLS(true); + } + + this.server = server; + server.start(); + + getLogger().info("Server started and listening on port " + server.getPort()); + + initialized.set(true); + stopping.set(false); + } + + @OnUnscheduled + public void startShutdown() throws Exception { + if (server != null) { + stopping.set(true); + getLogger().info("Shutting down processor P{}", new Object[]{server}); + server.stop(); + getLogger().info("Shut down {}", new Object[]{server}); + } + } + + @OnStopped + public void completeShutdown() throws Exception { + if (server != null) { + if (!server.isRunning() && stopping.get() ) { + stopping.set(false); + } + getLogger().info("Completed shut down {}", new Object[]{server}); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + try { + initializeSMTPServer(context); + } catch (Exception e) { + context.yield(); + throw new ProcessException("Failed to initialize the SMTP server", e); + } + + while (!incomingMessages.isEmpty()) { + SmtpEvent message = incomingMessages.poll(); + + + if (message == null) { + return; + } + + synchronized (message) { + + FlowFile flowfile = session.create(); + + if (message.getMessageData() != null) { + ByteArrayOutputStream messageData = message.getMessageData(); + flowfile = session.write(flowfile, new OutputStreamCallback() { + + // Write the messageData to flowfile content + @Override + public void process(OutputStream out) throws IOException { + out.write(messageData.toByteArray()); + } + }); + } + + HashMap attributes = new HashMap<>(); + // Gather message attributes + attributes.put(SMTP_HELO, message.getHelo()); + attributes.put(SMTP_SRC_IP, message.getHelo()); + attributes.put(SMTP_FROM, message.getFrom()); + attributes.put(SMTP_TO, message.getTo()); + + List> details = message.getCertifcateDetails(); + int c = 0; + + // Add a selection of each X509 certificates to the already gathered attributes + + for (Map detail : details) { + attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); + attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); + c++; + } + + // Set Mime-Type + attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE); + + // Add the attributes. to flowfile + flowfile = session.putAllAttributes(flowfile, attributes); + session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/"); + session.transfer(flowfile, REL_SUCCESS); + getLogger().info("Transferring {} to success", new Object[]{flowfile}); + + // Finished processing, + message.setProcessed(); + + // update the latch so data() can process the rest of the method + message.updateProcessedLatch(); + + // End of synchronized block + } + + // Wait for SMTPMessageHandler data() and done() to complete + // their side of the work (i.e. acknowledgement) + while (!message.getAcknowledged()) { + // Busy wait + } + + // Lock one last time + synchronized (message) { + SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); + switch (resultCode) { + case UNEXPECTED_ERROR: + case TIMEOUT_ERROR: + session.rollback(); + getLogger().warn(resultCode.getLogMessage()); + case SUCCESS: + getLogger().info(resultCode.getLogMessage()); + break; + default: + getLogger().error(resultCode.getLogMessage()); + } + } + } + } + + // Same old... same old... used for testing to access the random port that was selected + protected int getPort() { + return server == null ? 0 : server.getPort(); + } + + +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java new file mode 100644 index 0000000000..eaded4a86f --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java @@ -0,0 +1,134 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.nifi.processors.email.smtp.event; + + +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A Smtp event which adds the transaction number and command to the StandardEvent. + */ + +public class SmtpEvent{ + private final String remoteIP; + private final String helo; + private final String from; + private final String to; + private final ByteArrayOutputStream messageData; + private List> certificatesDetails; + private AtomicBoolean processed = new AtomicBoolean(false); + private AtomicBoolean acknowledged = new AtomicBoolean(false); + private AtomicInteger returnCode = new AtomicInteger(); + private CountDownLatch processedLatch; + + public SmtpEvent( + final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates, + final ByteArrayOutputStream messageData, + CountDownLatch processedLatch) { + + this.processedLatch = processedLatch; + + this.remoteIP = remoteIP; + this.helo = helo; + this.from = from; + this.to = to; + this.messageData = messageData; + + this.certificatesDetails = new ArrayList<>(); + + for (int c = 0; c < certificates.length; c++) { + X509Certificate cert = certificates[c]; + if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) { + Map certificate = new HashMap<>(); + + String certSerialNumber = cert.getSerialNumber().toString(); + String certSubjectDN = cert.getSubjectDN().getName(); + + + certificate.put("SerialNumber", certSerialNumber); + certificate.put("SubjectName", certSubjectDN); + + certificatesDetails.add(certificate); + + } + } + } + + public synchronized List> getCertifcateDetails() { + return certificatesDetails; + } + + public synchronized String getHelo() { + return helo; + } + + public synchronized ByteArrayOutputStream getMessageData() { + return messageData; + } + + public synchronized String getFrom() { + return from; + } + + public synchronized String getTo() { + return to; + } + + public synchronized String getRemoteIP() { + return remoteIP; + } + + public synchronized void setProcessed() { + this.processed.set(true); + } + + public synchronized boolean getProcessed() { + return this.processed.get(); + } + + public synchronized void setAcknowledged() { + this.acknowledged.set(true); + } + + public synchronized boolean getAcknowledged() { + return this.acknowledged.get(); + } + + public synchronized void updateProcessedLatch() { + this.processedLatch.countDown(); + } + + public synchronized void setReturnCode(int code) { + this.returnCode.set(code); + } + + public synchronized int getReturnCode() { + return this.returnCode.get(); + } + +} + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java new file mode 100644 index 0000000000..0ac4127c0c --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email.smtp.handler; + +import java.io.IOException; +import java.io.InputStream; +import java.security.cert.X509Certificate; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.StopWatch; +import org.subethamail.smtp.DropConnectionException; +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandler; +import org.subethamail.smtp.MessageHandlerFactory; +import org.subethamail.smtp.RejectException; +import org.subethamail.smtp.TooMuchDataException; +import org.subethamail.smtp.server.SMTPServer; + +import org.apache.nifi.processors.email.smtp.event.SmtpEvent; + +public class SMTPMessageHandlerFactory implements MessageHandlerFactory { + final LinkedBlockingQueue incomingMessages; + final ComponentLog logger; + + + public SMTPMessageHandlerFactory(LinkedBlockingQueue incomingMessages, ComponentLog logger) { + this.incomingMessages = incomingMessages; + this.logger = logger; + + } + + @Override + public MessageHandler create(MessageContext messageContext) { + return new Handler(messageContext, incomingMessages, logger); + } + + class Handler implements MessageHandler { + final MessageContext messageContext; + String from; + String recipient; + ByteArrayOutputStream messageData; + + private CountDownLatch latch; + + public Handler(MessageContext messageContext, LinkedBlockingQueue incomingMessages, ComponentLog logger){ + this.messageContext = messageContext; + this.latch = new CountDownLatch(1); + } + + @Override + public void from(String from) throws RejectException { + // TODO: possibly whitelist senders? + this.from = from; + } + + @Override + public void recipient(String recipient) throws RejectException { + // TODO: possibly whitelist receivers? + this.recipient = recipient; + } + + @Override + public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException { + // Start counting the timer... + StopWatch watch = new StopWatch(true); + + long elapsed; + + SMTPServer server = messageContext.getSMTPServer(); + + final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS); + + this.messageData = new ByteArrayOutputStream(); + + byte [] buffer = new byte[1024]; + + int rd; + + while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { + messageData.write(buffer, 0, rd); + if (messageData.getBufferLength() > server.getMaxMessageSize() ) { + // NOTE: Setting processed at this stage is not desirable as message object will only be created + // if this test (i.e. message size) passes. + final SMTPResultCode returnCode = SMTPResultCode.fromCode(500); + logger.warn(returnCode.getLogMessage()); + throw new TooMuchDataException(returnCode.getErrorMessage()); + } + } + messageData.flush(); + + X509Certificate[] certificates = new X509Certificate[]{}; + + final String remoteIP = messageContext.getRemoteAddress().toString(); + final String helo = messageContext.getHelo(); + + if (messageContext.getTlsPeerCertificates() != null ){ + certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone(); + } + + SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch); + + // / Try to queue the message back to the NiFi session + try { + elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); + incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + final SMTPResultCode returnCode = SMTPResultCode.fromCode(421); + logger.trace(returnCode.getLogMessage()); + + // NOTE: Setting processed at this stage is redundant as this catch deals with the inability of + // adding message to the processing queue. Yet, for the sake of consistency the message is + // updated nonetheless + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + + // Once message has been sent to the queue, it should be processed by NiFi onTrigger, + // a flowfile created and its processed status updated before an acknowledgment is + // given back to the SMTP client + elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); + try { + latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Latch open unexpectedly. Will return error and requestonTrigger to rollback + logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure"); + incomingMessages.remove(message); + + // Set the final values so onTrigger can figure out what happened to message + final SMTPResultCode returnCode = SMTPResultCode.fromCode(423); + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + + // Inform client + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + + // Remove the message from the queue. + incomingMessages.remove(message); + // Check if message is processed and if yes, check if it was received on time and wraps it up. + elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); + if (!message.getProcessed() || (elapsed >= serverTimeout)) { + final SMTPResultCode returnCode = SMTPResultCode.fromCode(451); + logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred."); + + // Set the final values so onTrigger can figure out what happened to message + message.setReturnCode(returnCode.getCode()); + message.setAcknowledged(); + throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); + } + + // Set the final values so onTrigger can figure out what happened to message + message.setReturnCode(250); + message.setAcknowledged(); + // Exit, allowing Handler to acknowledge the message + } + + @Override + public void done() { + logger.trace("Called the last method of message handler. Exiting"); + } + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java new file mode 100644 index 0000000000..5328a0d7ff --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email.smtp.handler; + +public enum SMTPResultCode { + // This error isn't raised by code. Being just a default value for the + // fromCode method below + UNKNOWN_ERROR_CODE(0, + "Unknown error.", + "Failed due to unknown error"), + + SUCCESS (250, + "Success delivering message", + "Message from {} to {} via {} acknowledgement complete"), + + QUEUE_ERROR (421, + "Could not queue the message. Try again", + "The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ), + + UNEXPECTED_ERROR(423, + "Unexpected Error. Please try again or contact the administrator in case it persists", + "Error hit during delivery of message from {}"), + + TIMEOUT_ERROR (451, + "The processing of your message timed-out, we may have received it but you better off sending it again", + "Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"), + + MESSAGE_TOO_LARGE(500, + "Message rejected due to length/size of data", + "Your message exceeds the maximum permitted size"); + + private static final SMTPResultCode[] codeArray = new SMTPResultCode[501]; + + static { + for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) { + codeArray[smtpResultCode.getCode()] = smtpResultCode; + } + } + + private final int code; + private final String errorMessage; + private final String logMessage; + + SMTPResultCode(int code, String errorMessage, String logMessage) { + this.code = code; + this.errorMessage = errorMessage; + this.logMessage = logMessage; + } + + public int getCode() { + return code; + } + + public String getErrorMessage() { + return errorMessage; + } + + public String getLogMessage() { + return logMessage; + } + + public static SMTPResultCode fromCode(int code) { + final SMTPResultCode smtpResultCode = codeArray[code]; + return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode; + } + + +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..7a1f644f4c --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.email.ExtractEmailAttachments +org.apache.nifi.processors.email.ExtractEmailHeaders +org.apache.nifi.processors.email.ListenSMTP diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java new file mode 100644 index 0000000000..ef100b2cc3 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/GenerateAttachment.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email; + +import org.apache.commons.mail.Email; +import org.apache.commons.mail.EmailAttachment; +import org.apache.commons.mail.EmailException; +import org.apache.commons.mail.MultiPartEmail; +import org.apache.commons.mail.SimpleEmail; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; +import java.io.IOException; + +public class GenerateAttachment { + String from; + String to; + String subject; + String message; + String hostName; + + public GenerateAttachment(String from, String to, String subject, String message, String hostName) { + this.from = from; + this.to = to; + this.subject = subject; + this.message = message; + this.hostName = hostName; + } + + public byte[] SimpleEmail() { + Email email = new SimpleEmail(); + try { + email.setFrom(from); + email.addTo(to); + email.setSubject(subject); + email.setMsg(message); + email.setHostName(hostName); + email.buildMimeMessage(); + } catch (EmailException e) { + e.printStackTrace(); + } + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + MimeMessage mimeMessage = email.getMimeMessage(); + try { + mimeMessage.writeTo(output); + } catch (IOException e) { + e.printStackTrace(); + } catch (MessagingException e) { + e.printStackTrace(); + } + + return output.toByteArray(); + } + + public byte[] WithAttachments(int amount) { + MultiPartEmail email = new MultiPartEmail(); + try { + + email.setFrom(from); + email.addTo(to); + email.setSubject(subject); + email.setMsg(message); + email.setHostName(hostName); + + int x = 1; + while (x <= amount) { + // Create an attachment with the pom.xml being used to compile (yay!!!) + EmailAttachment attachment = new EmailAttachment(); + attachment.setPath("pom.xml"); + attachment.setDisposition(EmailAttachment.ATTACHMENT); + attachment.setDescription("pom.xml"); + attachment.setName("pom.xml"+String.valueOf(x)); + // attach + email.attach(attachment); + x++; + } + email.buildMimeMessage(); + } catch (EmailException e) { + e.printStackTrace(); + } + ByteArrayOutputStream output = new ByteArrayOutputStream(); + MimeMessage mimeMessage = email.getMimeMessage(); + try { + mimeMessage.writeTo(output); + } catch (IOException e) { + e.printStackTrace(); + } catch (MessagingException e) { + e.printStackTrace(); + } + + return output.toByteArray(); + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java new file mode 100644 index 0000000000..ee629a6c9d --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailAttachments.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + + +public class TestExtractEmailAttachments { + // Setups the fields to be used... + String from = "Alice "; + String to = "bob@nifi.apache.org"; + String subject = "Just a test email"; + String message = "Test test test chocolate"; + String hostName = "bermudatriangle"; + + GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName); + + + @Test + public void testValidEmailWithAttachments() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); + + // Create the message dynamically + byte [] withAttachment = attachmentGenerator.WithAttachments(1); + + runner.enqueue(withAttachment); + runner.run(); + + runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1); + runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0); + runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 1); + // Have a look at the attachments... + final List splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS); + splits.get(0).assertAttributeEquals("filename", "pom.xml1"); + } + + @Test + public void testValidEmailWithMultipleAttachments() throws Exception { + Random rnd = new Random() ; + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); + + // Create the message dynamically + int amount = rnd.nextInt(10) + 1; + byte [] withAttachment = attachmentGenerator.WithAttachments(amount); + + runner.enqueue(withAttachment); + runner.run(); + + runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1); + runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0); + runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, amount); + // Have a look at the attachments... + final List splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS); + + List filenames = new ArrayList<>(); + for (int a = 0 ; a < amount ; a++ ) { + filenames.add(splits.get(a).getAttribute("filename").toString()); + } + + Assert.assertTrue(filenames.containsAll(Arrays.asList("pom.xml1", "pom.xml" + amount))); + } + + @Test + public void testValidEmailWithoutAttachments() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); + + // Create the message dynamically + byte [] simpleEmail = attachmentGenerator.SimpleEmail(); + + runner.enqueue(simpleEmail); + runner.run(); + + runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1); + runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0); + runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0); + + } + + @Test + public void testInvalidEmail() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments()); + runner.enqueue("test test test chocolate".getBytes()); + runner.run(); + + runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 0); + runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 1); + runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java new file mode 100644 index 0000000000..aed2292c25 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestExtractEmailHeaders.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.email; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.util.List; + +public class TestExtractEmailHeaders { + + // Setup the fields to be used... + String from = "Alice "; + String to = "bob@nifi.apache.org"; + String subject = "Just a test email"; + String message = "Test test test chocolate"; + String hostName = "bermudatriangle"; + + GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName); + + @Test + public void testValidEmailWithAttachments() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); + + // Create the message dynamically + byte [] withAttachment = attachmentGenerator.WithAttachments(1); + + runner.enqueue(withAttachment); + runner.run(); + + runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1); + runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS); + splits.get(0).assertAttributeEquals("email.headers.from.0", from); + splits.get(0).assertAttributeEquals("email.headers.to.0", to); + splits.get(0).assertAttributeEquals("email.headers.subject", subject); + splits.get(0).assertAttributeEquals("email.attachment_count", "1"); + } + + @Test + public void testValidEmailWithoutAttachments() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); + runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version"); + + // Create the message dynamically + byte [] simpleEmail = attachmentGenerator.SimpleEmail(); + + runner.enqueue(simpleEmail); + runner.run(); + + runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1); + runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0); + + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS); + splits.get(0).assertAttributeEquals("email.headers.from.0", from); + splits.get(0).assertAttributeEquals("email.headers.to.0", to); + splits.get(0).assertAttributeEquals("email.attachment_count", "0"); + splits.get(0).assertAttributeExists("email.headers.mime-version"); + } + + + @Test + public void testInvalidEmail() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders()); + runner.enqueue("test test test chocolate".getBytes()); + runner.run(); + + runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0); + runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 1); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java new file mode 100644 index 0000000000..983ac4a8b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.nifi.processors.email; + +import org.apache.commons.mail.Email; +import org.apache.commons.mail.EmailException; +import org.apache.commons.mail.SimpleEmail; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import org.apache.nifi.ssl.SSLContextService; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class TestListenSMTP { + + @Test(timeout=15000) + public void ValidEmailTls() throws Exception { + boolean[] failed = {false}; + ListenSMTP listenSmtp = new ListenSMTP(); + final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + + runner.setProperty(ListenSMTP.SMTP_PORT, "0"); + runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + + // Setup the SSL Context + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + // and add the SSL context to the runner + runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); + + + + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + + // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog + // where listenSmtp method calls are used to allow the processor to be started using + // port "0" without triggering a violation of PORT_VALIDATOR + + listenSmtp.onScheduled(context); + listenSmtp.initializeSMTPServer(context); + + final int port = listenSmtp.getPort(); + + try { + final Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + try { + + + System.setProperty("mail.smtp.ssl.trust", "*"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "localtest"); + + Email email = new SimpleEmail(); + + email.setHostName("127.0.0.1"); + email.setSmtpPort(port); + + // Enable STARTTLS but ignore the cert + email.setStartTLSEnabled(true); + email.setStartTLSRequired(true); + email.setSSLCheckServerIdentity(false); + + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Test test test chocolate"); + email.addTo("bob@nifi.apache.org"); + + email.send(); + } catch (final Throwable t) { + failed[0] = true; + } + } + }); + clientThread.start(); + + while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { + // process the request. + listenSmtp.onTrigger(context, processSessionFactory); + } + + // Checks if client experienced Exception + Assert.assertFalse("Client experienced exception", failed[0]); + + runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); + clientThread.stop(); + + Assert.assertFalse("Sending email failed", failed[0]); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); + splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); + splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); + + Thread.sleep(100); + } finally { + // shut down the server + listenSmtp.startShutdown(); + } + } + + @Test(timeout=15000) + public void ValidEmail() throws Exception, EmailException { + final boolean[] failed = {false}; + ListenSMTP listenSmtp = new ListenSMTP(); + final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + + runner.setProperty(ListenSMTP.SMTP_PORT, "0"); + runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + + // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog + // where listenSmtp method calls are used to allow the processor to be started using + // port "0" without triggering a violation of PORT_VALIDATOR + listenSmtp.onScheduled(context); + listenSmtp.initializeSMTPServer(context); + + final int port = listenSmtp.getPort(); + + try { + final Thread clientThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Email email = new SimpleEmail(); + email.setHostName("127.0.0.1"); + email.setSmtpPort(port); + email.setStartTLSEnabled(false); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Test test test chocolate"); + email.addTo("bob@nifi.apache.org"); + email.send(); + + } catch (final EmailException t) { + failed[0] = true; + } + } + }); + clientThread.start(); + + while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { + // process the request. + listenSmtp.onTrigger(context, processSessionFactory); + } + clientThread.stop(); + + Assert.assertFalse("Sending email failed", failed[0]); + + runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); + splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); + splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); + + Thread.sleep(100); + } finally { + // shut down the server + listenSmtp.startShutdown(); + } + } + + @Test(timeout=15000, expected=EmailException.class) + public void ValidEmailTimeOut() throws Exception { + + ListenSMTP listenSmtp = new ListenSMTP(); + final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + + runner.setProperty(ListenSMTP.SMTP_PORT, "0"); + runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "50 milliseconds"); + + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + + // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog + // where listenSmtp method calls are used to allow the processor to be started using + // port "0" without triggering a violation of PORT_VALIDATOR + listenSmtp.onScheduled(context); + listenSmtp.initializeSMTPServer(context); + + final int port = listenSmtp.getPort(); + + + Email email = new SimpleEmail(); + email.setHostName("127.0.0.1"); + email.setSmtpPort(port); + email.setStartTLSEnabled(false); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Test test test chocolate"); + email.addTo("bob@nifi.apache.org"); + email.send(); + + while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { + // force timeout + Thread.sleep(999L); + // process the request. + listenSmtp.onTrigger(context, processSessionFactory); + } + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); + splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); + splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); + + Thread.sleep(100); + + // shut down the server + listenSmtp.startShutdown(); + } + + @Test(timeout=15000, expected=EmailException.class) + public void emailTooLarge() throws Exception { + ListenSMTP listenSmtp = new ListenSMTP(); + final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + + runner.setProperty(ListenSMTP.SMTP_PORT, "0"); + runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "256B"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "2"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + + // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog + // where listenSmtp method calls are used to allow the processor to be started using + // port "0" without triggering a violation of PORT_VALIDATOR + listenSmtp.onScheduled(context); + listenSmtp.initializeSMTPServer(context); + + final int port = listenSmtp.getPort(); + + Email email = new SimpleEmail(); + email.setHostName("127.0.0.1"); + email.setSmtpPort(port); + email.setStartTLSEnabled(false); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Test test test chocolate"); + email.addTo("bob@nifi.apache.org"); + email.send(); + + Thread.sleep(100); + + + // process the request. + listenSmtp.onTrigger(context, processSessionFactory); + + runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0); + runner.assertQueueEmpty(); + + try { + listenSmtp.startShutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + Assert.assertFalse(e.toString(), true); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/resources/localhost-ks.jks b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/resources/localhost-ks.jks new file mode 100755 index 0000000000000000000000000000000000000000..df36197d92ab8e9870f42666d74c47646fd56f26 GIT binary patch literal 3512 zcmchZXH*l&7RQqiLLfv+Xd3T`p&Iir~-L#Y=#ud?{E1Ldil!ycf^TKF)2%4dD_ z&CRl2juv-w;hX`>r;n!ME0*eJZQo{liYbqFr%&s4KWji3-S|{|Q#z3Bi!_n-HQIvn z)_yhbW3OmoReGZ$;mdsOy)j4ml{e?MpM3BXEZ&%y>@=UK++W7rcU+QDvQxbsYBP>C ziqWW_4}oMI2<5S^ml11u$vzs(Bz1QY%@RE`7dI!_J9pQZTH;ai+~*FZ-!&0FO}AsY zOxhC^^;ctKwcW!%@WtyMsu@6xd3zdv(I!8(v5$IseOUHF#yBeb=(KkBD?D*{)a_{6 zy11;ZtH1s5w8!+ewZvnrkKmE%X*#>Ul%b`b!V6_&L1)$_<6^i6k7Bh$Cbm8X7HN40 zS#G)q)jhM1yqIk|ug4$}yr>lNM^7CDi=S{rQqn53pE8J!Vk=?&Q_pATc&ICwBQ zS(^FTsqy1f=9leGJUj=gReI>!b5N4p{xQ7Yh?)gcpugwPJJKnkHLG#|+$oVkg4yV1aO1A$e7 zaQjo^Q#=uo%^bn4wLVp1-Lpy>m3Om-GmM2@#_FNth9W;Io4*MtEVVL^kgC7SFA-we z#qVjp#>O>$RucpY72eI-)`&+06CPE;lJYi4}@3m`# zJ_AU}qlHP&l8^Sxdy9$-4gOUb4UL4637oYGzAr%oZTy>dW-CT`%o3B(duSJ1(e{$Y zM<9UyvWx;+833RQMN{a4(G-wlHXR5E0)ZV>5?#@72%}__LDViB2!zoC&;$$&%?P2h z0z(iWD~mq^C<3ITh2caaj#n5E%ofhx0nUQPL~nPTGlqqB22Ex{K(u_Eac+1F2b%p@ zfFWRi2!bZ=dhQr@H0!ZShxiYx(fr(S%o#KWt$@YIDPiPok3$Sr4*fIyhqIvoh5uR( z+G9aS0kQzl6d)6b0t5omn(X@$hGj=yE`{&~S2Gtia5Gn?EL_(yG|G+K@=fp0D^(rz zxT1R64#p$fx05POs#deg9+l!c8gwhEor|BbmTA)uRlj-gz6)6_cB&4*Tc-M`bK9>c z*H4msFu-a#7iT^GkUgZvxqIcr(X*;=?XWBEh_4N)!@=`Ah5M!kt4cNNSPATwH?AXC zdENd&XqoAr2Dq}BQ6Gnc3D~XB-xhZWLe^fld)&QlbH&rFP$(?%sxBMiB_=cw?r7CH@9Dd8TnkYHTi)yt>lPMf~Qh{TVz-%zd}mpoX@Lx z7dHOF@cCta&Y}DYj>8M>y0uqvg+{1>9qQK_{DUz^17>%6baZre>Zg9-*JTh{JeEgE(Xc$3KCdGsnB0X~&288Q1yu50`xi`1$u zxw%0F{zoTzg?QpaXg#S%Pc}TD&G9sE#r*FN1sL2ia!PT<-siU_xsUiWo{_zcpd9U!Ni)~G zLi}%abS2t*$1jmQ&rh~)%FTUKeNh{2;~_;7Z1a$&S<~zN0o(9-C8gCXFPUtQaEi(Ok}L|C$~05J}GOTeZ2`>N!9w z|5?&Yv(xUn4w}Md-)+>Xm-idnwqK!l-ep)3M#!opq&#uM)v4O^f$5XSSy^-7P*&lV zi*Bv9WLRzp8QFh_Sp$75|b~$}d%! zADHN!cN?}Zq;Pfp`_&u3UsSsuum4tHmJnSKKJnFdCJT}j<9dY@Y9;CdG*Uh6JugW| zjszU%k%LnRdK;+FkhCS;r3tV3Qu-?q>U@4Gz20FckyBYJ$a2l5D|g6nnw|8he9Zuw zE>xvKu;5sW8RFB^dtl3__u=TrP;92~^c`S>V6o8(>LDq#2#WbkDhztv-Y+KRxxc_( z9-Ig8g=a}sc!GElV)j`DAZZobG^EycOweBae{tMx(CCHt3QRem*{+4B%V0XzUy$!_ zUZ;}$4v!kJ?fiOsh zU6?00F|Q<1!8boIGdazbS85=;kbaqV>qY`p(FtRc*H!<=v7&I|*F*PwV zGR!y-b78_&{p;J_RLYcZ=UKH^oM-d2R~63QK8sqv6wbQ1c%Aj-tT=16Xl@Dp3*V;e zHf*;EU2s!d?EmGAwL4$*KMm76>RxSI^Y_{r+12XOyBVZ5SkF88wdmZUBCW>1mjpsy z^o8A>D^$57@$5Uk|7*7VJjNZDDg1En^sD7BzpeZg;PKvK$44Vgqc3^M$IC50#>}kV z5b(o}W%EH!_vB=5`RI47^%}8dvO6oHmz@0=8J8WnQn7ZTq?gtGwUN*b>^*j51vd`gXZ}Tj=d=! zZ5Q2p8)B?EgtP6!|DK($dm-WAwXXk9U+SN8m>b$H+55Tn^-f3Qi)|}kFy(38X`WLz zb3tscaO}@TH^6nkgPpdLY>Z2bWWLj})^PvwNNvp0VmYkR- zC$rcPs*X#TBPg{vHL)l;!%)D052TJ;m^~!5xFj<#9cRWgF)#(@MiUbYLkm#GG%*0? zP$-w~?rCD&0nCOvupnUsa^#sB8yWuA2RF)=3TX!2_nM>k=E?JKg4=^^-n%dyma}iz z7O0l#8tb4G_&d_JH{#d+qhEI!d^66fYKrV z-7ByE+kQ;?hjsY#V=I=4^0WMI{&xAO++ilu5aB4XiALW_Kd;kHysq{BlM=J!+>0KJ z$C*SKrY8jSiz;)U*)(Zq)1ucc+#e!jzJi?g{o#VvYqM?do!+xL#%xFU&dMq4cmJ|_ z)$}vmhufCDDLpVUyl>Z)NdISr>;jDqTRg=Im0$R1hzV~$&uP?iV%b9*v8wKnnqG|u zi`U6%Z(de9F>i4__b)}$q>sOosu)$Q&n)@4Z$;pQ&K1q~A4WZ$&o-$$Ev}(DR5gXs z$NJxSPc7!gRtAte7ABjPohmimJL!w=83*1S57xEkb0m5`p0y|T%0y91?Xr*$k!KcN z@qQxIFmK}r4~|)iTkLXzMLu+2k#TdI871R( + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.0.0-SNAPSHOT + + + nifi-email-bundle + pom + NiFi Email Processor Set + + + nifi-email-processors + nifi-email-nar + + + + + + org.apache.nifi + nifi-email-processors + 1.0.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 2a38a30d53..63ed50e33e 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -68,6 +68,7 @@ nifi-snmp-bundle nifi-windows-event-log-bundle nifi-ignite-bundle + nifi-email-bundle diff --git a/pom.xml b/pom.xml index 368c4c8d7e..b852c8f379 100644 --- a/pom.xml +++ b/pom.xml @@ -1157,6 +1157,12 @@ language governing permissions and limitations under the License. --> nifi-lumberjack-nar 1.0.0-SNAPSHOT nar + + + org.apache.nifi + nifi-email-nar + 1.0.0-SNAPSHOT + nar org.apache.nifi