NIFI-2519 Fixed and refactored ListenSMTP processor

- Removed message queueing which could result in data loss
- Fixed life-cycle issues that coudl put processor in an unstable state
- Fixed PropertyDescriptor translation for Time units and Byte sizes
- Fixed broken tests
- Added additional tests

NIFI-2519 added default for SMTP_MAXIMUM_CONNECTIONS

NIFI-2519 addressed PR comments, polishing
- fixed intermittent deadlock on processor stop and added test for it
- the attributes that can not be extracted from the message but available via MessageContext are written into the outgoing FlowFile
- other minor fixes

NIFI-2519 addressed lates PR comments

NIFI-2519 added better messaging when server closes the connection

NIFI-2519 some polishing and additional tests to validate deadlocks

NIFI-2519 address latest PR comments
fixed deadlock condition for when the consumer is stopped while server is distributing messages
fixed MAX message size issue ensuring it is validated
set max connections to SMTPServer
polished pom
added L&N

NIFI-2519 PR comments
- fixed LICENSE
- Added usage of LimitingInputStream
- simplified SmtpConsumer by removing hasMessage operation
This commit is contained in:
Oleg Zhurakousky 2016-08-10 10:19:17 -04:00 committed by joewitt
parent 0855cb9bd4
commit 48fa76ecff
13 changed files with 1258 additions and 973 deletions

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket;
import java.io.IOException;
import java.net.ServerSocket;
/**
*
*/
public class NetworkUtils {
/**
* Will determine the available port
*/
public final static int availablePort() {
ServerSocket s = null;
try {
s = new ServerSocket(0);
s.setReuseAddress(true);
return s.getLocalPort();
} catch (Exception e) {
throw new IllegalStateException("Failed to discover available port.", e);
} finally {
try {
s.close();
} catch (IOException e) {
// ignore
}
}
}
}

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.stream.io; package org.apache.nifi.stream.io;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -24,16 +25,50 @@ public class LimitingInputStream extends InputStream {
private final InputStream in; private final InputStream in;
private final long limit; private final long limit;
private long bytesRead = 0; private long bytesRead = 0;
private final boolean exceptionOnLimit;
/**
* Constructs a limited input stream whereby if the limit is reached all
* subsequent calls to read will return a -1.
*
* @param in
* the underlying input stream
* @param limit
* maximum length of bytes to read from underlying input stream
*/
public LimitingInputStream(final InputStream in, final long limit) { public LimitingInputStream(final InputStream in, final long limit) {
this(in, limit, false);
}
/**
* Constructs a limited input stream whereby if the limit is reached all
* subsequent calls to read will return a -1 or EOFexception as configured
*
* @param in
* the underlying input stream
* @param limit
* maximum length of bytes to read from underlying input stream
* @param eofOnLimit
* true if EOF should occur on all read calls once limit reached;
* false if -1 should be returned instead
*/
public LimitingInputStream(final InputStream in, final long limit, final boolean eofOnLimit) {
this.in = in; this.in = in;
this.limit = limit; this.limit = limit;
exceptionOnLimit = eofOnLimit;
}
private int limitReached() throws IOException {
if (exceptionOnLimit) {
throw new EOFException("Limit of allowed bytes read from input stream reached");
}
return -1;
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
if (bytesRead >= limit) { if (bytesRead >= limit) {
return -1; return limitReached();
} }
final int val = in.read(); final int val = in.read();
@ -46,7 +81,7 @@ public class LimitingInputStream extends InputStream {
@Override @Override
public int read(final byte[] b) throws IOException { public int read(final byte[] b) throws IOException {
if (bytesRead >= limit) { if (bytesRead >= limit) {
return -1; return limitReached();
} }
final int maxToRead = (int) Math.min(b.length, limit - bytesRead); final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
@ -61,7 +96,7 @@ public class LimitingInputStream extends InputStream {
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
if (bytesRead >= limit) { if (bytesRead >= limit) {
return -1; return limitReached();
} }
final int maxToRead = (int) Math.min(len, limit - bytesRead); final int maxToRead = (int) Math.min(len, limit - bytesRead);

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io;
import java.io.EOFException;
import java.io.IOException;
import junit.framework.TestCase;
public class LimitingInputStreamTest extends TestCase {
private final static byte[] TEST_BUFFER = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
public void testReadLimitNotReached() throws IOException {
LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, false);
long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, TEST_BUFFER.length);
is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, true);
bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, TEST_BUFFER.length);
}
public void testReadLimitExceeded() throws IOException {
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9);
final long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, 9);
}
public void testReadLimitExceededEof() throws IOException {
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9, true);
try {
StreamUtils.copy(is, new ByteArrayOutputStream());
fail("Should not get here");
} catch (final EOFException eof) {
}
}
}

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,37 @@
nifi-email-nar
Copyright 2014-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Spring Framework
The following NOTICE information applies:
Spring Framework
Copyright 2002-2016
(ASLv2) SubEthaSMTP - A SMTP mail server
The following NOTICE information applies:
Spring Framework
Copyright 2006-2007
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail)
************************
Common Development and Distribution License 1.0
************************
The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)

View File

@ -53,6 +53,12 @@
<groupId>org.springframework.integration</groupId> <groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId> <artifactId>spring-integration-mail</artifactId>
<version>4.3.0.RELEASE</version> <version>4.3.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-logging</groupId> <groupId>commons-logging</groupId>

