NIFI-1899 - Introduce ExtractEmailAttachments and ExtractEmailHeaders processors - Introduce ListenSMTP (allows NiFi to receive data via email) - Addresses @ijokarumawak and @@JPercivall PR comments

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Andre F de Miranda 2016-06-18 22:23:55 +10:00 committed by jpercivall
parent f352ea10b7
commit 4f672832c0
20 changed files with 2107 additions and 0 deletions

View File

@ -829,6 +829,15 @@ The following binary components are provided under the Apache Software License v
The code for the t-digest was originally authored by Ted Dunning
A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
(ASLv2) subethasmtp
The following NOTICE information applies:
Copyright (C) 2006-2007 SubEthaMail.org
(ASLv2) Apache Commons Email
The following NOTICE information applies:
Apache Commons Email
Copyright 2002-2015 The Apache Software Foundation
This includes derived works from the Apache Software License V2 library python-evtx (https://github.com/williballenthin/python-evtx)
Copyright 2012, 2013 Willi Ballenthin william.ballenthin@mandiant.com

View File

@ -296,6 +296,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-lumberjack-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-nar</artifactId>

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-email-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Date;
import javax.activation.DataSource;
import javax.mail.Address;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.BufferedInputStream;
@SupportsBatching
@EventDriven
@SideEffectFree
@Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Extract attachments from a mime formatted email file, splitting them into individual flowfiles.")
@WritesAttributes({
@WritesAttribute(attribute = "filename ", description = "The filename of the attachment"),
@WritesAttribute(attribute = "email.attachment.parent.filename ", description = "The filename of the parent FlowFile"),
@WritesAttribute(attribute = "email.attachment.parent.uuid", description = "The UUID of the original FlowFile."),
@WritesAttribute(attribute = "mime.type", description = "The mime type of the attachment.")})
public class ExtractEmailAttachments extends AbstractProcessor {
public static final String ATTACHMENT_ORIGINAL_FILENAME = "email.attachment.parent.filename";
public static final String ATTACHMENT_ORIGINAL_UUID = "email.attachment.parent.uuid";
public static final Relationship REL_ATTACHMENTS = new Relationship.Builder()
.name("attachments")
.description("Each individual attachment will be routed to the attachments relationship")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original file")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Flowfiles that could not be parsed")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ATTACHMENTS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
this.descriptors = Collections.unmodifiableList(descriptors);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ComponentLog logger = getLogger();
final FlowFile originalFlowFile = session.get();
if (originalFlowFile == null) {
return;
}
final List<FlowFile> attachmentsList = new ArrayList<>();
final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
final List<FlowFile> originalFlowFilesList = new ArrayList<>();
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
Session mailSession = Session.getDefaultInstance(props, null);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
Date sentDate = originalMessage.getSentDate();
if (from == null || sentDate == null) {
// Throws MessageException due to lack of minimum required headers
throw new MessagingException("Message failed RFC2822 validation");
}
originalFlowFilesList.add(originalFlowFile);
if (parser.hasAttachments()) {
final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
try {
for (final DataSource data : parser.getAttachmentList()) {
FlowFile split = session.create(originalFlowFile);
final Map<String, String> attributes = new HashMap<>();
if (StringUtils.isNotBlank(data.getName())) {
attributes.put(CoreAttributes.FILENAME.key(), data.getName());
}
if (StringUtils.isNotBlank(data.getContentType())) {
attributes.put(CoreAttributes.MIME_TYPE.key(), data.getContentType());
}
String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
split = session.append(split, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
IOUtils.copy(data.getInputStream(), out);
}
});
split = session.putAllAttributes(split, attributes);
attachmentsList.add(split);
}
} catch (FlowFileHandlingException e) {
// Something went wrong
// Removing splits that may have been created
session.remove(attachmentsList);
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e});
invalidFlowFilesList.add(originalFlowFile);
}
}
} catch (Exception e) {
// Another error hit...
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
// Message is invalid or triggered an error during parsing
invalidFlowFilesList.add(originalFlowFile);
}
}
});
session.transfer(attachmentsList, REL_ATTACHMENTS);
// As per above code, originalFlowfile may be routed to invalid or
// original depending on RFC2822 compliance.
session.transfer(invalidFlowFilesList, REL_FAILURE);
session.transfer(originalFlowFilesList, REL_ORIGINAL);
if (attachmentsList.size() > 10) {
logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()});
} else if (attachmentsList.size() > 1){
logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList});
}
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
}

