From 48fa76ecfff410b09a0b97fd70468e028066e990 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 10 Aug 2016 10:19:17 -0400 Subject: [PATCH] NIFI-2519 Fixed and refactored ListenSMTP processor - Removed message queueing which could result in data loss - Fixed life-cycle issues that coudl put processor in an unstable state - Fixed PropertyDescriptor translation for Time units and Byte sizes - Fixed broken tests - Added additional tests NIFI-2519 added default for SMTP_MAXIMUM_CONNECTIONS NIFI-2519 addressed PR comments, polishing - fixed intermittent deadlock on processor stop and added test for it - the attributes that can not be extracted from the message but available via MessageContext are written into the outgoing FlowFile - other minor fixes NIFI-2519 addressed lates PR comments NIFI-2519 added better messaging when server closes the connection NIFI-2519 some polishing and additional tests to validate deadlocks NIFI-2519 address latest PR comments fixed deadlock condition for when the consumer is stopped while server is distributing messages fixed MAX message size issue ensuring it is validated set max connections to SMTPServer polished pom added L&N NIFI-2519 PR comments - fixed LICENSE - Added usage of LimitingInputStream - simplified SmtpConsumer by removing hasMessage operation --- .../nifi/remote/io/socket/NetworkUtils.java | 46 ++ .../nifi/stream/io/LimitingInputStream.java | 41 +- .../stream/io/LimitingInputStreamTest.java | 51 ++ .../src/main/resources/META-INF/LICENSE | 202 ++++++ .../src/main/resources/META-INF/NOTICE | 37 ++ .../nifi-email-processors/pom.xml | 6 + .../nifi/processors/email/ListenSMTP.java | 598 ++++++++---------- .../nifi/processors/email/SmtpConsumer.java | 206 ++++++ .../email/smtp/event/SmtpEvent.java | 125 ---- .../handler/SMTPMessageHandlerFactory.java | 165 ----- .../email/smtp/handler/SMTPResultCode.java | 93 --- .../processors/email/SmtpConsumerTest.java | 266 ++++++++ .../nifi/processors/email/TestListenSMTP.java | 395 +++++------- 13 files changed, 1258 insertions(+), 973 deletions(-) create mode 100644 nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java create mode 100644 nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java delete mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java delete mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java delete mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java new file mode 100644 index 0000000000..b2deeccbfe --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * + */ +public class NetworkUtils { + + /** + * Will determine the available port + */ + public final static int availablePort() { + ServerSocket s = null; + try { + s = new ServerSocket(0); + s.setReuseAddress(true); + return s.getLocalPort(); + } catch (Exception e) { + throw new IllegalStateException("Failed to discover available port.", e); + } finally { + try { + s.close(); + } catch (IOException e) { + // ignore + } + } + } +} diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java index a657030e1a..b1ebf2fbcb 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.stream.io; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -24,16 +25,50 @@ public class LimitingInputStream extends InputStream { private final InputStream in; private final long limit; private long bytesRead = 0; + private final boolean exceptionOnLimit; + /** + * Constructs a limited input stream whereby if the limit is reached all + * subsequent calls to read will return a -1. + * + * @param in + * the underlying input stream + * @param limit + * maximum length of bytes to read from underlying input stream + */ public LimitingInputStream(final InputStream in, final long limit) { + this(in, limit, false); + } + + /** + * Constructs a limited input stream whereby if the limit is reached all + * subsequent calls to read will return a -1 or EOFexception as configured + * + * @param in + * the underlying input stream + * @param limit + * maximum length of bytes to read from underlying input stream + * @param eofOnLimit + * true if EOF should occur on all read calls once limit reached; + * false if -1 should be returned instead + */ + public LimitingInputStream(final InputStream in, final long limit, final boolean eofOnLimit) { this.in = in; this.limit = limit; + exceptionOnLimit = eofOnLimit; + } + + private int limitReached() throws IOException { + if (exceptionOnLimit) { + throw new EOFException("Limit of allowed bytes read from input stream reached"); + } + return -1; } @Override public int read() throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int val = in.read(); @@ -46,7 +81,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(final byte[] b) throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int maxToRead = (int) Math.min(b.length, limit - bytesRead); @@ -61,7 +96,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(byte[] b, int off, int len) throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int maxToRead = (int) Math.min(len, limit - bytesRead); diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java new file mode 100644 index 0000000000..5bb4362e1a --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.EOFException; +import java.io.IOException; + +import junit.framework.TestCase; + +public class LimitingInputStreamTest extends TestCase { + private final static byte[] TEST_BUFFER = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + + public void testReadLimitNotReached() throws IOException { + LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, false); + long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, TEST_BUFFER.length); + + is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, true); + bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, TEST_BUFFER.length); + } + + public void testReadLimitExceeded() throws IOException { + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9); + final long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, 9); + } + + public void testReadLimitExceededEof() throws IOException { + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9, true); + try { + StreamUtils.copy(is, new ByteArrayOutputStream()); + fail("Should not get here"); + } catch (final EOFException eof) { + } + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..dc462ceb12 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,37 @@ +nifi-email-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Spring Framework + The following NOTICE information applies: + Spring Framework + Copyright 2002-2016 + + (ASLv2) SubEthaSMTP - A SMTP mail server + The following NOTICE information applies: + Spring Framework + Copyright 2006-2007 + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail) + +************************ +Common Development and Distribution License 1.0 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details. + + (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp) diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index 75cd3d2119..34125f4263 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -53,6 +53,12 @@ org.springframework.integration spring-integration-mail 4.3.0.RELEASE + + + org.springframework.retry + spring-retry + + commons-logging diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java index 0f17838837..52d448168b 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -13,16 +13,15 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.nifi.processors.email; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; import java.io.IOException; -import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,71 +30,57 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; - -import org.subethamail.smtp.server.SMTPServer; - - -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.DataUnit; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; - -import org.apache.nifi.processors.email.smtp.event.SmtpEvent; -import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode; -import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.springframework.util.StringUtils; +import org.subethamail.smtp.server.SMTPServer; @Tags({"listen", "email", "smtp"}) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + - "allowing nifi to listen for incoming email. " + - "" + - "Note this server does not perform any email validation. If direct exposure to the internet is sought," + - "it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)") +@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + + "allowing nifi to listen for incoming email. Note this server does not perform any email " + + "validation. If direct exposure to the internet is sought, it may be a better idea to use " + + "the combination of NiFi and an industrial scale MTA (e.g. Postfix)") @WritesAttributes({ - @WritesAttribute(attribute = "mime.type", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + - "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + - "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")}) - -public class ListenSMTP extends AbstractProcessor { - public static final String SMTP_HELO = "smtp.helo"; - public static final String SMTP_FROM = "smtp.from"; - public static final String SMTP_TO = "smtp.to"; - public static final String MIME_TYPE = "message/rfc822"; - public static final String SMTP_SRC_IP = "smtp.src"; - - - protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() + @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), + @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection"), + @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), + @WritesAttribute(attribute = "smtp.recipient", description = "The value used during RCPT TO (i.e. envelope)"), + @WritesAttribute(attribute = "mime.type", description = "Mime type of the message")}) +public class ListenSMTP extends AbstractSessionFactoryProcessor { + static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() .name("SMTP_PORT") .displayName("Listening Port") .description("The TCP port the ListenSMTP processor will bind to." + @@ -106,26 +91,17 @@ public class ListenSMTP extends AbstractProcessor { .addValidator(StandardValidators.PORT_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() - .name("SMTP_HOSTNAME") - .displayName("SMTP hostname") - .description("The hostname to be embedded into the banner displayed when an " + - "SMTP client connects to the processor TCP port .") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() .name("SMTP_MAXIMUM_CONNECTIONS") .displayName("Maximum number of SMTP connection") .description("The maximum number of simultaneous SMTP connections.") .required(true) + .defaultValue("1") .expressionLanguageSupported(false) .addValidator(StandardValidators.INTEGER_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() .name("SMTP_TIMEOUT") .displayName("SMTP connection timeout") .description("The maximum time to wait for an action of SMTP client.") @@ -135,29 +111,17 @@ public class ListenSMTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() .name("SMTP_MAXIMUM_MSG_SIZE") .displayName("SMTP Maximum Message Size") .description("The maximum number of bytes the server will accept.") .required(true) - .defaultValue("20MB") + .defaultValue("20 MB") .expressionLanguageSupported(false) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Integer.MAX_VALUE)) .build(); - protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder() - .name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE") - .displayName("SMTP message buffer length") - .description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " + - "Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large" + - "queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .defaultValue("1024") - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL_CONTEXT_SERVICE") .displayName("SSL Context Service") .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + @@ -166,7 +130,7 @@ public class ListenSMTP extends AbstractProcessor { .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") @@ -174,309 +138,243 @@ public class ListenSMTP extends AbstractProcessor { .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .build(); - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } - - return results; - - } - - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Extraction was successful") + protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() + .name("SMTP_HOSTNAME") + .displayName("SMTP hostname") + .description("The hostname to be embedded into the banner displayed when an " + + "SMTP client connects to the processor TCP port .") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - private Set relationships; - private List propertyDescriptors; - private volatile LinkedBlockingQueue incomingMessages; + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All new messages will be routed as FlowFiles to this relationship") + .build(); - private volatile SMTPServer server; - private AtomicBoolean initialized = new AtomicBoolean(false); - private AtomicBoolean stopping = new AtomicBoolean(false); + private final static List propertyDescriptors; + private final static Set relationships; + + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SMTP_PORT); + _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); + _propertyDescriptors.add(SMTP_TIMEOUT); + _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + _propertyDescriptors.add(SMTP_HOSTNAME); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); + } + + private volatile SMTPServer smtp; + + private volatile SmtpConsumer smtpConsumer; + + private volatile int maxMessageSize; + + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession processSession = sessionFactory.createSession(); + if (this.smtp == null) { + this.setupSmtpIfNecessary(context, processSession); + } + + /* + * Will consume incoming message directly from the wire and into + * FlowFile/Content repository before exiting. This essentially limits + * any potential data loss by allowing SMTPServer thread to actually + * commit NiFi session if all good. However in the event of exception, + * such exception will be propagated back to the email sender via + * "undeliverable message" allowing such user to re-send the message + */ + this.smtpConsumer.consumeUsing((inputDataStream) -> { + FlowFile flowFile = processSession.create(); + AtomicInteger size = new AtomicInteger(); + try { + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + size.set(IOUtils.copy(new LimitingInputStream(inputDataStream, ListenSMTP.this.maxMessageSize, true), out)); + } + }); + flowFile = updateFlowFileWithAttributes(flowFile, processSession); + + processSession.getProvenanceReporter().receive(flowFile, + "smtp://" + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + return size.get(); + } catch (Exception e) { + context.yield(); + this.getLogger().error("Failed while processing incoming mail. " + e.getMessage(), e); + throw new IllegalStateException("Failed while processing incoming mail. " + e.getMessage(), e); + } + }); + } + + /** + * + */ + @OnStopped + public void stop() { + this.getLogger().info("Stopping SMTPServer"); + this.smtp.stop(); + this.smtp = null; + this.getLogger().info("SMTPServer stopped"); + } + + /** + * + */ @Override public Set getRelationships() { return relationships; } + /** + * + */ + @Override + protected Collection customValidate(ValidationContext validationContext) { + List results = new ArrayList<>(); + + String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); + SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (sslContextService != null && !StringUtils.hasText(clientAuth)) { + results.add(new ValidationResult.Builder() + .subject(CLIENT_AUTH.getDisplayName()) + .explanation(CLIENT_AUTH.getDisplayName() + " must be provided when using " + SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .build()); + } else if (sslContextService == null && StringUtils.hasText(clientAuth)) { + results.add(new ValidationResult.Builder() + .subject(SSL_CONTEXT_SERVICE.getDisplayName()) + .explanation(SSL_CONTEXT_SERVICE.getDisplayName() + " must be provided when selecting " + CLIENT_AUTH.getDisplayName()) + .valid(false) + .build()); + } + return results; + } + + /** + * + */ @Override protected List getSupportedPropertyDescriptors() { return propertyDescriptors; } - @Override - protected void init(final ProcessorInitializationContext context) { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - - final List props = new ArrayList<>(); - props.add(SMTP_PORT); - props.add(SMTP_HOSTNAME); - props.add(SMTP_MAXIMUM_CONNECTIONS); - props.add(SMTP_TIMEOUT); - props.add(SMTP_MAXIMUM_MSG_SIZE); - props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); - props.add(SSL_CONTEXT_SERVICE); - props.add(CLIENT_AUTH); - this.propertyDescriptors = Collections.unmodifiableList(props); - - } - - // Upon Schedule, reset the initialized state to false - @OnScheduled - public void onScheduled(ProcessContext context) { - initialized.set(false); - stopping.set(false); - } - - protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { - - // check if we are already running or if it is stopping - if (initialized.get() && server.isRunning() || stopping.get() ) { - return; + /** + * + */ + private FlowFile updateFlowFileWithAttributes(FlowFile flowFile, ProcessSession processSession) { + Map attributes = new HashMap<>(); + Certificate[] tlsPeerCertificates = this.smtpConsumer.getMessageContext().getTlsPeerCertificates(); + if (tlsPeerCertificates != null) { + for (int i = 0; i < tlsPeerCertificates.length; i++) { + if (tlsPeerCertificates[i] instanceof X509Certificate) { + X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i]; + attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); + attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName()); + } + } } - incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); + attributes.put("smtp.helo", this.smtpConsumer.getMessageContext().getHelo()); + attributes.put("smtp.remote.addr", this.smtpConsumer.getMessageContext().getRemoteAddress().toString()); + attributes.put("smtp.from", this.smtpConsumer.getFrom()); + attributes.put("smtp.recepient", this.smtpConsumer.getRecipient()); + attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); + return processSession.putAllAttributes(flowFile, attributes); + } - String clientAuth = null; + /** + * + */ + private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.smtp == null) { + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtpServer = this.createServerInstance(context, consumer); + smtpServer.setSoftwareName("Apache NiFi"); + smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); + smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); + this.maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); + smtpServer.setMaxMessageSize(this.maxMessageSize); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + if (context.getProperty(SMTP_HOSTNAME).isSet()) { + smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); + } - // If an SSLContextService was provided then create an SSLContext to pass down to the server - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + this.smtpConsumer = consumer; + this.smtp = smtpServer; + this.smtp.start(); } + } - final SSLContext finalSslContext = sslContext; - - SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); - final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { - + /** + * + */ + private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { + SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + SMTPServer smtpServer = sslContextService == null ? new ConsumerAwareSmtpServer(consumer) : new ConsumerAwareSmtpServer(consumer) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + sslSocket.setUseClientMode(false); - SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); - - SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); - - s.setUseClientMode(false); - - - // For some reason the createSSLContext above is not enough to enforce - // client side auth - // If client auth is required... - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - s.setNeedClientAuth(true); + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { + this.setRequireTLS(true); + sslSocket.setNeedClientAuth(true); } - - - return s; + return sslSocket; } }; - - // Set some parameters to our server - server.setSoftwareName("Apache NiFi"); - - - // Set the Server options based on properties - server.setPort(context.getProperty(SMTP_PORT).asInteger()); - server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); - server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - - // Check if TLS should be enabled if (sslContextService != null) { - server.setEnableTLS(true); + smtpServer.setEnableTLS(true); } else { - server.setHideTLS(true); + smtpServer.setHideTLS(true); } - - // Set TLS to required in case CLIENT_AUTH = required - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - server.setRequireTLS(true); - } - - this.server = server; - server.start(); - - getLogger().info("Server started and listening on port " + server.getPort()); - - initialized.set(true); - stopping.set(false); + return smtpServer; } - @OnUnscheduled - public void startShutdown() throws Exception { - if (server != null) { - stopping.set(true); - getLogger().info("Shutting down processor P{}", new Object[]{server}); - server.stop(); - getLogger().info("Shut down {}", new Object[]{server}); - } - } + /** + * Wrapper over {@link SMTPServer} that is aware of the {@link SmtpConsumer} + * to ensure that its stop() operation is called during server stoppage. + */ + private static class ConsumerAwareSmtpServer extends SMTPServer { - @OnStopped - public void completeShutdown() throws Exception { - if (server != null) { - if (!server.isRunning() && stopping.get() ) { - stopping.set(false); - } - getLogger().info("Completed shut down {}", new Object[]{server}); - } - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - - try { - initializeSMTPServer(context); - } catch (Exception e) { - context.yield(); - throw new ProcessException("Failed to initialize the SMTP server", e); + /** + * + */ + public ConsumerAwareSmtpServer(SmtpConsumer consumer) { + super(consumer); } - while (!incomingMessages.isEmpty()) { - SmtpEvent message = incomingMessages.poll(); - - if (message == null) { - return; - } - - synchronized (message) { - if (resultCodeSetAndIsError(message)) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage()); - continue; - } - - try { - FlowFile flowfile = session.create(); - - if (message.getMessageData() != null) { - flowfile = session.write(flowfile, out -> { - InputStream inputStream = message.getMessageData(); - byte [] buffer = new byte[1024]; - - int rd; - long totalBytesRead =0; - - while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { - totalBytesRead += rd; - if (totalBytesRead > server.getMaxMessageSize() ) { - message.setReturnCode(500); - message.setProcessed(); - break; - } - out.write(buffer, 0, rd); - } - out.flush(); - }); - } else { - getLogger().debug("Message body was null"); - message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode()); - message.setProcessed(); - } - - if (!message.getProcessed()) { - HashMap attributes = new HashMap<>(); - // Gather message attributes - attributes.put(SMTP_HELO, message.getHelo()); - attributes.put(SMTP_SRC_IP, message.getHelo()); - attributes.put(SMTP_FROM, message.getFrom()); - attributes.put(SMTP_TO, message.getTo()); - - List> details = message.getCertifcateDetails(); - int c = 0; - - // Add a selection of each X509 certificates to the already gathered attributes - - for (Map detail : details) { - attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); - attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); - c++; - } - - // Set Mime-Type - attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE); - - // Add the attributes. to flowfile - flowfile = session.putAllAttributes(flowfile, attributes); - session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/"); - session.transfer(flowfile, REL_SUCCESS); - - getLogger().info("Transferring {} to success", new Object[]{flowfile}); - } - } catch (Exception e) { - message.setProcessed(); - message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode()); - } - - // Check to see if it failed when creating the FlowFile - if (resultCodeSetAndIsError(message)) { - session.rollback(); - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); - message.notifyAll(); - continue; - } - - // Finished processing, - message.setProcessed(); - - // notify on the message so data() can process the rest of the method - message.notifyAll(); - - // Wait for data() to tell sender we received the message and double check we didn't timeout - final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - try { - message.wait(serverTimeout); - } catch (InterruptedException e) { - getLogger().info("Interrupted while waiting for Message Handler to acknowledge message."); - } - - // Check to see if the sender was correctly notified - if (resultCodeSetAndIsError(message)) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - session.rollback(); - getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); - } else { - // Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss. - session.commit(); - } + /** + * + */ + @Override + public synchronized void stop() { + try { + SmtpConsumer consumer = (SmtpConsumer) this.getMessageHandlerFactory(); + consumer.stop(); + } finally { + super.stop(); } } } - - private boolean resultCodeSetAndIsError(SmtpEvent message){ - if (message.getReturnCode() != null ) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - if (resultCode.isError()) { - return true; - } - } - return false; - } - - // Same old... same old... used for testing to access the random port that was selected - protected int getPort() { - return server == null ? 0 : server.getPort(); - } - - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java new file mode 100644 index 0000000000..2063c0da45 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandler; +import org.subethamail.smtp.MessageHandlerFactory; +import org.subethamail.smtp.RejectException; +import org.subethamail.smtp.TooMuchDataException; +import org.subethamail.smtp.server.SMTPServer; + +/** + * A simple consumer that provides a bridge between 'push' message distribution + * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. It + * implements both {@link MessageHandler} and {@link MessageHandlerFactory} + * allowing it to interact directly with {@link SMTPServer}. + */ +class SmtpConsumer implements MessageHandler, MessageHandlerFactory { + + private final static int CONSUMER_STOPPED = -1; + + private final static int INTERRUPTED = -2; + + private final static int ERROR = -9; + + private final static int NO_MESSAGE = -8; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final BlockingQueue messageDataQueue = new ArrayBlockingQueue<>(1); + + private final BlockingQueue consumedMessageSizeQueue = new ArrayBlockingQueue<>(1); + + private final AtomicBoolean running = new AtomicBoolean(true); + + private volatile MessageContext messageContext; + + private volatile String from; + + private volatile String recipient; + + + /** + * + */ + String getFrom() { + return this.from; + } + + /** + * + */ + String getRecipient() { + return this.recipient; + } + + /** + * + */ + MessageContext getMessageContext() { + return this.messageContext; + } + + /** + * This operation will simply attempt to put a poison message to the + * 'consumedMessageSizeQueue' to ensure that in the event this consumer is + * stopped before the message is consumed (see + * {@link #consumeUsing(Function)}), the server thread that is blocking in + * {@link #data(InputStream)} operation can unblock. + */ + // NOTE: the 'synchronize' is only here for API correctness, to ensure that + // stop() and consumeUsing(..) can never be invoked at the same time. + // However within NiFi it can never happen. + synchronized void stop() { + this.running.compareAndSet(true, false); + this.consumedMessageSizeQueue.offer(CONSUMER_STOPPED); + } + + /** + * This operation is invoked by the consumer. Implementation of this + * operation creates a synchronous connection with the message producer (see + * {@link #data(InputStream)}) via a pair of queues which guarantees that + * message is fully consumed and disposed by the consumer (provided as + * {@link Function}) before the server closes the data stream. + */ + // NOTE: the 'synchronize' is only here for API correctness, to ensure that + // stop() and consumeUsing(..) can never be invoked at the same time. + // However within NiFi it can never happen. + synchronized void consumeUsing(Function resultConsumer) { + int messageSize = 0; + try { + InputStream message = this.messageDataQueue.poll(1000, TimeUnit.MILLISECONDS); + if (message != null) { + messageSize = resultConsumer.apply(message); + } else { + messageSize = NO_MESSAGE; + } + } catch (InterruptedException e) { + this.logger.warn("Current thread is interrupted", e); + messageSize = INTERRUPTED; + Thread.currentThread().interrupt(); + } finally { + if (messageSize == 0) { + messageSize = ERROR; + } + if (messageSize != NO_MESSAGE) { + this.consumedMessageSizeQueue.offer(messageSize); + } + } + } + + /** + * This operation is invoked by the server thread and contains message data + * as {@link InputStream}. Implementation of this operation creates a + * synchronous connection with consumer (see {@link #onMessage(Function)}) + * via a pair of queues which guarantees that message is fully consumed and + * disposed by the consumer by the time this operation exits. + */ + @Override + public void data(InputStream data) throws RejectException, TooMuchDataException, IOException { + if (this.running.get()) { + try { + this.messageDataQueue.offer(data, Integer.MAX_VALUE, TimeUnit.SECONDS); + long messageSize = this.consumedMessageSizeQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); + String exceptionMessage = null; + if (messageSize == CONSUMER_STOPPED) { + exceptionMessage = "NIFI Consumer was stopped before message was successfully consumed"; + } else if (messageSize == INTERRUPTED) { + exceptionMessage = "Consuming thread was interrupted"; + } else if (messageSize == ERROR) { + exceptionMessage = "Consuming thread failed while processing 'data' SMTP event."; + } + if (exceptionMessage != null) { + throw new IllegalStateException(exceptionMessage); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Received message of size: " + messageSize); + } + } + } catch (InterruptedException e) { + this.logger.warn("Current thread is interrupted", e); + Thread.currentThread().interrupt(); + throw new IllegalStateException("Current thread is interrupted", e); + } + } else { + throw new IllegalStateException("NIFI Consumer was stopped before message was successfully consumed"); + } + } + + /** + * + */ + @Override + public void from(String from) throws RejectException { + this.from = from; + } + + /** + * + */ + @Override + public void recipient(String recipient) throws RejectException { + this.recipient = recipient; + } + + /** + * + */ + @Override + public void done() { + // noop + } + + /** + * + */ + @Override + public MessageHandler create(MessageContext ctx) { + this.messageContext = ctx; + return this; + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java deleted file mode 100644 index e1c36c53a3..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.nifi.processors.email.smtp.event; - - - -import java.io.InputStream; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A Smtp event which adds the transaction number and command to the StandardEvent. - */ - -public class SmtpEvent{ - private final String remoteIP; - private final String helo; - private final String from; - private final String to; - private final InputStream messageData; - private List> certificatesDetails; - private AtomicBoolean processed = new AtomicBoolean(false); - private AtomicBoolean acknowledged = new AtomicBoolean(false); - private AtomicInteger returnCode = new AtomicInteger(); - - public SmtpEvent( - final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates, - final InputStream messageData) { - - this.remoteIP = remoteIP; - this.helo = helo; - this.from = from; - this.to = to; - this.messageData = messageData; - - this.certificatesDetails = new ArrayList<>(); - - for (int c = 0; c < certificates.length; c++) { - X509Certificate cert = certificates[c]; - if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) { - Map certificate = new HashMap<>(); - - String certSerialNumber = cert.getSerialNumber().toString(); - String certSubjectDN = cert.getSubjectDN().getName(); - - - certificate.put("SerialNumber", certSerialNumber); - certificate.put("SubjectName", certSubjectDN); - - certificatesDetails.add(certificate); - - } - } - } - - public synchronized List> getCertifcateDetails() { - return certificatesDetails; - } - - public synchronized String getHelo() { - return helo; - } - - public synchronized InputStream getMessageData() { - return messageData; - } - - public synchronized String getFrom() { - return from; - } - - public synchronized String getTo() { - return to; - } - - public synchronized String getRemoteIP() { - return remoteIP; - } - - public synchronized void setProcessed() { - this.processed.set(true); - } - - public synchronized boolean getProcessed() { - return this.processed.get(); - } - - public synchronized void setAcknowledged() { - this.acknowledged.set(true); - } - - public synchronized boolean getAcknowledged() { - return this.acknowledged.get(); - } - - public synchronized void setReturnCode(int code) { - this.returnCode.set(code); - } - - public synchronized Integer getReturnCode() { - return this.returnCode.get(); - } - -} - diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java deleted file mode 100644 index 6b647bf872..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.email.smtp.handler; - -import java.io.IOException; -import java.io.InputStream; -import java.security.cert.X509Certificate; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.StopWatch; -import org.subethamail.smtp.DropConnectionException; -import org.subethamail.smtp.MessageContext; -import org.subethamail.smtp.MessageHandler; -import org.subethamail.smtp.MessageHandlerFactory; -import org.subethamail.smtp.RejectException; -import org.subethamail.smtp.TooMuchDataException; - -import org.apache.nifi.processors.email.smtp.event.SmtpEvent; - -public class SMTPMessageHandlerFactory implements MessageHandlerFactory { - final LinkedBlockingQueue incomingMessages; - final ComponentLog logger; - - - public SMTPMessageHandlerFactory(LinkedBlockingQueue incomingMessages, ComponentLog logger) { - this.incomingMessages = incomingMessages; - this.logger = logger; - - } - - @Override - public MessageHandler create(MessageContext messageContext) { - return new Handler(messageContext, incomingMessages, logger); - } - - class Handler implements MessageHandler { - final MessageContext messageContext; - String from; - String recipient; - - public Handler(MessageContext messageContext, LinkedBlockingQueue incomingMessages, ComponentLog logger){ - this.messageContext = messageContext; - } - - @Override - public void from(String from) throws RejectException { - // TODO: possibly whitelist senders? - this.from = from; - } - - @Override - public void recipient(String recipient) throws RejectException { - // TODO: possibly whitelist receivers? - this.recipient = recipient; - } - - @Override - public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException { - // Start counting the timer... - StopWatch watch = new StopWatch(true); - long elapsed; - final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS); - - X509Certificate[] certificates = new X509Certificate[]{}; - - final String remoteIP = messageContext.getRemoteAddress().toString(); - final String helo = messageContext.getHelo(); - - if (messageContext.getTlsPeerCertificates() != null ){ - certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone(); - } - - SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream); - - synchronized (message) { - // / Try to queue the message back to the NiFi session - try { - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - final SMTPResultCode returnCode = SMTPResultCode.fromCode(421); - logger.trace(returnCode.getLogMessage()); - - // NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of - // adding message to the processing queue. Yet, for the sake of consistency the message is - // updated nonetheless - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Once message has been sent to the queue, it should be processed by NiFi onTrigger, - // a flowfile created and its processed status updated before an acknowledgment is - // given back to the SMTP client - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - try { - message.wait(serverTimeout - elapsed); - } catch (InterruptedException e) { - // Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback - logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure"); - incomingMessages.remove(message); - - // Set the final values so onTrigger can figure out what happened to message - final SMTPResultCode returnCode = SMTPResultCode.fromCode(423); - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - - // Inform client - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Check if message is processed - if (!message.getProcessed()) { - incomingMessages.remove(message); - final SMTPResultCode returnCode = SMTPResultCode.fromCode(451); - logger.trace("Did not receive the onTrigger response within the acceptable timeframe."); - - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } else if(message.getReturnCode() != null) { - // No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger. - final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode()); - - if(returnCode.isError()){ - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - } else { - // onTrigger successfully processed the data. - // No need to check if over server timeout because we already processed the data. Might as well finalize it. - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(250); - message.setAcknowledged(); - } - // Exit, allowing Handler to acknowledge the message - message.notifyAll(); - } - } - - @Override - public void done() { - logger.trace("Called the last method of message handler. Exiting"); - // Notifying the ontrigger that the message was handled. - } - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java deleted file mode 100644 index b9c0d60e8e..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.email.smtp.handler; - -public enum SMTPResultCode { - // This error isn't raised by code. Being just a default value for the - // fromCode method below - UNKNOWN_ERROR_CODE(0, - "Unknown error.", - "Failed due to unknown error"), - - SUCCESS (250, - "Success delivering message", - "Message from {} to {} via {} acknowledgement complete"), - - QUEUE_ERROR (421, - "Could not queue the message. Try again", - "The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ), - - UNEXPECTED_ERROR(423, - "Unexpected Error. Please try again or contact the administrator in case it persists", - "Error hit during delivery of message from {}"), - - TIMEOUT_ERROR (451, - "The processing of your message timed-out, we may have received it but you better off sending it again", - "Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"), - - MESSAGE_TOO_LARGE(500, - "Message rejected due to length/size of data", - "Your message exceeds the maximum permitted size"); - - private static final SMTPResultCode[] codeArray = new SMTPResultCode[501]; - - static { - for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) { - codeArray[smtpResultCode.getCode()] = smtpResultCode; - } - } - - private final int code; - private final String errorMessage; - private final String logMessage; - - SMTPResultCode(int code, String errorMessage, String logMessage) { - this.code = code; - this.errorMessage = errorMessage; - this.logMessage = logMessage; - } - - public int getCode() { - return code; - } - - public String getErrorMessage() { - return errorMessage; - } - - public String getLogMessage() { - return logMessage; - } - - public static SMTPResultCode fromCode(int code) { - final SMTPResultCode smtpResultCode = codeArray[code]; - return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode; - } - - public boolean isError(){ - switch (this) { - case MESSAGE_TOO_LARGE: - case UNEXPECTED_ERROR: - case QUEUE_ERROR: - case TIMEOUT_ERROR: - return true; - default: - return false; - } - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java new file mode 100644 index 0000000000..4a2ad80fe6 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.mail.Email; +import org.apache.commons.mail.SimpleEmail; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.subethamail.smtp.server.SMTPServer; + +public class SmtpConsumerTest { + + private volatile ExecutorService executor; + + @Before + public void before() { + this.executor = Executors.newCachedThreadPool(); + } + + @After + public void after() { + this.executor.shutdown(); + } + + @Test + public void validateServerCanStopWhenConsumerStoppedBeforeConsumingMessage() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + CountDownLatch latch = new CountDownLatch(10); + AtomicReference exception = new AtomicReference<>(); + this.executor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 10; i++) { + try { + consumer.data(mock(InputStream.class)); + } catch (Exception e) { + e.printStackTrace(); + exception.set(e); + } finally { + latch.countDown(); + } + } + } + }); + boolean finished = latch.await(2000, TimeUnit.MILLISECONDS); + assertFalse(finished); + this.executor.shutdown(); + boolean terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertFalse(terminated); + + consumer.stop(); + finished = latch.await(1000, TimeUnit.MILLISECONDS); + assertTrue(finished); + terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertTrue(terminated); + } + + /* + * This test simply validates that consumeUsing(..) can react properly to + * thread interrupts. That said the condition is impossible in the current + * usage of SmtpConsumer + */ + @Test + public void validateServerCanStopWhenConsumerInterrupted() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + AtomicReference thread = new AtomicReference<>(); + + this.executor.execute(new Runnable() { + @Override + public void run() { + thread.set(Thread.currentThread()); + consumer.consumeUsing((in) -> { + return 0; + }); + } + }); + + this.executor.shutdown(); + boolean terminated = this.executor.awaitTermination(200, TimeUnit.MILLISECONDS); + assertFalse(terminated); // the call to consumeUsing(..) is blocking on + // the queue.poll since nothing is there + + thread.get().interrupt(); // interrupt thread that executes + // consumeUsing(..) + terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertTrue(terminated); + } + + /* + * Emulates any errors that may arise while reading the {@link InputStream} + * delivered as part of the data() call. + */ + @Test + public void validateServerCanStopWhenConsumerErrors() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exception = new AtomicReference<>(); + this.executor.execute(new Runnable() { + @Override + public void run() { + try { + consumer.data(mock(InputStream.class)); + } catch (Exception e) { + exception.set(e); + } finally { + latch.countDown(); + } + } + }); + + this.executor.execute(new Runnable() { + @Override + public void run() { + consumer.consumeUsing((in) -> { + throw new RuntimeException("intentional"); + }); + } + }); + + // this to ensure that call to data unblocks + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(exception.get() instanceof IllegalStateException); + assertEquals("Consuming thread failed while processing 'data' SMTP event.", exception.get().getMessage()); + } + + @Test + public void validateServerCanStopWithUnconsumedMessage() throws Exception { + int port = NetworkUtils.availablePort(); + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtp = new SMTPServer(consumer); + smtp.setPort(port); + smtp.setSoftwareName("Apache NiFi"); + smtp.start(); + + BlockingQueue exceptionQueue = new ArrayBlockingQueue<>(1); + this.executor.execute(new Runnable() { + @Override + public void run() { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Hello SMTP"); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + exceptionQueue.offer(e); + } + } + }); + assertNull(exceptionQueue.poll()); + smtp.stop(); + Exception ex = exceptionQueue.poll(100, TimeUnit.MILLISECONDS); + assertNotNull(ex); + /* + * This essentially ensures and validates that if NiFi was not able to + * successfully consume message the aftermath of the exception thrown by + * the consumer is propagated to the sender essentially ensuring no data + * loss by allowing sender to resend + */ + assertTrue(ex.getMessage().startsWith("Sending the email to the following server failed")); + } + + @Test + public void validateConsumer() throws Exception { + int port = NetworkUtils.availablePort(); + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtp = new SMTPServer(consumer); + smtp.setPort(port); + smtp.setSoftwareName("Apache NiFi"); + smtp.start(); + + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + this.executor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } + } + }); + + List messages = new ArrayList<>(); + for (AtomicInteger i = new AtomicInteger(); i.get() < messageCount;) { + consumer.consumeUsing((dataInputStream) -> { + i.incrementAndGet(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int size = 0; + try { + size = IOUtils.copy(dataInputStream, bos); + messages.add(new String(bos.toByteArray(), StandardCharsets.UTF_8)); + return size; + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + return size; + }); + } + + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + assertTrue(complete); + assertTrue(messages.size() == messageCount); + assertTrue(messages.get(0).contains("MSG-0")); + assertTrue(messages.get(1).contains("MSG-1")); + assertTrue(messages.get(2).contains("MSG-2")); + assertTrue(messages.get(3).contains("MSG-3")); + assertTrue(messages.get(4).contains("MSG-4")); + smtp.stop(); + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java index 1fd7628732..c317b0c767 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -13,42 +13,118 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.nifi.processors.email; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.mail.Email; import org.apache.commons.mail.EmailException; import org.apache.commons.mail.SimpleEmail; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - -import org.apache.nifi.ssl.SSLContextService; - -import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - public class TestListenSMTP { - @Test(timeout=15000) - public void ValidEmailTls() throws Exception { - boolean[] failed = {false}; - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + private ScheduledExecutorService executor; - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + /** + * + */ + @Before + public void before() { + this.executor = Executors.newScheduledThreadPool(2); + } + + /** + * + */ + @After + public void after() { + this.executor.shutdown(); + } + + /** + * + */ + @Test + public void validateSuccessfulInteraction() throws Exception, EmailException { + int port = NetworkUtils.availablePort(); + + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + + runner.assertValid(); + + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); + + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + } + } + }, 1000, TimeUnit.MILLISECONDS); + + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 5); + } + + /** + * + */ + @Test + public void validateSuccessfulInteractionWithTls() throws Exception, EmailException { + System.setProperty("mail.smtp.ssl.trust", "*"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "localtest"); + int port = NetworkUtils.availablePort(); + + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); // Setup the SSL Context - final SSLContextService sslContextService = new StandardSSLContextService(); + SSLContextService sslContextService = new StandardSSLContextService(); runner.addControllerService("ssl-context", sslContextService); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); @@ -58,262 +134,107 @@ public class TestListenSMTP { runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.enableControllerService(sslContextService); + // and add the SSL context to the runner runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); + runner.assertValid(); + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { try { - - - System.setProperty("mail.smtp.ssl.trust", "*"); - System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks"); - System.setProperty("javax.net.ssl.keyStorePassword", "localtest"); - Email email = new SimpleEmail(); - - email.setHostName("127.0.0.1"); + email.setHostName("localhost"); email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); // Enable STARTTLS but ignore the cert email.setStartTLSEnabled(true); email.setStartTLSRequired(true); email.setSSLCheckServerIdentity(false); - - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - } catch (final Throwable t) { - failed[0] = true; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); } } - }); - clientThread.start(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); } + }, 1500, TimeUnit.MILLISECONDS); - // Checks if client experienced Exception - Assert.assertFalse("Client experienced exception", failed[0]); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); - clientThread.stop(); - - Assert.assertFalse("Sending email failed", failed[0]); - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 5); } - @Test(timeout=15000) - public void ValidEmail() throws Exception, EmailException { - final boolean[] failed = {false}; - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + /** + * + */ + @Test + public void validateTooLargeMessage() throws Exception, EmailException { + int port = NetworkUtils.availablePort(); - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B"); - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); + runner.assertValid(); - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); + int messageCount = 1; + CountDownLatch latch = new CountDownLatch(messageCount); - final int port = listenSmtp.getPort(); + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { try { Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); + email.setHostName("localhost"); email.setSmtpPort(port); - email.setStartTLSEnabled(false); email.setFrom("alice@nifi.apache.org"); email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); + email.setMsg("MSG-" + i); email.addTo("bob@nifi.apache.org"); email.send(); - - } catch (final EmailException t) { - failed[0] = true; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); } } - }); - clientThread.start(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); } - clientThread.stop(); + }, 1000, TimeUnit.MILLISECONDS); - Assert.assertFalse("Sending email failed", failed[0]); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } - } - - @Test(timeout=15000, expected=EmailException.class) - public void ValidEmailTimeOut() throws Exception { - - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); - - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); - runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "50 milliseconds"); - - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - - - Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); - email.setSmtpPort(port); - email.setStartTLSEnabled(false); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // force timeout - Thread.sleep(999L); - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); - } - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - - // shut down the server - listenSmtp.startShutdown(); - } - - @Test(timeout=15000) - public void emailTooLarge() throws Exception { - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); - - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "256B"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "2"); - runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); - - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - AtomicBoolean finished = new AtomicBoolean(false);; - AtomicBoolean failed = new AtomicBoolean(false); - - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); - email.setSmtpPort(port); - email.setStartTLSEnabled(false); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - - } catch (final EmailException t) { - failed.set(true); - } - finished.set(true); - } - }); - clientThread.start(); - - while (!finished.get()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); - Thread.sleep(10); - } - clientThread.stop(); - - Assert.assertTrue("Sending email succeeded when it should have failed", failed.get()); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0); - - runner.assertQueueEmpty(); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 0); } } \ No newline at end of file