View File

@ -13,16 +13,15 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.email; package org.apache.nifi.processors.email;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -31,71 +30,57 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import org.subethamail.smtp.server.SMTPServer; import javax.net.ssl.SSLSocketFactory;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent; import org.springframework.util.StringUtils;
import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode; import org.subethamail.smtp.server.SMTPServer;
import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory;
@Tags({"listen", "email", "smtp"}) @Tags({"listen", "email", "smtp"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + @CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, "
"allowing nifi to listen for incoming email. " + + "allowing nifi to listen for incoming email. Note this server does not perform any email "
"" + + "validation. If direct exposure to the internet is sought, it may be a better idea to use "
"Note this server does not perform any email validation. If direct exposure to the internet is sought," + + "the combination of NiFi and an industrial scale MTA (e.g. Postfix)")
"it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "The value used during HELO"), @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"),
@WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " +
@WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + "certificates used by an TLS peer"),
"certificates used by an TLS peer"), @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " +
@WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + "certificates used by an TLS peer"),
"certificates used by an TLS peer"), @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection"),
@WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"), @WritesAttribute(attribute = "smtp.recipient", description = "The value used during RCPT TO (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")}) @WritesAttribute(attribute = "mime.type", description = "Mime type of the message")})
public class ListenSMTP extends AbstractSessionFactoryProcessor {
public class ListenSMTP extends AbstractProcessor { static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
public static final String SMTP_HELO = "smtp.helo";
public static final String SMTP_FROM = "smtp.from";
public static final String SMTP_TO = "smtp.to";
public static final String MIME_TYPE = "message/rfc822";
public static final String SMTP_SRC_IP = "smtp.src";
protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.name("SMTP_PORT") .name("SMTP_PORT")
.displayName("Listening Port") .displayName("Listening Port")
.description("The TCP port the ListenSMTP processor will bind to." + .description("The TCP port the ListenSMTP processor will bind to." +
@ -106,26 +91,17 @@ public class ListenSMTP extends AbstractProcessor {
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
.name("SMTP_HOSTNAME")
.displayName("SMTP hostname")
.description("The hostname to be embedded into the banner displayed when an " +
"SMTP client connects to the processor TCP port .")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_CONNECTIONS") .name("SMTP_MAXIMUM_CONNECTIONS")
.displayName("Maximum number of SMTP connection") .displayName("Maximum number of SMTP connection")
.description("The maximum number of simultaneous SMTP connections.") .description("The maximum number of simultaneous SMTP connections.")
.required(true) .required(true)
.defaultValue("1")
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR) .addValidator(StandardValidators.INTEGER_VALIDATOR)
.build(); .build();
protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder()
.name("SMTP_TIMEOUT") .name("SMTP_TIMEOUT")
.displayName("SMTP connection timeout") .displayName("SMTP connection timeout")
.description("The maximum time to wait for an action of SMTP client.") .description("The maximum time to wait for an action of SMTP client.")
@ -135,29 +111,17 @@ public class ListenSMTP extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .build();
protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_MSG_SIZE") .name("SMTP_MAXIMUM_MSG_SIZE")
.displayName("SMTP Maximum Message Size") .displayName("SMTP Maximum Message Size")
.description("The maximum number of bytes the server will accept.") .description("The maximum number of bytes the server will accept.")
.required(true) .required(true)
.defaultValue("20MB") .defaultValue("20 MB")
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Integer.MAX_VALUE))
.build(); .build();
protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE")
.displayName("SMTP message buffer length")
.description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " +
"Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large" +
"queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("1024")
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE") .name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service") .displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
@ -166,7 +130,7 @@ public class ListenSMTP extends AbstractProcessor {
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("CLIENT_AUTH") .name("CLIENT_AUTH")
.displayName("Client Auth") .displayName("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
@ -174,309 +138,243 @@ public class ListenSMTP extends AbstractProcessor {
.allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString())
.build(); .build();
@Override protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { .name("SMTP_HOSTNAME")
final List<ValidationResult> results = new ArrayList<>(); .displayName("SMTP hostname")
.description("The hostname to be embedded into the banner displayed when an " +
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); "SMTP client connects to the processor TCP port .")
final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); .expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results;
}
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Extraction was successful")
.build(); .build();
private Set<Relationship> relationships; static final Relationship REL_SUCCESS = new Relationship.Builder()
private List<PropertyDescriptor> propertyDescriptors; .name("success")
private volatile LinkedBlockingQueue<SmtpEvent> incomingMessages; .description("All new messages will be routed as FlowFiles to this relationship")
.build();
private volatile SMTPServer server; private final static List<PropertyDescriptor> propertyDescriptors;
private AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicBoolean stopping = new AtomicBoolean(false);
private final static Set<Relationship> relationships;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(SMTP_PORT);
_propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
_propertyDescriptors.add(SMTP_TIMEOUT);
_propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
_propertyDescriptors.add(SMTP_HOSTNAME);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
}
private volatile SMTPServer smtp;
private volatile SmtpConsumer smtpConsumer;
private volatile int maxMessageSize;
/**
*
*/
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession processSession = sessionFactory.createSession();
if (this.smtp == null) {
this.setupSmtpIfNecessary(context, processSession);
}
/*
* Will consume incoming message directly from the wire and into
* FlowFile/Content repository before exiting. This essentially limits
* any potential data loss by allowing SMTPServer thread to actually
* commit NiFi session if all good. However in the event of exception,
* such exception will be propagated back to the email sender via
* "undeliverable message" allowing such user to re-send the message
*/
this.smtpConsumer.consumeUsing((inputDataStream) -> {
FlowFile flowFile = processSession.create();
AtomicInteger size = new AtomicInteger();
try {
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
size.set(IOUtils.copy(new LimitingInputStream(inputDataStream, ListenSMTP.this.maxMessageSize, true), out));
}
});
flowFile = updateFlowFileWithAttributes(flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile,
"smtp://" + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/");
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
return size.get();
} catch (Exception e) {
context.yield();
this.getLogger().error("Failed while processing incoming mail. " + e.getMessage(), e);
throw new IllegalStateException("Failed while processing incoming mail. " + e.getMessage(), e);
}
});
}
/**
*
*/
@OnStopped
public void stop() {
this.getLogger().info("Stopping SMTPServer");
this.smtp.stop();
this.smtp = null;
this.getLogger().info("SMTPServer stopped");
}
/**
*
*/
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return relationships;
} }
/**
*
*/
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>();
String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && !StringUtils.hasText(clientAuth)) {
results.add(new ValidationResult.Builder()
.subject(CLIENT_AUTH.getDisplayName())
.explanation(CLIENT_AUTH.getDisplayName() + " must be provided when using " + SSL_CONTEXT_SERVICE.getDisplayName())
.valid(false)
.build());
} else if (sslContextService == null && StringUtils.hasText(clientAuth)) {
results.add(new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName())
.explanation(SSL_CONTEXT_SERVICE.getDisplayName() + " must be provided when selecting " + CLIENT_AUTH.getDisplayName())
.valid(false)
.build());
}
return results;
}
/**
*
*/
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors; return propertyDescriptors;
} }
@Override /**
protected void init(final ProcessorInitializationContext context) { *
final Set<Relationship> relationships = new HashSet<>(); */
relationships.add(REL_SUCCESS); private FlowFile updateFlowFileWithAttributes(FlowFile flowFile, ProcessSession processSession) {
this.relationships = Collections.unmodifiableSet(relationships); Map<String, String> attributes = new HashMap<>();
Certificate[] tlsPeerCertificates = this.smtpConsumer.getMessageContext().getTlsPeerCertificates();
final List<PropertyDescriptor> props = new ArrayList<>(); if (tlsPeerCertificates != null) {
props.add(SMTP_PORT); for (int i = 0; i < tlsPeerCertificates.length; i++) {
props.add(SMTP_HOSTNAME); if (tlsPeerCertificates[i] instanceof X509Certificate) {
props.add(SMTP_MAXIMUM_CONNECTIONS); X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i];
props.add(SMTP_TIMEOUT); attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
props.add(SMTP_MAXIMUM_MSG_SIZE); attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName());
props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); }
props.add(SSL_CONTEXT_SERVICE); }
props.add(CLIENT_AUTH);
this.propertyDescriptors = Collections.unmodifiableList(props);
}
// Upon Schedule, reset the initialized state to false
@OnScheduled
public void onScheduled(ProcessContext context) {
initialized.set(false);
stopping.set(false);
}
protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception {
// check if we are already running or if it is stopping
if (initialized.get() && server.isRunning() || stopping.get() ) {
return;
} }
incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); attributes.put("smtp.helo", this.smtpConsumer.getMessageContext().getHelo());
attributes.put("smtp.remote.addr", this.smtpConsumer.getMessageContext().getRemoteAddress().toString());
attributes.put("smtp.from", this.smtpConsumer.getFrom());
attributes.put("smtp.recepient", this.smtpConsumer.getRecipient());
attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
return processSession.putAllAttributes(flowFile, attributes);
}
String clientAuth = null; /**
*
*/
private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) {
if (this.smtp == null) {
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtpServer = this.createServerInstance(context, consumer);
smtpServer.setSoftwareName("Apache NiFi");
smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
this.maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue();
smtpServer.setMaxMessageSize(this.maxMessageSize);
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
if (context.getProperty(SMTP_HOSTNAME).isSet()) {
smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
}
// If an SSLContextService was provided then create an SSLContext to pass down to the server this.smtpConsumer = consumer;
SSLContext sslContext = null; this.smtp = smtpServer;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); this.smtp.start();
if (sslContextService != null) {
clientAuth = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
} }
}
final SSLContext finalSslContext = sslContext; /**
*
SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); */
final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) {
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
SMTPServer smtpServer = sslContextService == null ? new ConsumerAwareSmtpServer(consumer) : new ConsumerAwareSmtpServer(consumer) {
@Override @Override
public SSLSocket createSSLSocket(Socket socket) throws IOException { public SSLSocket createSSLSocket(Socket socket) throws IOException {
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true));
sslSocket.setUseClientMode(false);
SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
this.setRequireTLS(true);
SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); sslSocket.setNeedClientAuth(true);
s.setUseClientMode(false);
// For some reason the createSSLContext above is not enough to enforce
// client side auth
// If client auth is required...
if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) {
s.setNeedClientAuth(true);
} }
return sslSocket;
return s;
} }
}; };
// Set some parameters to our server
server.setSoftwareName("Apache NiFi");
// Set the Server options based on properties
server.setPort(context.getProperty(SMTP_PORT).asInteger());
server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue());
server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
// Check if TLS should be enabled
if (sslContextService != null) { if (sslContextService != null) {
server.setEnableTLS(true); smtpServer.setEnableTLS(true);
} else { } else {
server.setHideTLS(true); smtpServer.setHideTLS(true);
} }
return smtpServer;
// Set TLS to required in case CLIENT_AUTH = required
if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) {
server.setRequireTLS(true);
}
this.server = server;
server.start();
getLogger().info("Server started and listening on port " + server.getPort());
initialized.set(true);
stopping.set(false);
} }
@OnUnscheduled /**
public void startShutdown() throws Exception { * Wrapper over {@link SMTPServer} that is aware of the {@link SmtpConsumer}
if (server != null) { * to ensure that its stop() operation is called during server stoppage.
stopping.set(true); */
getLogger().info("Shutting down processor P{}", new Object[]{server}); private static class ConsumerAwareSmtpServer extends SMTPServer {
server.stop();
getLogger().info("Shut down {}", new Object[]{server});
}
}
@OnStopped /**
public void completeShutdown() throws Exception { *
if (server != null) { */
if (!server.isRunning() && stopping.get() ) { public ConsumerAwareSmtpServer(SmtpConsumer consumer) {
stopping.set(false); super(consumer);
}
getLogger().info("Completed shut down {}", new Object[]{server});
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
try {
initializeSMTPServer(context);
} catch (Exception e) {
context.yield();
throw new ProcessException("Failed to initialize the SMTP server", e);
} }
while (!incomingMessages.isEmpty()) { /**
SmtpEvent message = incomingMessages.poll(); *
*/
if (message == null) { @Override
return; public synchronized void stop() {
} try {
SmtpConsumer consumer = (SmtpConsumer) this.getMessageHandlerFactory();
synchronized (message) { consumer.stop();
if (resultCodeSetAndIsError(message)) { } finally {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); super.stop();
getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage());
continue;
}
try {
FlowFile flowfile = session.create();
if (message.getMessageData() != null) {
flowfile = session.write(flowfile, out -> {
InputStream inputStream = message.getMessageData();
byte [] buffer = new byte[1024];
int rd;
long totalBytesRead =0;
while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) {
totalBytesRead += rd;
if (totalBytesRead > server.getMaxMessageSize() ) {
message.setReturnCode(500);
message.setProcessed();
break;
}
out.write(buffer, 0, rd);
}
out.flush();
});
} else {
getLogger().debug("Message body was null");
message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode());
message.setProcessed();
}
if (!message.getProcessed()) {
HashMap<String, String> attributes = new HashMap<>();
// Gather message attributes
attributes.put(SMTP_HELO, message.getHelo());
attributes.put(SMTP_SRC_IP, message.getHelo());
attributes.put(SMTP_FROM, message.getFrom());
attributes.put(SMTP_TO, message.getTo());
List<Map<String, String>> details = message.getCertifcateDetails();
int c = 0;
// Add a selection of each X509 certificates to the already gathered attributes
for (Map<String, String> detail : details) {
attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null));
attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null));
c++;
}
// Set Mime-Type
attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE);
// Add the attributes. to flowfile
flowfile = session.putAllAttributes(flowfile, attributes);
session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/");
session.transfer(flowfile, REL_SUCCESS);
getLogger().info("Transferring {} to success", new Object[]{flowfile});
}
} catch (Exception e) {
message.setProcessed();
message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode());
}
// Check to see if it failed when creating the FlowFile
if (resultCodeSetAndIsError(message)) {
session.rollback();
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
message.notifyAll();
continue;
}
// Finished processing,
message.setProcessed();
// notify on the message so data() can process the rest of the method
message.notifyAll();
// Wait for data() to tell sender we received the message and double check we didn't timeout
final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
try {
message.wait(serverTimeout);
} catch (InterruptedException e) {
getLogger().info("Interrupted while waiting for Message Handler to acknowledge message.");
}
// Check to see if the sender was correctly notified
if (resultCodeSetAndIsError(message)) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
session.rollback();
getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage());
} else {
// Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss.
session.commit();
}
} }
} }
} }
private boolean resultCodeSetAndIsError(SmtpEvent message){
if (message.getReturnCode() != null ) {
SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode());
if (resultCode.isError()) {
return true;
}
}
return false;
}
// Same old... same old... used for testing to access the random port that was selected
protected int getPort() {
return server == null ? 0 : server.getPort();
}
} }

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
/**
* A simple consumer that provides a bridge between 'push' message distribution
* provided by {@link SMTPServer} and NiFi polling scheduler mechanism. It
* implements both {@link MessageHandler} and {@link MessageHandlerFactory}
* allowing it to interact directly with {@link SMTPServer}.
*/
class SmtpConsumer implements MessageHandler, MessageHandlerFactory {
private final static int CONSUMER_STOPPED = -1;
private final static int INTERRUPTED = -2;
private final static int ERROR = -9;
private final static int NO_MESSAGE = -8;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final BlockingQueue<InputStream> messageDataQueue = new ArrayBlockingQueue<>(1);
private final BlockingQueue<Integer> consumedMessageSizeQueue = new ArrayBlockingQueue<>(1);
private final AtomicBoolean running = new AtomicBoolean(true);
private volatile MessageContext messageContext;
private volatile String from;
private volatile String recipient;
/**
*
*/
String getFrom() {
return this.from;
}
/**
*
*/
String getRecipient() {
return this.recipient;
}
/**
*
*/
MessageContext getMessageContext() {
return this.messageContext;
}
/**
* This operation will simply attempt to put a poison message to the
* 'consumedMessageSizeQueue' to ensure that in the event this consumer is
* stopped before the message is consumed (see
* {@link #consumeUsing(Function)}), the server thread that is blocking in
* {@link #data(InputStream)} operation can unblock.
*/
// NOTE: the 'synchronize' is only here for API correctness, to ensure that
// stop() and consumeUsing(..) can never be invoked at the same time.
// However within NiFi it can never happen.
synchronized void stop() {
this.running.compareAndSet(true, false);
this.consumedMessageSizeQueue.offer(CONSUMER_STOPPED);
}
/**
* This operation is invoked by the consumer. Implementation of this
* operation creates a synchronous connection with the message producer (see
* {@link #data(InputStream)}) via a pair of queues which guarantees that
* message is fully consumed and disposed by the consumer (provided as
* {@link Function}) before the server closes the data stream.
*/
// NOTE: the 'synchronize' is only here for API correctness, to ensure that
// stop() and consumeUsing(..) can never be invoked at the same time.
// However within NiFi it can never happen.
synchronized void consumeUsing(Function<InputStream, Integer> resultConsumer) {
int messageSize = 0;
try {
InputStream message = this.messageDataQueue.poll(1000, TimeUnit.MILLISECONDS);
if (message != null) {
messageSize = resultConsumer.apply(message);
} else {
messageSize = NO_MESSAGE;
}
} catch (InterruptedException e) {
this.logger.warn("Current thread is interrupted", e);
messageSize = INTERRUPTED;
Thread.currentThread().interrupt();
} finally {
if (messageSize == 0) {
messageSize = ERROR;
}
if (messageSize != NO_MESSAGE) {
this.consumedMessageSizeQueue.offer(messageSize);
}
}
}
/**
* This operation is invoked by the server thread and contains message data
* as {@link InputStream}. Implementation of this operation creates a
* synchronous connection with consumer (see {@link #onMessage(Function)})
* via a pair of queues which guarantees that message is fully consumed and
* disposed by the consumer by the time this operation exits.
*/
@Override
public void data(InputStream data) throws RejectException, TooMuchDataException, IOException {
if (this.running.get()) {
try {
this.messageDataQueue.offer(data, Integer.MAX_VALUE, TimeUnit.SECONDS);
long messageSize = this.consumedMessageSizeQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
String exceptionMessage = null;
if (messageSize == CONSUMER_STOPPED) {
exceptionMessage = "NIFI Consumer was stopped before message was successfully consumed";
} else if (messageSize == INTERRUPTED) {
exceptionMessage = "Consuming thread was interrupted";
} else if (messageSize == ERROR) {
exceptionMessage = "Consuming thread failed while processing 'data' SMTP event.";
}
if (exceptionMessage != null) {
throw new IllegalStateException(exceptionMessage);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Received message of size: " + messageSize);
}
}
} catch (InterruptedException e) {
this.logger.warn("Current thread is interrupted", e);
Thread.currentThread().interrupt();
throw new IllegalStateException("Current thread is interrupted", e);
}
} else {
throw new IllegalStateException("NIFI Consumer was stopped before message was successfully consumed");
}
}
/**
*
*/
@Override
public void from(String from) throws RejectException {
this.from = from;
}
/**
*
*/
@Override
public void recipient(String recipient) throws RejectException {
this.recipient = recipient;
}
/**
*
*/
@Override
public void done() {
// noop
}
/**
*
*/
@Override
public MessageHandler create(MessageContext ctx) {
this.messageContext = ctx;
return this;
}
}