View File

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import javax.mail.Address;
import javax.mail.Header;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@SupportsBatching
@EventDriven
@SideEffectFree
@Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Using the flowfile content as source of data, extract header from an RFC compliant email file adding the relevant attributes to the flowfile. " +
"This processor does not perform extensive RFC validation but still requires a bare minimum compliance with RFC 2822")
@WritesAttributes({
@WritesAttribute(attribute = "email.headers.bcc.*", description = "Each individual BCC recipient (if available)"),
@WritesAttribute(attribute = "email.headers.cc.*", description = "Each individual CC recipient (if available)"),
@WritesAttribute(attribute = "email.headers.from.*", description = "Each individual mailbox contained in the From of the Email (array as per RFC-2822)"),
@WritesAttribute(attribute = "email.headers.message-id", description = "The value of the Message-ID header (if available)"),
@WritesAttribute(attribute = "email.headers.received_date", description = "The Received-Date of the message (if available)"),
@WritesAttribute(attribute = "email.headers.sent_date", description = "Date the message was sent"),
@WritesAttribute(attribute = "email.headers.subject", description = "Subject of the message (if available)"),
@WritesAttribute(attribute = "email.headers.to.*", description = "Each individual TO recipient (if available)"),
@WritesAttribute(attribute = "email.attachment_count", description = "Number of attachments of the message" )})
public class ExtractEmailHeaders extends AbstractProcessor {
public static final String EMAIL_HEADER_BCC = "email.headers.bcc";
public static final String EMAIL_HEADER_CC = "email.headers.cc";
public static final String EMAIL_HEADER_FROM = "email.headers.from";
public static final String EMAIL_HEADER_MESSAGE_ID = "email.headers.message-id";
public static final String EMAIL_HEADER_RECV_DATE = "email.headers.received_date";
public static final String EMAIL_HEADER_SENT_DATE = "email.headers.sent_date";
public static final String EMAIL_HEADER_SUBJECT = "email.headers.subject";
public static final String EMAIL_HEADER_TO = "email.headers.to";
public static final String EMAIL_ATTACHMENT_COUNT = "email.attachment_count";
public static final PropertyDescriptor CAPTURED_HEADERS = new PropertyDescriptor.Builder()
.name("CAPTURED_HEADERS")
.displayName("Additional Header List")
.description("COLON separated list of additional headers to be extracted from the flowfile content." +
"NOTE the header key is case insensitive and will be matched as lower-case." +
" Values will respect email contents.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("x-mailer")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Extraction was successful")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Flowfiles that could not be parsed as a RFC-2822 compliant message")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CAPTURED_HEADERS);
this.descriptors = Collections.unmodifiableList(descriptors);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final ComponentLog logger = getLogger();
final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
final List<FlowFile> processedFlowFilesList = new ArrayList<>();
final FlowFile originalFlowFile = session.get();
if (originalFlowFile == null) {
return;
}
final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":"));
final Map<String, String> attributes = new HashMap<>();
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
Session mailSession = Session.getDefaultInstance(props, null);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
Date sentDate = originalMessage.getSentDate();
if (from == null || sentDate == null ) {
// Throws MessageException due to lack of minimum required headers
throw new MessagingException("Message failed RFC2822 validation");
} else if (capturedHeadersList.size() > 0){
Enumeration headers = originalMessage.getAllHeaders();
while (headers.hasMoreElements()) {
Header header = (Header) headers.nextElement();
if (StringUtils.isNotEmpty(header.getValue())
&& capturedHeadersList.contains(header.getName().toLowerCase())) {
attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue());
}
}
}
if (Array.getLength(originalMessage.getAllRecipients()) > 0) {
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.TO)); toCount++) {
attributes.put(EMAIL_HEADER_TO + "." + toCount, originalMessage.getRecipients(Message.RecipientType.TO)[toCount].toString());
}
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.BCC)); toCount++) {
attributes.put(EMAIL_HEADER_BCC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.BCC)[toCount].toString());
}
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getRecipients(Message.RecipientType.CC)); toCount++) {
attributes.put(EMAIL_HEADER_CC + "." + toCount, originalMessage.getRecipients(Message.RecipientType.CC)[toCount].toString());
}
}
// Incredibly enough RFC-2822 specified From as a "mailbox-list" so an array I returned by getFrom
for (int toCount = 0; toCount < ArrayUtils.getLength(originalMessage.getFrom()); toCount++) {
attributes.put(EMAIL_HEADER_FROM + "." + toCount, originalMessage.getFrom()[toCount].toString());
}
if (StringUtils.isNotEmpty(originalMessage.getMessageID())) {
attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID());
}
if (originalMessage.getReceivedDate() != null) {
attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString());
}
if (originalMessage.getSentDate() != null) {
attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString());
}
if (StringUtils.isNotEmpty(originalMessage.getSubject())) {
attributes.put(EMAIL_HEADER_SUBJECT, originalMessage.getSubject());
}
// Zeroes EMAIL_ATTACHMENT_COUNT
attributes.put(EMAIL_ATTACHMENT_COUNT, "0");
// But insert correct value if attachments are present
if (parser.hasAttachments()) {
attributes.put(EMAIL_ATTACHMENT_COUNT, String.valueOf(parser.getAttachmentList().size()));
}
} catch (Exception e) {
// Message is invalid or triggered an error during parsing
attributes.clear();
logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
invalidFlowFilesList.add(originalFlowFile);
}
}
});
if (attributes.size() > 0) {
FlowFile updatedFlowFile = session.putAllAttributes(originalFlowFile, attributes);
logger.info("Extracted {} into {} files", new Object[]{attributes.size(), updatedFlowFile});
processedFlowFilesList.add(updatedFlowFile);
}
session.transfer(processedFlowFilesList, REL_SUCCESS);
session.transfer(invalidFlowFilesList, REL_FAILURE);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
}