View File

@ -1,125 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.event;
import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A Smtp event which adds the transaction number and command to the StandardEvent.
*/
public class SmtpEvent{
private final String remoteIP;
private final String helo;
private final String from;
private final String to;
private final InputStream messageData;
private List<Map<String, String>> certificatesDetails;
private AtomicBoolean processed = new AtomicBoolean(false);
private AtomicBoolean acknowledged = new AtomicBoolean(false);
private AtomicInteger returnCode = new AtomicInteger();
public SmtpEvent(
final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates,
final InputStream messageData) {
this.remoteIP = remoteIP;
this.helo = helo;
this.from = from;
this.to = to;
this.messageData = messageData;
this.certificatesDetails = new ArrayList<>();
for (int c = 0; c < certificates.length; c++) {
X509Certificate cert = certificates[c];
if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) {
Map<String, String> certificate = new HashMap<>();
String certSerialNumber = cert.getSerialNumber().toString();
String certSubjectDN = cert.getSubjectDN().getName();
certificate.put("SerialNumber", certSerialNumber);
certificate.put("SubjectName", certSubjectDN);
certificatesDetails.add(certificate);
}
}
}
public synchronized List<Map<String, String>> getCertifcateDetails() {
return certificatesDetails;
}
public synchronized String getHelo() {
return helo;
}
public synchronized InputStream getMessageData() {
return messageData;
}
public synchronized String getFrom() {
return from;
}
public synchronized String getTo() {
return to;
}
public synchronized String getRemoteIP() {
return remoteIP;
}
public synchronized void setProcessed() {
this.processed.set(true);
}
public synchronized boolean getProcessed() {
return this.processed.get();
}
public synchronized void setAcknowledged() {
this.acknowledged.set(true);
}
public synchronized boolean getAcknowledged() {
return this.acknowledged.get();
}
public synchronized void setReturnCode(int code) {
this.returnCode.set(code);
}
public synchronized Integer getReturnCode() {
return this.returnCode.get();
}
}

View File

@ -1,165 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.handler;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.X509Certificate;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.DropConnectionException;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.apache.nifi.processors.email.smtp.event.SmtpEvent;
public class SMTPMessageHandlerFactory implements MessageHandlerFactory {
final LinkedBlockingQueue<SmtpEvent> incomingMessages;
final ComponentLog logger;
public SMTPMessageHandlerFactory(LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger) {
this.incomingMessages = incomingMessages;
this.logger = logger;
}
@Override
public MessageHandler create(MessageContext messageContext) {
return new Handler(messageContext, incomingMessages, logger);
}
class Handler implements MessageHandler {
final MessageContext messageContext;
String from;
String recipient;
public Handler(MessageContext messageContext, LinkedBlockingQueue<SmtpEvent> incomingMessages, ComponentLog logger){
this.messageContext = messageContext;
}
@Override
public void from(String from) throws RejectException {
// TODO: possibly whitelist senders?
this.from = from;
}
@Override
public void recipient(String recipient) throws RejectException {
// TODO: possibly whitelist receivers?
this.recipient = recipient;
}
@Override
public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException {
// Start counting the timer...
StopWatch watch = new StopWatch(true);
long elapsed;
final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS);
X509Certificate[] certificates = new X509Certificate[]{};
final String remoteIP = messageContext.getRemoteAddress().toString();
final String helo = messageContext.getHelo();
if (messageContext.getTlsPeerCertificates() != null ){
certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone();
}
SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream);
synchronized (message) {
// / Try to queue the message back to the NiFi session
try {
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
final SMTPResultCode returnCode = SMTPResultCode.fromCode(421);
logger.trace(returnCode.getLogMessage());
// NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of
// adding message to the processing queue. Yet, for the sake of consistency the message is
// updated nonetheless
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Once message has been sent to the queue, it should be processed by NiFi onTrigger,
// a flowfile created and its processed status updated before an acknowledgment is
// given back to the SMTP client
elapsed = watch.getElapsed(TimeUnit.MILLISECONDS);
try {
message.wait(serverTimeout - elapsed);
} catch (InterruptedException e) {
// Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback
logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure");
incomingMessages.remove(message);
// Set the final values so onTrigger can figure out what happened to message
final SMTPResultCode returnCode = SMTPResultCode.fromCode(423);
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
// Inform client
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
// Check if message is processed
if (!message.getProcessed()) {
incomingMessages.remove(message);
final SMTPResultCode returnCode = SMTPResultCode.fromCode(451);
logger.trace("Did not receive the onTrigger response within the acceptable timeframe.");
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(returnCode.getCode());
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
} else if(message.getReturnCode() != null) {
// No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger.
final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode());
if(returnCode.isError()){
message.setAcknowledged();
throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage());
}
} else {
// onTrigger successfully processed the data.
// No need to check if over server timeout because we already processed the data. Might as well finalize it.
// Set the final values so onTrigger can figure out what happened to message
message.setReturnCode(250);
message.setAcknowledged();
}
// Exit, allowing Handler to acknowledge the message
message.notifyAll();
}
}
@Override
public void done() {
logger.trace("Called the last method of message handler. Exiting");
// Notifying the ontrigger that the message was handled.
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp.handler;
public enum SMTPResultCode {
// This error isn't raised by code. Being just a default value for the
// fromCode method below
UNKNOWN_ERROR_CODE(0,
"Unknown error.",
"Failed due to unknown error"),
SUCCESS (250,
"Success delivering message",
"Message from {} to {} via {} acknowledgement complete"),
QUEUE_ERROR (421,
"Could not queue the message. Try again",
"The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ),
UNEXPECTED_ERROR(423,
"Unexpected Error. Please try again or contact the administrator in case it persists",
"Error hit during delivery of message from {}"),
TIMEOUT_ERROR (451,
"The processing of your message timed-out, we may have received it but you better off sending it again",
"Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"),
MESSAGE_TOO_LARGE(500,
"Message rejected due to length/size of data",
"Your message exceeds the maximum permitted size");
private static final SMTPResultCode[] codeArray = new SMTPResultCode[501];
static {
for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) {
codeArray[smtpResultCode.getCode()] = smtpResultCode;
}
}
private final int code;
private final String errorMessage;
private final String logMessage;
SMTPResultCode(int code, String errorMessage, String logMessage) {
this.code = code;
this.errorMessage = errorMessage;
this.logMessage = logMessage;
}
public int getCode() {
return code;
}
public String getErrorMessage() {
return errorMessage;
}
public String getLogMessage() {
return logMessage;
}
public static SMTPResultCode fromCode(int code) {
final SMTPResultCode smtpResultCode = codeArray[code];
return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode;
}
public boolean isError(){
switch (this) {
case MESSAGE_TOO_LARGE:
case UNEXPECTED_ERROR:
case QUEUE_ERROR:
case TIMEOUT_ERROR:
return true;
default:
return false;
}
}
}

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.SimpleEmail;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.subethamail.smtp.server.SMTPServer;
public class SmtpConsumerTest {
private volatile ExecutorService executor;
@Before
public void before() {
this.executor = Executors.newCachedThreadPool();
}
@After
public void after() {
this.executor.shutdown();
}
@Test
public void validateServerCanStopWhenConsumerStoppedBeforeConsumingMessage() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
CountDownLatch latch = new CountDownLatch(10);
AtomicReference<Exception> exception = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
consumer.data(mock(InputStream.class));
} catch (Exception e) {
e.printStackTrace();
exception.set(e);
} finally {
latch.countDown();
}
}
}
});
boolean finished = latch.await(2000, TimeUnit.MILLISECONDS);
assertFalse(finished);
this.executor.shutdown();
boolean terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertFalse(terminated);
consumer.stop();
finished = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(finished);
terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertTrue(terminated);
}
/*
* This test simply validates that consumeUsing(..) can react properly to
* thread interrupts. That said the condition is impossible in the current
* usage of SmtpConsumer
*/
@Test
public void validateServerCanStopWhenConsumerInterrupted() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
AtomicReference<Thread> thread = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
thread.set(Thread.currentThread());
consumer.consumeUsing((in) -> {
return 0;
});
}
});
this.executor.shutdown();
boolean terminated = this.executor.awaitTermination(200, TimeUnit.MILLISECONDS);
assertFalse(terminated); // the call to consumeUsing(..) is blocking on
// the queue.poll since nothing is there
thread.get().interrupt(); // interrupt thread that executes
// consumeUsing(..)
terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertTrue(terminated);
}
/*
* Emulates any errors that may arise while reading the {@link InputStream}
* delivered as part of the data() call.
*/
@Test
public void validateServerCanStopWhenConsumerErrors() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
try {
consumer.data(mock(InputStream.class));
} catch (Exception e) {
exception.set(e);
} finally {
latch.countDown();
}
}
});
this.executor.execute(new Runnable() {
@Override
public void run() {
consumer.consumeUsing((in) -> {
throw new RuntimeException("intentional");
});
}
});
// this to ensure that call to data unblocks
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(exception.get() instanceof IllegalStateException);
assertEquals("Consuming thread failed while processing 'data' SMTP event.", exception.get().getMessage());
}
@Test
public void validateServerCanStopWithUnconsumedMessage() throws Exception {
int port = NetworkUtils.availablePort();
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtp = new SMTPServer(consumer);
smtp.setPort(port);
smtp.setSoftwareName("Apache NiFi");
smtp.start();
BlockingQueue<Exception> exceptionQueue = new ArrayBlockingQueue<>(1);
this.executor.execute(new Runnable() {
@Override
public void run() {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Hello SMTP");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (Exception e) {
exceptionQueue.offer(e);
}
}
});
assertNull(exceptionQueue.poll());
smtp.stop();
Exception ex = exceptionQueue.poll(100, TimeUnit.MILLISECONDS);
assertNotNull(ex);
/*
* This essentially ensures and validates that if NiFi was not able to
* successfully consume message the aftermath of the exception thrown by
* the consumer is propagated to the sender essentially ensuring no data
* loss by allowing sender to resend
*/
assertTrue(ex.getMessage().startsWith("Sending the email to the following server failed"));
}
@Test
public void validateConsumer() throws Exception {
int port = NetworkUtils.availablePort();
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtp = new SMTPServer(consumer);
smtp.setPort(port);
smtp.setSoftwareName("Apache NiFi");
smtp.start();
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("bob@nifi.apache.org");
email.send();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
});
List<String> messages = new ArrayList<>();
for (AtomicInteger i = new AtomicInteger(); i.get() < messageCount;) {
consumer.consumeUsing((dataInputStream) -> {
i.incrementAndGet();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int size = 0;
try {
size = IOUtils.copy(dataInputStream, bos);
messages.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
return size;
} catch (Exception e) {
e.printStackTrace();
fail();
}
return size;
});
}
boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
assertTrue(complete);
assertTrue(messages.size() == messageCount);
assertTrue(messages.get(0).contains("MSG-0"));
assertTrue(messages.get(1).contains("MSG-1"));
assertTrue(messages.get(2).contains("MSG-2"));
assertTrue(messages.get(3).contains("MSG-3"));
assertTrue(messages.get(4).contains("MSG-4"));
smtp.stop();
}
}