View File

@ -0,0 +1,446 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.subethamail.smtp.server.SMTPServer;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode;
import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
@Tags({"listen", "email", "smtp"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " +
"allowing nifi to listen for incoming email. " +
"" +
"Note this server does not perform any email validation. If direct exposure to the internet is sought," +
"it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "The value used during HELO"),
@WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"),
@WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " +
"certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " +
"certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")})
public class ListenSMTP extends AbstractProcessor {
public static final String SMTP_HELO = "smtp.helo";
public static final String SMTP_FROM = "smtp.from";
public static final String SMTP_TO = "smtp.to";
public static final String MIME_TYPE = "message/rfc822";
public static final String SMTP_SRC_IP = "smtp.src";
protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.name("SMTP_PORT")
.displayName("Listening Port")
.description("The TCP port the ListenSMTP processor will bind to." +
"NOTE that on Unix derivative operating systems this port must " +
"be higher than 1024 unless NiFi is running as with root user permissions.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.name("SMTP_HOSTNAME")
.displayName("SMTP hostname")
.description("The hostname to be embedded into the banner displayed when an " +
"SMTP client connects to the processor TCP port .")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_CONNECTIONS")
.displayName("Maximum number of SMTP connection")
.description("The maximum number of simultaneous SMTP connections.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder()
.name("SMTP_TIMEOUT")
.displayName("SMTP connection timeout")
.description("The maximum time to wait for an action of SMTP client.")
.defaultValue("60 seconds")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_MSG_SIZE")
.displayName("SMTP Maximum Message Size")
.description("The maximum number of bytes the server will accept.")
.required(true)
.defaultValue("20MB")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE")
.displayName("SMTP message buffer length")
.description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " +
"Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large" +
"queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("1024")
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString())
.build();
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results;
}
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Extraction was successful")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> propertyDescriptors;
private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages;
private volatile SMTPServer server;
private AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicBoolean stopping = new AtomicBoolean(false);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SMTP_PORT);
props.add(SMTP_HOSTNAME);
props.add(SMTP_MAXIMUM_CONNECTIONS);
props.add(SMTP_TIMEOUT);
props.add(SMTP_MAXIMUM_MSG_SIZE);
props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE);
props.add(SSL_CONTEXT_SERVICE);
props.add(CLIENT_AUTH);
this.propertyDescriptors = Collections.unmodifiableList(props);
}
// Upon Schedule, reset the initialized state to false
@OnScheduled
public void onScheduled(ProcessContext context) {
initialized.set(false);
stopping.set(false);
}
protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception {
// check if we are already running or if it is stopping
if (initialized.get() && server.isRunning() || stopping.get() ) {
return;
}
incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger());
String clientAuth = null;
// If an SSLContextService was provided then create an SSLContext to pass down to the server
SSLContext sslContext = null;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
clientAuth = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
}
final SSLContext finalSslContext = sslContext;
SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger());
final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws IOException {
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
SSLSocketFactory socketFactory = finalSslContext.getSocketFactory();
SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true));
s.setUseClientMode(false);
// For some reason the createSSLContext above is not enough to enforce
// client side auth
// If client auth is required...
if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) {
s.setNeedClientAuth(true);
}
return s;
}
};
// Set some parameters to our server
server.setSoftwareName("Apache NiFi");
// Set the Server options based on properties
server.setPort(context.getProperty(SMTP_PORT).asInteger());
server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
// Check if TLS should be enabled
if (sslContextService != null) {
server.setEnableTLS(true);
} else {
server.setHideTLS(true);
}
// Set TLS to required in case CLIENT_AUTH = required
if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) {
server.setRequireTLS(true);
}
this.server = server;
server.start();
getLogger().info("Server started and listening on port " + server.getPort());
initialized.set(true);
stopping.set(false);
}
@OnUnscheduled
public void startShutdown() throws Exception {
if (server != null) {
stopping.set(true);
getLogger().info("Shutting down processor P{}", new Object[]{server});
server.stop();
getLogger().info("Shut down {}", new Object[]{server});
}
}
@OnStopped
public void completeShutdown() throws Exception {
if (server != null) {
if (!server.isRunning() && stopping.get() ) {
stopping.set(false);
}
getLogger().info("Completed shut down {}", new Object[]{server});
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
try {
initializeSMTPServer(context);
} catch (Exception e) {
context.yield();
throw new ProcessException("Failed to initialize the SMTP server", e);
}
while (!incomingMessages.isEmpty()) {
SmtpEvent message = incomingMessages.poll();
if (message == null) {
return;
}
synchronized (message) {
FlowFile flowfile = session.create();
if (message.getMessageData() != null) {
ByteArrayOutputStream messageData = message.getMessageData();
flowfile = session.write(flowfile, new OutputStreamCallback() {
// Write the messageData to flowfile content
@Override
public void process(OutputStream out) throws IOException {
out.write(messageData.toByteArray());
}
});
}
HashMap<String, String> attributes = new HashMap<>();
// Gather message attributes
attributes.put(SMTP_HELO, message.getHelo());
attributes.put(SMTP_SRC_IP, message.getHelo());
attributes.put(SMTP_FROM, message.getFrom());
attributes.put(SMTP_TO, message.getTo());
List<Map<String, String>> details = message.getCertifcateDetails();
int c = 0;
// Add a selection of each X509 certificates to the already gathered attributes
for (Map<String, String> detail : details) {
attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
c++;
}
// Set Mime-Type
attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
// Add the attributes. to flowfile
flowfile = session.putAllAttributes(flowfile, attributes);
session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
session.transfer(flowfile, REL_SUCCESS);
getLogger().info("Transferring {} to success", new Object[]{flowfile});
// Finished processing,
message.setProcessed();
// update the latch so data() can process the rest of the method
message.updateProcessedLatch();
// End of synchronized block
}
// Wait for SMTPMessageHandler data() and done() to complete
// their side of the work (i.e. acknowledgement)
while (!message.getAcknowledged()) {
// Busy wait
}
// Lock one last time
synchronized (message) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
switch (resultCode) {
case UNEXPECTED_ERROR:
case TIMEOUT_ERROR:
session.rollback();
getLogger().warn(resultCode.getLogMessage());
case SUCCESS:
getLogger().info(resultCode.getLogMessage());
break;
default:
getLogger().error(resultCode.getLogMessage());
}
}
}
}
// Same old... same old... used for testing to access the random port that was selected
protected int getPort() {
return server == null ? 0 : server.getPort();
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.event;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A Smtp event which adds the transaction number and command to the StandardEvent.
*/
public class SmtpEvent{
private final String remoteIP;
private final String helo;
private final String from;
private final String to;
private final ByteArrayOutputStream messageData;
private List<Map<String, String>> certificatesDetails;
private AtomicBoolean processed = new AtomicBoolean(false);
private AtomicBoolean acknowledged = new AtomicBoolean(false);
private AtomicInteger returnCode = new AtomicInteger();
private CountDownLatch processedLatch;
public SmtpEvent(
final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
final ByteArrayOutputStream messageData,
CountDownLatch processedLatch) {
this.processedLatch = processedLatch;
this.remoteIP = remoteIP;
this.helo = helo;
this.from = from;
this.to = to;
this.messageData = messageData;
this.certificatesDetails = new ArrayList<>();
for (int c = 0; c < certificates.length; c++) {
X509Certificate cert = certificates[c];
if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) {
Map<String, String> certificate = new HashMap<>();
String certSerialNumber = cert.getSerialNumber().toString();
String certSubjectDN = cert.getSubjectDN().getName();
certificate.put("SerialNumber", certSerialNumber);
certificate.put("SubjectName", certSubjectDN);
certificatesDetails.add(certificate);
}
}
}
public synchronized List<Map<String, String>> getCertifcateDetails() {
return certificatesDetails;
}
public synchronized String getHelo() {
return helo;
}
public synchronized ByteArrayOutputStream getMessageData() {
return messageData;
}
public synchronized String getFrom() {
return from;
}
public synchronized String getTo() {
return to;
}
public synchronized String getRemoteIP() {
return remoteIP;
}
public synchronized void setProcessed() {
this.processed.set(true);
}
public synchronized boolean getProcessed() {
return this.processed.get();
}
public synchronized void setAcknowledged() {
this.acknowledged.set(true);
}
public synchronized boolean getAcknowledged() {
return this.acknowledged.get();
}
public synchronized void updateProcessedLatch() {
this.processedLatch.countDown();
}
public synchronized void setReturnCode(int code) {
this.returnCode.set(code);
}
public synchronized int getReturnCode() {
return this.returnCode.get();
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.handler;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.DropConnectionException;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
final LinkedBlockingQueue<SmtpEvent> incomingMessages;
final ComponentLog logger;
public SMTPMessageHandlerFactory(LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger) {
this.incomingMessages = incomingMessages;
this.logger = logger;
}
@Override
public MessageHandler create(MessageContext messageContext) {
return new Handler(messageContext, incomingMessages, logger);
}
class Handler implements MessageHandler {
final MessageContext messageContext;
String from;
String recipient;
ByteArrayOutputStream messageData;
private CountDownLatch latch;
public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
this.messageContext = messageContext;
this.latch = new CountDownLatch(1);
}
@Override
public void from(String from) throws RejectException {
// TODO: possibly whitelist senders?
this.from = from;
}
@Override
public void recipient(String recipient) throws RejectException {
// TODO: possibly whitelist receivers?
this.recipient = recipient;
}
@Override
public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
// Start counting the timer...
StopWatch watch = new StopWatch(true);
long elapsed;
SMTPServer server = messageContext.getSMTPServer();
final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS);
this.messageData = new ByteArrayOutputStream();
byte [] buffer = new byte[1024];
int rd;
while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
messageData.write(buffer, 0, rd);
if (messageData.getBufferLength() > server.getMaxMessageSize() ) {
// NOTE: Setting processed at this stage is not desirable as message object will only be created
// if this test (i.e. message size) passes.
final SMTPResultCode returnCode = SMTPResultCode.fromCode(500);
logger.warn(returnCode.getLogMessage());
throw new TooMuchDataException(returnCode.getErrorMessage());
}
}
messageData.flush();
X509Certificate[] certificates = new X509Certificate[]{};
final String remoteIP = messageContext.getRemoteAddress().toString();
final String helo = messageContext.getHelo();
if (messageContext.getTlsPeerCertificates() != null ){
certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
}
SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, messageData, latch);
// / Try to queue the message back to the NiFi session
try {
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
logger.trace(returnCode.getLogMessage());
// NOTE: Setting processed at this stage is redundant as this catch deals with the inability of
// adding message to the processing queue. Yet, for the sake of consistency the message is
// updated nonetheless
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Once message has been sent to the queue, it should be processed by NiFi onTrigger,
// a flowfile created and its processed status updated before an acknowledgment is
// given back to the SMTP client
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
try {
latch.await(serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Latch open unexpectedly. Will return error and requestonTrigger to rollback
logger.trace("Latch opened unexpectedly and processor indicates data wasn't processed. Returned error to SMTP client as precautionary measure");
incomingMessages.remove(message);
// Set the final values so onTrigger can figure out what happened to message
final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
// Inform client
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Remove the message from the queue.
incomingMessages.remove(message);
// Check if message is processed and if yes, check if it was received on time and wraps it up.
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
if (!message.getProcessed() || (elapsed >= serverTimeout)) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
logger.trace("Did not receive the onTrigger response within the acceptable timeframes. Data duplication may have occurred.");
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(250);
message.setAcknowledged();
// Exit, allowing Handler to acknowledge the message
}
@Override
public void done() {
logger.trace("Called the last method of message handler. Exiting");
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.handler;
public enum SMTPResultCode {
// This error isn't raised by code. Being just a default value for the
// fromCode method below
UNKNOWN_ERROR_CODE(0,
"Unknown error.",
"Failed due to unknown error"),
SUCCESS (250,
"Success delivering message",
"Message from {} to {} via {} acknowledgement complete"),
QUEUE_ERROR (421,
"Could not queue the message. Try again",
"The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ),
UNEXPECTED_ERROR(423,
"Unexpected Error. Please try again or contact the administrator in case it persists",
"Error hit during delivery of message from {}"),
TIMEOUT_ERROR (451,
"The processing of your message timed-out, we may have received it but you better off sending it again",
"Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"),
MESSAGE_TOO_LARGE(500,
"Message rejected due to length/size of data",
"Your message exceeds the maximum permitted size");
private static final SMTPResultCode[] codeArray = new SMTPResultCode[501];
static {
for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) {
codeArray[smtpResultCode.getCode()] = smtpResultCode;
}
}
private final int code;
private final String errorMessage;
private final String logMessage;
SMTPResultCode(int code, String errorMessage, String logMessage) {
this.code = code;
this.errorMessage = errorMessage;
this.logMessage = logMessage;
}
public int getCode() {
return code;
}
public String getErrorMessage() {
return errorMessage;
}
public String getLogMessage() {
return logMessage;
}
public static SMTPResultCode fromCode(int code) {
final SMTPResultCode smtpResultCode = codeArray[code];
return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.email.ExtractEmailAttachments
org.apache.nifi.processors.email.ExtractEmailHeaders
org.apache.nifi.processors.email.ListenSMTP

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailAttachment;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.MultiPartEmail;
import org.apache.commons.mail.SimpleEmail;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
public class GenerateAttachment {
String from;
String to;
String subject;
String message;
String hostName;
public GenerateAttachment(String from, String to, String subject, String message, String hostName) {
this.from = from;
this.to = to;
this.subject = subject;
this.message = message;
this.hostName = hostName;
}
public byte[] SimpleEmail() {
Email email = new SimpleEmail();
try {
email.setFrom(from);
email.addTo(to);
email.setSubject(subject);
email.setMsg(message);
email.setHostName(hostName);
email.buildMimeMessage();
} catch (EmailException e) {
e.printStackTrace();
}
ByteArrayOutputStream output = new ByteArrayOutputStream();
MimeMessage mimeMessage = email.getMimeMessage();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
}
return output.toByteArray();
}
public byte[] WithAttachments(int amount) {
MultiPartEmail email = new MultiPartEmail();
try {
email.setFrom(from);
email.addTo(to);
email.setSubject(subject);
email.setMsg(message);
email.setHostName(hostName);
int x = 1;
while (x <= amount) {
// Create an attachment with the pom.xml being used to compile (yay!!!)
EmailAttachment attachment = new EmailAttachment();
attachment.setPath("pom.xml");
attachment.setDisposition(EmailAttachment.ATTACHMENT);
attachment.setDescription("pom.xml");
attachment.setName("pom.xml"+String.valueOf(x));
// attach
email.attach(attachment);
x++;
}
email.buildMimeMessage();
} catch (EmailException e) {
e.printStackTrace();
}
ByteArrayOutputStream output = new ByteArrayOutputStream();
MimeMessage mimeMessage = email.getMimeMessage();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
}
return output.toByteArray();
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class TestExtractEmailAttachments {
// Setups the fields to be used...
String from = "Alice <alice@nifi.apache.org>";
String to = "bob@nifi.apache.org";
String subject = "Just a test email";
String message = "Test test test chocolate";
String hostName = "bermudatriangle";
GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName);
@Test
public void testValidEmailWithAttachments() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
byte [] withAttachment = attachmentGenerator.WithAttachments(1);
runner.enqueue(withAttachment);
runner.run();
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 1);
// Have a look at the attachments...
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
splits.get(0).assertAttributeEquals("filename", "pom.xml1");
}
@Test
public void testValidEmailWithMultipleAttachments() throws Exception {
Random rnd = new Random() ;
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
int amount = rnd.nextInt(10) + 1;
byte [] withAttachment = attachmentGenerator.WithAttachments(amount);
runner.enqueue(withAttachment);
runner.run();
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, amount);
// Have a look at the attachments...
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
List<String> filenames = new ArrayList<>();
for (int a = 0 ; a < amount ; a++ ) {
filenames.add(splits.get(a).getAttribute("filename").toString());
}
Assert.assertTrue(filenames.containsAll(Arrays.asList("pom.xml1", "pom.xml" + amount)));
}
@Test
public void testValidEmailWithoutAttachments() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
byte [] simpleEmail = attachmentGenerator.SimpleEmail();
runner.enqueue(simpleEmail);
runner.run();
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0);
}
@Test
public void testInvalidEmail() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
runner.enqueue("test test test chocolate".getBytes());
runner.run();
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0);
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.util.List;
public class TestExtractEmailHeaders {
// Setup the fields to be used...
String from = "Alice <alice@nifi.apache.org>";
String to = "bob@nifi.apache.org";
String subject = "Just a test email";
String message = "Test test test chocolate";
String hostName = "bermudatriangle";
GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName);
@Test
public void testValidEmailWithAttachments() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
// Create the message dynamically
byte [] withAttachment = attachmentGenerator.WithAttachments(1);
runner.enqueue(withAttachment);
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS);
splits.get(0).assertAttributeEquals("email.headers.from.0", from);
splits.get(0).assertAttributeEquals("email.headers.to.0", to);
splits.get(0).assertAttributeEquals("email.headers.subject", subject);
splits.get(0).assertAttributeEquals("email.attachment_count", "1");
}
@Test
public void testValidEmailWithoutAttachments() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version");
// Create the message dynamically
byte [] simpleEmail = attachmentGenerator.SimpleEmail();
runner.enqueue(simpleEmail);
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS);
splits.get(0).assertAttributeEquals("email.headers.from.0", from);
splits.get(0).assertAttributeEquals("email.headers.to.0", to);
splits.get(0).assertAttributeEquals("email.attachment_count", "0");
splits.get(0).assertAttributeExists("email.headers.mime-version");
}
@Test
public void testInvalidEmail() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.enqueue("test test test chocolate".getBytes());
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 1);
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.SimpleEmail;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.ssl.SSLContextService;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class TestListenSMTP {
@Test(timeout=15000)
public void ValidEmailTls() throws Exception {
boolean[] failed = {false};
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
// Setup the SSL Context
final SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService);
// and add the SSL context to the runner
runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
try {
final Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
try {
System.setProperty("mail.smtp.ssl.trust", "*");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "localtest");
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
// Enable STARTTLS but ignore the cert
email.setStartTLSEnabled(true);
email.setStartTLSRequired(true);
email.setSSLCheckServerIdentity(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (final Throwable t) {
failed[0] = true;
}
}
});
clientThread.start();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
}
// Checks if client experienced Exception
Assert.assertFalse("Client experienced exception", failed[0]);
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1);
clientThread.stop();
Assert.assertFalse("Sending email failed", failed[0]);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
} finally {
// shut down the server
listenSmtp.startShutdown();
}
}
@Test(timeout=15000)
public void ValidEmail() throws Exception, EmailException {
final boolean[] failed = {false};
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
try {
final Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (final EmailException t) {
failed[0] = true;
}
}
});
clientThread.start();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
}
clientThread.stop();
Assert.assertFalse("Sending email failed", failed[0]);
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
} finally {
// shut down the server
listenSmtp.startShutdown();
}
}
@Test(timeout=15000, expected=EmailException.class)
public void ValidEmailTimeOut() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "50 milliseconds");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// force timeout
Thread.sleep(999L);
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
}
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
// shut down the server
listenSmtp.startShutdown();
}
@Test(timeout=15000, expected=EmailException.class)
public void emailTooLarge() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "256B");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "2");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
Thread.sleep(100);
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
runner.assertQueueEmpty();
try {
listenSmtp.startShutdown();
} catch (InterruptedException e) {
e.printStackTrace();
Assert.assertFalse(e.toString(), true);
}
}
}

View File

@ -0,0 +1,42 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-email-bundle</artifactId>
<packaging>pom</packaging>
<description>NiFi Email Processor Set</description>
<modules>
<module>nifi-email-processors</module>
<module>nifi-email-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-processors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -68,6 +68,7 @@
<module>nifi-snmp-bundle</module>
<module>nifi-windows-event-log-bundle</module>
<module>nifi-ignite-bundle</module>
<module>nifi-email-bundle</module>
</modules>
<dependencyManagement>

View File

@ -1157,6 +1157,12 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-lumberjack-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>