View File

@ -13,42 +13,118 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.email; package org.apache.nifi.processors.email;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.mail.Email; import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailException; import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.SimpleEmail; import org.apache.commons.mail.SimpleEmail;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.apache.nifi.ssl.SSLContextService; import org.junit.Before;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestListenSMTP { public class TestListenSMTP {
@Test(timeout=15000) private ScheduledExecutorService executor;
public void ValidEmailTls() throws Exception {
boolean[] failed = {false};
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0"); /**
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); *
*/
@Before
public void before() {
this.executor = Executors.newScheduledThreadPool(2);
}
/**
*
*/
@After
public void after() {
this.executor.shutdown();
}
/**
*
*/
@Test
public void validateSuccessfulInteraction() throws Exception, EmailException {
int port = NetworkUtils.availablePort();
TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
runner.assertValid();
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
this.executor.schedule(new Runnable() {
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("bob@nifi.apache.org");
email.send();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}
}
}, 1000, TimeUnit.MILLISECONDS);
boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", 5);
}
/**
*
*/
@Test
public void validateSuccessfulInteractionWithTls() throws Exception, EmailException {
System.setProperty("mail.smtp.ssl.trust", "*");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "localtest");
int port = NetworkUtils.availablePort();
TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
// Setup the SSL Context // Setup the SSL Context
final SSLContextService sslContextService = new StandardSSLContextService(); SSLContextService sslContextService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslContextService); runner.addControllerService("ssl-context", sslContextService);
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
@ -58,262 +134,107 @@ public class TestListenSMTP {
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService); runner.enableControllerService(sslContextService);
// and add the SSL context to the runner // and add the SSL context to the runner
runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
runner.assertValid();
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); this.executor.schedule(new Runnable() {
final ProcessContext context = runner.getProcessContext(); @Override
public void run() {
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog for (int i = 0; i < messageCount; i++) {
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
try {
final Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
try { try {
System.setProperty("mail.smtp.ssl.trust", "*");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks");
System.setProperty("javax.net.ssl.keyStorePassword", "localtest");
Email email = new SimpleEmail(); Email email = new SimpleEmail();
email.setHostName("localhost");
email.setHostName("127.0.0.1");
email.setSmtpPort(port); email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("bob@nifi.apache.org");
// Enable STARTTLS but ignore the cert // Enable STARTTLS but ignore the cert
email.setStartTLSEnabled(true); email.setStartTLSEnabled(true);
email.setStartTLSRequired(true); email.setStartTLSRequired(true);
email.setSSLCheckServerIdentity(false); email.setSSLCheckServerIdentity(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send(); email.send();
} catch (final Throwable t) { } catch (Exception e) {
failed[0] = true; e.printStackTrace();
throw new RuntimeException(e);
} finally {
latch.countDown();
} }
} }
});
clientThread.start();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
} }
}, 1500, TimeUnit.MILLISECONDS);
// Checks if client experienced Exception boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
Assert.assertFalse("Client experienced exception", failed[0]); runner.shutdown();
assertTrue(complete);
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred("success", 5);
clientThread.stop();
Assert.assertFalse("Sending email failed", failed[0]);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
} finally {
// shut down the server
listenSmtp.startShutdown();
}
} }
@Test(timeout=15000) /**
public void ValidEmail() throws Exception, EmailException { *
final boolean[] failed = {false}; */
ListenSMTP listenSmtp = new ListenSMTP(); @Test
final TestRunner runner = TestRunners.newTestRunner(listenSmtp); public void validateTooLargeMessage() throws Exception, EmailException {
int port = NetworkUtils.availablePort();
runner.setProperty(ListenSMTP.SMTP_PORT, "0"); TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class);
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port));
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); runner.assertValid();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog int messageCount = 1;
// where listenSmtp method calls are used to allow the processor to be started using CountDownLatch latch = new CountDownLatch(messageCount);
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort(); this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
try { this.executor.schedule(new Runnable() {
final Thread clientThread = new Thread(new Runnable() { @Override
@Override public void run() {
public void run() { for (int i = 0; i < messageCount; i++) {
try { try {
Email email = new SimpleEmail(); Email email = new SimpleEmail();
email.setHostName("127.0.0.1"); email.setHostName("localhost");
email.setSmtpPort(port); email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org"); email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test"); email.setSubject("This is a test");
email.setMsg("Test test test chocolate"); email.setMsg("MSG-" + i);
email.addTo("bob@nifi.apache.org"); email.addTo("bob@nifi.apache.org");
email.send(); email.send();
} catch (Exception e) {
} catch (final EmailException t) { e.printStackTrace();
failed[0] = true; throw new RuntimeException(e);
} finally {
latch.countDown();
} }
} }
});
clientThread.start();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
} }
clientThread.stop(); }, 1000, TimeUnit.MILLISECONDS);
Assert.assertFalse("Sending email failed", failed[0]); boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
} finally {
// shut down the server
listenSmtp.startShutdown();
}
}
@Test(timeout=15000, expected=EmailException.class)
public void ValidEmailTimeOut() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "50 milliseconds");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) {
// force timeout
Thread.sleep(999L);
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
}
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS);
splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org");
splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org");
Thread.sleep(100);
// shut down the server
listenSmtp.startShutdown();
}
@Test(timeout=15000)
public void emailTooLarge() throws Exception {
ListenSMTP listenSmtp = new ListenSMTP();
final TestRunner runner = TestRunners.newTestRunner(listenSmtp);
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "256B");
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "2");
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
// NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog
// where listenSmtp method calls are used to allow the processor to be started using
// port "0" without triggering a violation of PORT_VALIDATOR
listenSmtp.onScheduled(context);
listenSmtp.initializeSMTPServer(context);
final int port = listenSmtp.getPort();
AtomicBoolean finished = new AtomicBoolean(false);;
AtomicBoolean failed = new AtomicBoolean(false);
try {
final Thread clientThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Email email = new SimpleEmail();
email.setHostName("127.0.0.1");
email.setSmtpPort(port);
email.setStartTLSEnabled(false);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Test test test chocolate");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (final EmailException t) {
failed.set(true);
}
finished.set(true);
}
});
clientThread.start();
while (!finished.get()) {
// process the request.
listenSmtp.onTrigger(context, processSessionFactory);
Thread.sleep(10);
}
clientThread.stop();
Assert.assertTrue("Sending email succeeded when it should have failed", failed.get());
runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0);
runner.assertQueueEmpty();
} finally {
// shut down the server
listenSmtp.startShutdown();
}
} }
} }