diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java new file mode 100644 index 0000000000..b2deeccbfe --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * + */ +public class NetworkUtils { + + /** + * Will determine the available port + */ + public final static int availablePort() { + ServerSocket s = null; + try { + s = new ServerSocket(0); + s.setReuseAddress(true); + return s.getLocalPort(); + } catch (Exception e) { + throw new IllegalStateException("Failed to discover available port.", e); + } finally { + try { + s.close(); + } catch (IOException e) { + // ignore + } + } + } +} diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java index a657030e1a..b1ebf2fbcb 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.stream.io; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -24,16 +25,50 @@ public class LimitingInputStream extends InputStream { private final InputStream in; private final long limit; private long bytesRead = 0; + private final boolean exceptionOnLimit; + /** + * Constructs a limited input stream whereby if the limit is reached all + * subsequent calls to read will return a -1. + * + * @param in + * the underlying input stream + * @param limit + * maximum length of bytes to read from underlying input stream + */ public LimitingInputStream(final InputStream in, final long limit) { + this(in, limit, false); + } + + /** + * Constructs a limited input stream whereby if the limit is reached all + * subsequent calls to read will return a -1 or EOFexception as configured + * + * @param in + * the underlying input stream + * @param limit + * maximum length of bytes to read from underlying input stream + * @param eofOnLimit + * true if EOF should occur on all read calls once limit reached; + * false if -1 should be returned instead + */ + public LimitingInputStream(final InputStream in, final long limit, final boolean eofOnLimit) { this.in = in; this.limit = limit; + exceptionOnLimit = eofOnLimit; + } + + private int limitReached() throws IOException { + if (exceptionOnLimit) { + throw new EOFException("Limit of allowed bytes read from input stream reached"); + } + return -1; } @Override public int read() throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int val = in.read(); @@ -46,7 +81,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(final byte[] b) throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int maxToRead = (int) Math.min(b.length, limit - bytesRead); @@ -61,7 +96,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(byte[] b, int off, int len) throws IOException { if (bytesRead >= limit) { - return -1; + return limitReached(); } final int maxToRead = (int) Math.min(len, limit - bytesRead); diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java new file mode 100644 index 0000000000..5bb4362e1a --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.EOFException; +import java.io.IOException; + +import junit.framework.TestCase; + +public class LimitingInputStreamTest extends TestCase { + private final static byte[] TEST_BUFFER = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + + public void testReadLimitNotReached() throws IOException { + LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, false); + long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, TEST_BUFFER.length); + + is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, true); + bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, TEST_BUFFER.length); + } + + public void testReadLimitExceeded() throws IOException { + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9); + final long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + assertEquals(bytesRead, 9); + } + + public void testReadLimitExceededEof() throws IOException { + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9, true); + try { + StreamUtils.copy(is, new ByteArrayOutputStream()); + fail("Should not get here"); + } catch (final EOFException eof) { + } + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..dc462ceb12 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,37 @@ +nifi-email-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Spring Framework + The following NOTICE information applies: + Spring Framework + Copyright 2002-2016 + + (ASLv2) SubEthaSMTP - A SMTP mail server + The following NOTICE information applies: + Spring Framework + Copyright 2006-2007 + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail) + +************************ +Common Development and Distribution License 1.0 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details. + + (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp) diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index 75cd3d2119..34125f4263 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -53,6 +53,12 @@ org.springframework.integration spring-integration-mail 4.3.0.RELEASE + + + org.springframework.retry + spring-retry + + commons-logging diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java index 0f17838837..52d448168b 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -13,16 +13,15 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.nifi.processors.email; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; import java.io.IOException; -import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,71 +30,57 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; - -import org.subethamail.smtp.server.SMTPServer; - - -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.DataUnit; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; - -import org.apache.nifi.processors.email.smtp.event.SmtpEvent; -import org.apache.nifi.processors.email.smtp.handler.SMTPResultCode; -import org.apache.nifi.processors.email.smtp.handler.SMTPMessageHandlerFactory; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.springframework.util.StringUtils; +import org.subethamail.smtp.server.SMTPServer; @Tags({"listen", "email", "smtp"}) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + - "allowing nifi to listen for incoming email. " + - "" + - "Note this server does not perform any email validation. If direct exposure to the internet is sought," + - "it may be a better idea to use the combination of NiFi and an industrial scale MTA (e.g. Postfix)") +@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + + "allowing nifi to listen for incoming email. Note this server does not perform any email " + + "validation. If direct exposure to the internet is sought, it may be a better idea to use " + + "the combination of NiFi and an industrial scale MTA (e.g. Postfix)") @WritesAttributes({ - @WritesAttribute(attribute = "mime.type", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + - "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + - "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.to", description = "The value used during RCPT TO (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection")}) - -public class ListenSMTP extends AbstractProcessor { - public static final String SMTP_HELO = "smtp.helo"; - public static final String SMTP_FROM = "smtp.from"; - public static final String SMTP_TO = "smtp.to"; - public static final String MIME_TYPE = "message/rfc822"; - public static final String SMTP_SRC_IP = "smtp.src"; - - - protected static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() + @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), + @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection"), + @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), + @WritesAttribute(attribute = "smtp.recipient", description = "The value used during RCPT TO (i.e. envelope)"), + @WritesAttribute(attribute = "mime.type", description = "Mime type of the message")}) +public class ListenSMTP extends AbstractSessionFactoryProcessor { + static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() .name("SMTP_PORT") .displayName("Listening Port") .description("The TCP port the ListenSMTP processor will bind to." + @@ -106,26 +91,17 @@ public class ListenSMTP extends AbstractProcessor { .addValidator(StandardValidators.PORT_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() - .name("SMTP_HOSTNAME") - .displayName("SMTP hostname") - .description("The hostname to be embedded into the banner displayed when an " + - "SMTP client connects to the processor TCP port .") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() .name("SMTP_MAXIMUM_CONNECTIONS") .displayName("Maximum number of SMTP connection") .description("The maximum number of simultaneous SMTP connections.") .required(true) + .defaultValue("1") .expressionLanguageSupported(false) .addValidator(StandardValidators.INTEGER_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() .name("SMTP_TIMEOUT") .displayName("SMTP connection timeout") .description("The maximum time to wait for an action of SMTP client.") @@ -135,29 +111,17 @@ public class ListenSMTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - protected static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() + static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() .name("SMTP_MAXIMUM_MSG_SIZE") .displayName("SMTP Maximum Message Size") .description("The maximum number of bytes the server will accept.") .required(true) - .defaultValue("20MB") + .defaultValue("20 MB") .expressionLanguageSupported(false) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Integer.MAX_VALUE)) .build(); - protected static final PropertyDescriptor SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE = new PropertyDescriptor.Builder() - .name("SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE") - .displayName("SMTP message buffer length") - .description("This property control the size of the Queue utilised by the processor to hold messages as they are processed. " + - "Setting a very small value will decrease the number of emails the processor simultaneously, while setting an very large" + - "queue will result in higher memory and CPU utilisation. The default setting of 1024 is generally a fair number.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .defaultValue("1024") - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL_CONTEXT_SERVICE") .displayName("SSL Context Service") .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + @@ -166,7 +130,7 @@ public class ListenSMTP extends AbstractProcessor { .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("CLIENT_AUTH") .displayName("Client Auth") .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") @@ -174,309 +138,243 @@ public class ListenSMTP extends AbstractProcessor { .allowableValues(SSLContextService.ClientAuth.NONE.toString(), SSLContextService.ClientAuth.REQUIRED.toString()) .build(); - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - - final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); - final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - if (sslContextService != null && StringUtils.isBlank(clientAuth)) { - results.add(new ValidationResult.Builder() - .explanation("Client Auth must be provided when using TLS/SSL") - .valid(false).subject("Client Auth").build()); - } - - return results; - - } - - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Extraction was successful") + protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() + .name("SMTP_HOSTNAME") + .displayName("SMTP hostname") + .description("The hostname to be embedded into the banner displayed when an " + + "SMTP client connects to the processor TCP port .") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - private Set relationships; - private List propertyDescriptors; - private volatile LinkedBlockingQueue incomingMessages; + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All new messages will be routed as FlowFiles to this relationship") + .build(); - private volatile SMTPServer server; - private AtomicBoolean initialized = new AtomicBoolean(false); - private AtomicBoolean stopping = new AtomicBoolean(false); + private final static List propertyDescriptors; + private final static Set relationships; + + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SMTP_PORT); + _propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS); + _propertyDescriptors.add(SMTP_TIMEOUT); + _propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + _propertyDescriptors.add(SMTP_HOSTNAME); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); + } + + private volatile SMTPServer smtp; + + private volatile SmtpConsumer smtpConsumer; + + private volatile int maxMessageSize; + + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession processSession = sessionFactory.createSession(); + if (this.smtp == null) { + this.setupSmtpIfNecessary(context, processSession); + } + + /* + * Will consume incoming message directly from the wire and into + * FlowFile/Content repository before exiting. This essentially limits + * any potential data loss by allowing SMTPServer thread to actually + * commit NiFi session if all good. However in the event of exception, + * such exception will be propagated back to the email sender via + * "undeliverable message" allowing such user to re-send the message + */ + this.smtpConsumer.consumeUsing((inputDataStream) -> { + FlowFile flowFile = processSession.create(); + AtomicInteger size = new AtomicInteger(); + try { + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + size.set(IOUtils.copy(new LimitingInputStream(inputDataStream, ListenSMTP.this.maxMessageSize, true), out)); + } + }); + flowFile = updateFlowFileWithAttributes(flowFile, processSession); + + processSession.getProvenanceReporter().receive(flowFile, + "smtp://" + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + return size.get(); + } catch (Exception e) { + context.yield(); + this.getLogger().error("Failed while processing incoming mail. " + e.getMessage(), e); + throw new IllegalStateException("Failed while processing incoming mail. " + e.getMessage(), e); + } + }); + } + + /** + * + */ + @OnStopped + public void stop() { + this.getLogger().info("Stopping SMTPServer"); + this.smtp.stop(); + this.smtp = null; + this.getLogger().info("SMTPServer stopped"); + } + + /** + * + */ @Override public Set getRelationships() { return relationships; } + /** + * + */ + @Override + protected Collection customValidate(ValidationContext validationContext) { + List results = new ArrayList<>(); + + String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); + SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (sslContextService != null && !StringUtils.hasText(clientAuth)) { + results.add(new ValidationResult.Builder() + .subject(CLIENT_AUTH.getDisplayName()) + .explanation(CLIENT_AUTH.getDisplayName() + " must be provided when using " + SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .build()); + } else if (sslContextService == null && StringUtils.hasText(clientAuth)) { + results.add(new ValidationResult.Builder() + .subject(SSL_CONTEXT_SERVICE.getDisplayName()) + .explanation(SSL_CONTEXT_SERVICE.getDisplayName() + " must be provided when selecting " + CLIENT_AUTH.getDisplayName()) + .valid(false) + .build()); + } + return results; + } + + /** + * + */ @Override protected List getSupportedPropertyDescriptors() { return propertyDescriptors; } - @Override - protected void init(final ProcessorInitializationContext context) { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - - final List props = new ArrayList<>(); - props.add(SMTP_PORT); - props.add(SMTP_HOSTNAME); - props.add(SMTP_MAXIMUM_CONNECTIONS); - props.add(SMTP_TIMEOUT); - props.add(SMTP_MAXIMUM_MSG_SIZE); - props.add(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE); - props.add(SSL_CONTEXT_SERVICE); - props.add(CLIENT_AUTH); - this.propertyDescriptors = Collections.unmodifiableList(props); - - } - - // Upon Schedule, reset the initialized state to false - @OnScheduled - public void onScheduled(ProcessContext context) { - initialized.set(false); - stopping.set(false); - } - - protected synchronized void initializeSMTPServer(final ProcessContext context) throws Exception { - - // check if we are already running or if it is stopping - if (initialized.get() && server.isRunning() || stopping.get() ) { - return; + /** + * + */ + private FlowFile updateFlowFileWithAttributes(FlowFile flowFile, ProcessSession processSession) { + Map attributes = new HashMap<>(); + Certificate[] tlsPeerCertificates = this.smtpConsumer.getMessageContext().getTlsPeerCertificates(); + if (tlsPeerCertificates != null) { + for (int i = 0; i < tlsPeerCertificates.length; i++) { + if (tlsPeerCertificates[i] instanceof X509Certificate) { + X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i]; + attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); + attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName()); + } + } } - incomingMessages = new LinkedBlockingQueue<>(context.getProperty(SMTP_MAXIMUM_INCOMING_MESSAGE_QUEUE).asInteger()); + attributes.put("smtp.helo", this.smtpConsumer.getMessageContext().getHelo()); + attributes.put("smtp.remote.addr", this.smtpConsumer.getMessageContext().getRemoteAddress().toString()); + attributes.put("smtp.from", this.smtpConsumer.getFrom()); + attributes.put("smtp.recepient", this.smtpConsumer.getRecipient()); + attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); + return processSession.putAllAttributes(flowFile, attributes); + } - String clientAuth = null; + /** + * + */ + private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.smtp == null) { + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtpServer = this.createServerInstance(context, consumer); + smtpServer.setSoftwareName("Apache NiFi"); + smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); + smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); + this.maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); + smtpServer.setMaxMessageSize(this.maxMessageSize); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + if (context.getProperty(SMTP_HOSTNAME).isSet()) { + smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); + } - // If an SSLContextService was provided then create an SSLContext to pass down to the server - SSLContext sslContext = null; - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + this.smtpConsumer = consumer; + this.smtp = smtpServer; + this.smtp.start(); } + } - final SSLContext finalSslContext = sslContext; - - SMTPMessageHandlerFactory smtpMessageHandlerFactory = new SMTPMessageHandlerFactory(incomingMessages, getLogger()); - final SMTPServer server = new SMTPServer(smtpMessageHandlerFactory) { - + /** + * + */ + private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { + SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + SMTPServer smtpServer = sslContextService == null ? new ConsumerAwareSmtpServer(consumer) : new ConsumerAwareSmtpServer(consumer) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); + SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); + SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + sslSocket.setUseClientMode(false); - SSLSocketFactory socketFactory = finalSslContext.getSocketFactory(); - - SSLSocket s = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); - - s.setUseClientMode(false); - - - // For some reason the createSSLContext above is not enough to enforce - // client side auth - // If client auth is required... - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - s.setNeedClientAuth(true); + if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { + this.setRequireTLS(true); + sslSocket.setNeedClientAuth(true); } - - - return s; + return sslSocket; } }; - - // Set some parameters to our server - server.setSoftwareName("Apache NiFi"); - - - // Set the Server options based on properties - server.setPort(context.getProperty(SMTP_PORT).asInteger()); - server.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - server.setMaxMessageSize(context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue()); - server.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - server.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - - // Check if TLS should be enabled if (sslContextService != null) { - server.setEnableTLS(true); + smtpServer.setEnableTLS(true); } else { - server.setHideTLS(true); + smtpServer.setHideTLS(true); } - - // Set TLS to required in case CLIENT_AUTH = required - if (SSLContextService.ClientAuth.REQUIRED.toString().equals(context.getProperty(CLIENT_AUTH).getValue())) { - server.setRequireTLS(true); - } - - this.server = server; - server.start(); - - getLogger().info("Server started and listening on port " + server.getPort()); - - initialized.set(true); - stopping.set(false); + return smtpServer; } - @OnUnscheduled - public void startShutdown() throws Exception { - if (server != null) { - stopping.set(true); - getLogger().info("Shutting down processor P{}", new Object[]{server}); - server.stop(); - getLogger().info("Shut down {}", new Object[]{server}); - } - } + /** + * Wrapper over {@link SMTPServer} that is aware of the {@link SmtpConsumer} + * to ensure that its stop() operation is called during server stoppage. + */ + private static class ConsumerAwareSmtpServer extends SMTPServer { - @OnStopped - public void completeShutdown() throws Exception { - if (server != null) { - if (!server.isRunning() && stopping.get() ) { - stopping.set(false); - } - getLogger().info("Completed shut down {}", new Object[]{server}); - } - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - - try { - initializeSMTPServer(context); - } catch (Exception e) { - context.yield(); - throw new ProcessException("Failed to initialize the SMTP server", e); + /** + * + */ + public ConsumerAwareSmtpServer(SmtpConsumer consumer) { + super(consumer); } - while (!incomingMessages.isEmpty()) { - SmtpEvent message = incomingMessages.poll(); - - if (message == null) { - return; - } - - synchronized (message) { - if (resultCodeSetAndIsError(message)) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - getLogger().warn("Message failed before onTrigger processing message was: " + resultCode.getLogMessage()); - continue; - } - - try { - FlowFile flowfile = session.create(); - - if (message.getMessageData() != null) { - flowfile = session.write(flowfile, out -> { - InputStream inputStream = message.getMessageData(); - byte [] buffer = new byte[1024]; - - int rd; - long totalBytesRead =0; - - while ((rd = inputStream.read(buffer, 0, buffer.length)) != -1 ) { - totalBytesRead += rd; - if (totalBytesRead > server.getMaxMessageSize() ) { - message.setReturnCode(500); - message.setProcessed(); - break; - } - out.write(buffer, 0, rd); - } - out.flush(); - }); - } else { - getLogger().debug("Message body was null"); - message.setReturnCode(SMTPResultCode.UNKNOWN_ERROR_CODE.getCode()); - message.setProcessed(); - } - - if (!message.getProcessed()) { - HashMap attributes = new HashMap<>(); - // Gather message attributes - attributes.put(SMTP_HELO, message.getHelo()); - attributes.put(SMTP_SRC_IP, message.getHelo()); - attributes.put(SMTP_FROM, message.getFrom()); - attributes.put(SMTP_TO, message.getTo()); - - List> details = message.getCertifcateDetails(); - int c = 0; - - // Add a selection of each X509 certificates to the already gathered attributes - - for (Map detail : details) { - attributes.put("smtp.certificate." + c + ".serial", detail.getOrDefault("SerialNumber", null)); - attributes.put("smtp.certificate." + c + ".subjectName", detail.getOrDefault("SubjectName", null)); - c++; - } - - // Set Mime-Type - attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE); - - // Add the attributes. to flowfile - flowfile = session.putAllAttributes(flowfile, attributes); - session.getProvenanceReporter().receive(flowfile, "smtp://" + SMTP_HOSTNAME + ":" + SMTP_PORT + "/"); - session.transfer(flowfile, REL_SUCCESS); - - getLogger().info("Transferring {} to success", new Object[]{flowfile}); - } - } catch (Exception e) { - message.setProcessed(); - message.setReturnCode(SMTPResultCode.UNEXPECTED_ERROR.getCode()); - } - - // Check to see if it failed when creating the FlowFile - if (resultCodeSetAndIsError(message)) { - session.rollback(); - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); - message.notifyAll(); - continue; - } - - // Finished processing, - message.setProcessed(); - - // notify on the message so data() can process the rest of the method - message.notifyAll(); - - // Wait for data() to tell sender we received the message and double check we didn't timeout - final long serverTimeout = context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - try { - message.wait(serverTimeout); - } catch (InterruptedException e) { - getLogger().info("Interrupted while waiting for Message Handler to acknowledge message."); - } - - // Check to see if the sender was correctly notified - if (resultCodeSetAndIsError(message)) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - session.rollback(); - getLogger().warn("Failed to received message due to: " + resultCode.getLogMessage()); - } else { - // Need to commit because if we didn't and a following message needed to be rolled back, this message would be too, causing data loss. - session.commit(); - } + /** + * + */ + @Override + public synchronized void stop() { + try { + SmtpConsumer consumer = (SmtpConsumer) this.getMessageHandlerFactory(); + consumer.stop(); + } finally { + super.stop(); } } } - - private boolean resultCodeSetAndIsError(SmtpEvent message){ - if (message.getReturnCode() != null ) { - SMTPResultCode resultCode = SMTPResultCode.fromCode(message.getReturnCode()); - if (resultCode.isError()) { - return true; - } - } - return false; - } - - // Same old... same old... used for testing to access the random port that was selected - protected int getPort() { - return server == null ? 0 : server.getPort(); - } - - } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java new file mode 100644 index 0000000000..2063c0da45 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandler; +import org.subethamail.smtp.MessageHandlerFactory; +import org.subethamail.smtp.RejectException; +import org.subethamail.smtp.TooMuchDataException; +import org.subethamail.smtp.server.SMTPServer; + +/** + * A simple consumer that provides a bridge between 'push' message distribution + * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. It + * implements both {@link MessageHandler} and {@link MessageHandlerFactory} + * allowing it to interact directly with {@link SMTPServer}. + */ +class SmtpConsumer implements MessageHandler, MessageHandlerFactory { + + private final static int CONSUMER_STOPPED = -1; + + private final static int INTERRUPTED = -2; + + private final static int ERROR = -9; + + private final static int NO_MESSAGE = -8; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final BlockingQueue messageDataQueue = new ArrayBlockingQueue<>(1); + + private final BlockingQueue consumedMessageSizeQueue = new ArrayBlockingQueue<>(1); + + private final AtomicBoolean running = new AtomicBoolean(true); + + private volatile MessageContext messageContext; + + private volatile String from; + + private volatile String recipient; + + + /** + * + */ + String getFrom() { + return this.from; + } + + /** + * + */ + String getRecipient() { + return this.recipient; + } + + /** + * + */ + MessageContext getMessageContext() { + return this.messageContext; + } + + /** + * This operation will simply attempt to put a poison message to the + * 'consumedMessageSizeQueue' to ensure that in the event this consumer is + * stopped before the message is consumed (see + * {@link #consumeUsing(Function)}), the server thread that is blocking in + * {@link #data(InputStream)} operation can unblock. + */ + // NOTE: the 'synchronize' is only here for API correctness, to ensure that + // stop() and consumeUsing(..) can never be invoked at the same time. + // However within NiFi it can never happen. + synchronized void stop() { + this.running.compareAndSet(true, false); + this.consumedMessageSizeQueue.offer(CONSUMER_STOPPED); + } + + /** + * This operation is invoked by the consumer. Implementation of this + * operation creates a synchronous connection with the message producer (see + * {@link #data(InputStream)}) via a pair of queues which guarantees that + * message is fully consumed and disposed by the consumer (provided as + * {@link Function}) before the server closes the data stream. + */ + // NOTE: the 'synchronize' is only here for API correctness, to ensure that + // stop() and consumeUsing(..) can never be invoked at the same time. + // However within NiFi it can never happen. + synchronized void consumeUsing(Function resultConsumer) { + int messageSize = 0; + try { + InputStream message = this.messageDataQueue.poll(1000, TimeUnit.MILLISECONDS); + if (message != null) { + messageSize = resultConsumer.apply(message); + } else { + messageSize = NO_MESSAGE; + } + } catch (InterruptedException e) { + this.logger.warn("Current thread is interrupted", e); + messageSize = INTERRUPTED; + Thread.currentThread().interrupt(); + } finally { + if (messageSize == 0) { + messageSize = ERROR; + } + if (messageSize != NO_MESSAGE) { + this.consumedMessageSizeQueue.offer(messageSize); + } + } + } + + /** + * This operation is invoked by the server thread and contains message data + * as {@link InputStream}. Implementation of this operation creates a + * synchronous connection with consumer (see {@link #onMessage(Function)}) + * via a pair of queues which guarantees that message is fully consumed and + * disposed by the consumer by the time this operation exits. + */ + @Override + public void data(InputStream data) throws RejectException, TooMuchDataException, IOException { + if (this.running.get()) { + try { + this.messageDataQueue.offer(data, Integer.MAX_VALUE, TimeUnit.SECONDS); + long messageSize = this.consumedMessageSizeQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); + String exceptionMessage = null; + if (messageSize == CONSUMER_STOPPED) { + exceptionMessage = "NIFI Consumer was stopped before message was successfully consumed"; + } else if (messageSize == INTERRUPTED) { + exceptionMessage = "Consuming thread was interrupted"; + } else if (messageSize == ERROR) { + exceptionMessage = "Consuming thread failed while processing 'data' SMTP event."; + } + if (exceptionMessage != null) { + throw new IllegalStateException(exceptionMessage); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Received message of size: " + messageSize); + } + } + } catch (InterruptedException e) { + this.logger.warn("Current thread is interrupted", e); + Thread.currentThread().interrupt(); + throw new IllegalStateException("Current thread is interrupted", e); + } + } else { + throw new IllegalStateException("NIFI Consumer was stopped before message was successfully consumed"); + } + } + + /** + * + */ + @Override + public void from(String from) throws RejectException { + this.from = from; + } + + /** + * + */ + @Override + public void recipient(String recipient) throws RejectException { + this.recipient = recipient; + } + + /** + * + */ + @Override + public void done() { + // noop + } + + /** + * + */ + @Override + public MessageHandler create(MessageContext ctx) { + this.messageContext = ctx; + return this; + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java deleted file mode 100644 index e1c36c53a3..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/event/SmtpEvent.java +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.nifi.processors.email.smtp.event; - - - -import java.io.InputStream; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A Smtp event which adds the transaction number and command to the StandardEvent. - */ - -public class SmtpEvent{ - private final String remoteIP; - private final String helo; - private final String from; - private final String to; - private final InputStream messageData; - private List> certificatesDetails; - private AtomicBoolean processed = new AtomicBoolean(false); - private AtomicBoolean acknowledged = new AtomicBoolean(false); - private AtomicInteger returnCode = new AtomicInteger(); - - public SmtpEvent( - final String remoteIP, final String helo, final String from, final String to, final X509Certificate[] certificates, - final InputStream messageData) { - - this.remoteIP = remoteIP; - this.helo = helo; - this.from = from; - this.to = to; - this.messageData = messageData; - - this.certificatesDetails = new ArrayList<>(); - - for (int c = 0; c < certificates.length; c++) { - X509Certificate cert = certificates[c]; - if (cert.getSerialNumber() != null && cert.getSubjectDN() != null) { - Map certificate = new HashMap<>(); - - String certSerialNumber = cert.getSerialNumber().toString(); - String certSubjectDN = cert.getSubjectDN().getName(); - - - certificate.put("SerialNumber", certSerialNumber); - certificate.put("SubjectName", certSubjectDN); - - certificatesDetails.add(certificate); - - } - } - } - - public synchronized List> getCertifcateDetails() { - return certificatesDetails; - } - - public synchronized String getHelo() { - return helo; - } - - public synchronized InputStream getMessageData() { - return messageData; - } - - public synchronized String getFrom() { - return from; - } - - public synchronized String getTo() { - return to; - } - - public synchronized String getRemoteIP() { - return remoteIP; - } - - public synchronized void setProcessed() { - this.processed.set(true); - } - - public synchronized boolean getProcessed() { - return this.processed.get(); - } - - public synchronized void setAcknowledged() { - this.acknowledged.set(true); - } - - public synchronized boolean getAcknowledged() { - return this.acknowledged.get(); - } - - public synchronized void setReturnCode(int code) { - this.returnCode.set(code); - } - - public synchronized Integer getReturnCode() { - return this.returnCode.get(); - } - -} - diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java deleted file mode 100644 index 6b647bf872..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPMessageHandlerFactory.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.email.smtp.handler; - -import java.io.IOException; -import java.io.InputStream; -import java.security.cert.X509Certificate; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.StopWatch; -import org.subethamail.smtp.DropConnectionException; -import org.subethamail.smtp.MessageContext; -import org.subethamail.smtp.MessageHandler; -import org.subethamail.smtp.MessageHandlerFactory; -import org.subethamail.smtp.RejectException; -import org.subethamail.smtp.TooMuchDataException; - -import org.apache.nifi.processors.email.smtp.event.SmtpEvent; - -public class SMTPMessageHandlerFactory implements MessageHandlerFactory { - final LinkedBlockingQueue incomingMessages; - final ComponentLog logger; - - - public SMTPMessageHandlerFactory(LinkedBlockingQueue incomingMessages, ComponentLog logger) { - this.incomingMessages = incomingMessages; - this.logger = logger; - - } - - @Override - public MessageHandler create(MessageContext messageContext) { - return new Handler(messageContext, incomingMessages, logger); - } - - class Handler implements MessageHandler { - final MessageContext messageContext; - String from; - String recipient; - - public Handler(MessageContext messageContext, LinkedBlockingQueue incomingMessages, ComponentLog logger){ - this.messageContext = messageContext; - } - - @Override - public void from(String from) throws RejectException { - // TODO: possibly whitelist senders? - this.from = from; - } - - @Override - public void recipient(String recipient) throws RejectException { - // TODO: possibly whitelist receivers? - this.recipient = recipient; - } - - @Override - public void data(InputStream inputStream) throws RejectException, TooMuchDataException, IOException { - // Start counting the timer... - StopWatch watch = new StopWatch(true); - long elapsed; - final long serverTimeout = TimeUnit.MILLISECONDS.convert(messageContext.getSMTPServer().getConnectionTimeout(), TimeUnit.MILLISECONDS); - - X509Certificate[] certificates = new X509Certificate[]{}; - - final String remoteIP = messageContext.getRemoteAddress().toString(); - final String helo = messageContext.getHelo(); - - if (messageContext.getTlsPeerCertificates() != null ){ - certificates = (X509Certificate[]) messageContext.getTlsPeerCertificates().clone(); - } - - SmtpEvent message = new SmtpEvent(remoteIP, helo, from, recipient, certificates, inputStream); - - synchronized (message) { - // / Try to queue the message back to the NiFi session - try { - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - incomingMessages.offer(message, serverTimeout - elapsed, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - final SMTPResultCode returnCode = SMTPResultCode.fromCode(421); - logger.trace(returnCode.getLogMessage()); - - // NOTE: Setting acknowledged at this stage is redundant as this catch deals with the inability of - // adding message to the processing queue. Yet, for the sake of consistency the message is - // updated nonetheless - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Once message has been sent to the queue, it should be processed by NiFi onTrigger, - // a flowfile created and its processed status updated before an acknowledgment is - // given back to the SMTP client - elapsed = watch.getElapsed(TimeUnit.MILLISECONDS); - try { - message.wait(serverTimeout - elapsed); - } catch (InterruptedException e) { - // Interrupted while waiting for the message to process. Will return error and request onTrigger to rollback - logger.trace("Interrupted while waiting for processor to process data. Returned error to SMTP client as precautionary measure"); - incomingMessages.remove(message); - - // Set the final values so onTrigger can figure out what happened to message - final SMTPResultCode returnCode = SMTPResultCode.fromCode(423); - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - - // Inform client - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - - // Check if message is processed - if (!message.getProcessed()) { - incomingMessages.remove(message); - final SMTPResultCode returnCode = SMTPResultCode.fromCode(451); - logger.trace("Did not receive the onTrigger response within the acceptable timeframe."); - - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(returnCode.getCode()); - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } else if(message.getReturnCode() != null) { - // No need to check if over server timeout because we already processed the data. Might as well use the status code returned by onTrigger. - final SMTPResultCode returnCode = SMTPResultCode.fromCode(message.getReturnCode()); - - if(returnCode.isError()){ - message.setAcknowledged(); - throw new DropConnectionException(returnCode.getCode(), returnCode.getErrorMessage()); - } - } else { - // onTrigger successfully processed the data. - // No need to check if over server timeout because we already processed the data. Might as well finalize it. - // Set the final values so onTrigger can figure out what happened to message - message.setReturnCode(250); - message.setAcknowledged(); - } - // Exit, allowing Handler to acknowledge the message - message.notifyAll(); - } - } - - @Override - public void done() { - logger.trace("Called the last method of message handler. Exiting"); - // Notifying the ontrigger that the message was handled. - } - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java deleted file mode 100644 index b9c0d60e8e..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/handler/SMTPResultCode.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.email.smtp.handler; - -public enum SMTPResultCode { - // This error isn't raised by code. Being just a default value for the - // fromCode method below - UNKNOWN_ERROR_CODE(0, - "Unknown error.", - "Failed due to unknown error"), - - SUCCESS (250, - "Success delivering message", - "Message from {} to {} via {} acknowledgement complete"), - - QUEUE_ERROR (421, - "Could not queue the message. Try again", - "The SMTP processor has just dropped a message due to the queue being too full, considering increasing the queue size" ), - - UNEXPECTED_ERROR(423, - "Unexpected Error. Please try again or contact the administrator in case it persists", - "Error hit during delivery of message from {}"), - - TIMEOUT_ERROR (451, - "The processing of your message timed-out, we may have received it but you better off sending it again", - "Message from {} to {} via {} acknowledgement timeout despite processing completed. Data duplication may occur"), - - MESSAGE_TOO_LARGE(500, - "Message rejected due to length/size of data", - "Your message exceeds the maximum permitted size"); - - private static final SMTPResultCode[] codeArray = new SMTPResultCode[501]; - - static { - for (final SMTPResultCode smtpResultCode : SMTPResultCode.values()) { - codeArray[smtpResultCode.getCode()] = smtpResultCode; - } - } - - private final int code; - private final String errorMessage; - private final String logMessage; - - SMTPResultCode(int code, String errorMessage, String logMessage) { - this.code = code; - this.errorMessage = errorMessage; - this.logMessage = logMessage; - } - - public int getCode() { - return code; - } - - public String getErrorMessage() { - return errorMessage; - } - - public String getLogMessage() { - return logMessage; - } - - public static SMTPResultCode fromCode(int code) { - final SMTPResultCode smtpResultCode = codeArray[code]; - return (smtpResultCode == null) ? UNKNOWN_ERROR_CODE : smtpResultCode; - } - - public boolean isError(){ - switch (this) { - case MESSAGE_TOO_LARGE: - case UNEXPECTED_ERROR: - case QUEUE_ERROR: - case TIMEOUT_ERROR: - return true; - default: - return false; - } - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java new file mode 100644 index 0000000000..4a2ad80fe6 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.mail.Email; +import org.apache.commons.mail.SimpleEmail; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.subethamail.smtp.server.SMTPServer; + +public class SmtpConsumerTest { + + private volatile ExecutorService executor; + + @Before + public void before() { + this.executor = Executors.newCachedThreadPool(); + } + + @After + public void after() { + this.executor.shutdown(); + } + + @Test + public void validateServerCanStopWhenConsumerStoppedBeforeConsumingMessage() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + CountDownLatch latch = new CountDownLatch(10); + AtomicReference exception = new AtomicReference<>(); + this.executor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 10; i++) { + try { + consumer.data(mock(InputStream.class)); + } catch (Exception e) { + e.printStackTrace(); + exception.set(e); + } finally { + latch.countDown(); + } + } + } + }); + boolean finished = latch.await(2000, TimeUnit.MILLISECONDS); + assertFalse(finished); + this.executor.shutdown(); + boolean terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertFalse(terminated); + + consumer.stop(); + finished = latch.await(1000, TimeUnit.MILLISECONDS); + assertTrue(finished); + terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertTrue(terminated); + } + + /* + * This test simply validates that consumeUsing(..) can react properly to + * thread interrupts. That said the condition is impossible in the current + * usage of SmtpConsumer + */ + @Test + public void validateServerCanStopWhenConsumerInterrupted() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + AtomicReference thread = new AtomicReference<>(); + + this.executor.execute(new Runnable() { + @Override + public void run() { + thread.set(Thread.currentThread()); + consumer.consumeUsing((in) -> { + return 0; + }); + } + }); + + this.executor.shutdown(); + boolean terminated = this.executor.awaitTermination(200, TimeUnit.MILLISECONDS); + assertFalse(terminated); // the call to consumeUsing(..) is blocking on + // the queue.poll since nothing is there + + thread.get().interrupt(); // interrupt thread that executes + // consumeUsing(..) + terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + assertTrue(terminated); + } + + /* + * Emulates any errors that may arise while reading the {@link InputStream} + * delivered as part of the data() call. + */ + @Test + public void validateServerCanStopWhenConsumerErrors() throws Exception { + SmtpConsumer consumer = new SmtpConsumer(); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exception = new AtomicReference<>(); + this.executor.execute(new Runnable() { + @Override + public void run() { + try { + consumer.data(mock(InputStream.class)); + } catch (Exception e) { + exception.set(e); + } finally { + latch.countDown(); + } + } + }); + + this.executor.execute(new Runnable() { + @Override + public void run() { + consumer.consumeUsing((in) -> { + throw new RuntimeException("intentional"); + }); + } + }); + + // this to ensure that call to data unblocks + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(exception.get() instanceof IllegalStateException); + assertEquals("Consuming thread failed while processing 'data' SMTP event.", exception.get().getMessage()); + } + + @Test + public void validateServerCanStopWithUnconsumedMessage() throws Exception { + int port = NetworkUtils.availablePort(); + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtp = new SMTPServer(consumer); + smtp.setPort(port); + smtp.setSoftwareName("Apache NiFi"); + smtp.start(); + + BlockingQueue exceptionQueue = new ArrayBlockingQueue<>(1); + this.executor.execute(new Runnable() { + @Override + public void run() { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("Hello SMTP"); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + exceptionQueue.offer(e); + } + } + }); + assertNull(exceptionQueue.poll()); + smtp.stop(); + Exception ex = exceptionQueue.poll(100, TimeUnit.MILLISECONDS); + assertNotNull(ex); + /* + * This essentially ensures and validates that if NiFi was not able to + * successfully consume message the aftermath of the exception thrown by + * the consumer is propagated to the sender essentially ensuring no data + * loss by allowing sender to resend + */ + assertTrue(ex.getMessage().startsWith("Sending the email to the following server failed")); + } + + @Test + public void validateConsumer() throws Exception { + int port = NetworkUtils.availablePort(); + SmtpConsumer consumer = new SmtpConsumer(); + SMTPServer smtp = new SMTPServer(consumer); + smtp.setPort(port); + smtp.setSoftwareName("Apache NiFi"); + smtp.start(); + + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + this.executor.execute(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } + } + }); + + List messages = new ArrayList<>(); + for (AtomicInteger i = new AtomicInteger(); i.get() < messageCount;) { + consumer.consumeUsing((dataInputStream) -> { + i.incrementAndGet(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int size = 0; + try { + size = IOUtils.copy(dataInputStream, bos); + messages.add(new String(bos.toByteArray(), StandardCharsets.UTF_8)); + return size; + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + return size; + }); + } + + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + assertTrue(complete); + assertTrue(messages.size() == messageCount); + assertTrue(messages.get(0).contains("MSG-0")); + assertTrue(messages.get(1).contains("MSG-1")); + assertTrue(messages.get(2).contains("MSG-2")); + assertTrue(messages.get(3).contains("MSG-3")); + assertTrue(messages.get(4).contains("MSG-4")); + smtp.stop(); + } +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java index 1fd7628732..c317b0c767 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -13,42 +13,118 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.nifi.processors.email; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.commons.mail.Email; import org.apache.commons.mail.EmailException; import org.apache.commons.mail.SimpleEmail; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; -import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - -import org.apache.nifi.ssl.SSLContextService; - -import org.junit.Assert; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - public class TestListenSMTP { - @Test(timeout=15000) - public void ValidEmailTls() throws Exception { - boolean[] failed = {false}; - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + private ScheduledExecutorService executor; - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + /** + * + */ + @Before + public void before() { + this.executor = Executors.newScheduledThreadPool(2); + } + + /** + * + */ + @After + public void after() { + this.executor.shutdown(); + } + + /** + * + */ + @Test + public void validateSuccessfulInteraction() throws Exception, EmailException { + int port = NetworkUtils.availablePort(); + + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); + runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + + runner.assertValid(); + + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); + + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { + try { + Email email = new SimpleEmail(); + email.setHostName("localhost"); + email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); + email.send(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + } + } + }, 1000, TimeUnit.MILLISECONDS); + + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 5); + } + + /** + * + */ + @Test + public void validateSuccessfulInteractionWithTls() throws Exception, EmailException { + System.setProperty("mail.smtp.ssl.trust", "*"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "localtest"); + int port = NetworkUtils.availablePort(); + + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); // Setup the SSL Context - final SSLContextService sslContextService = new StandardSSLContextService(); + SSLContextService sslContextService = new StandardSSLContextService(); runner.addControllerService("ssl-context", sslContextService); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); @@ -58,262 +134,107 @@ public class TestListenSMTP { runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.enableControllerService(sslContextService); + // and add the SSL context to the runner runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); + runner.assertValid(); + int messageCount = 5; + CountDownLatch latch = new CountDownLatch(messageCount); + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { try { - - - System.setProperty("mail.smtp.ssl.trust", "*"); - System.setProperty("javax.net.ssl.keyStore", "src/test/resources/localhost-ks.jks"); - System.setProperty("javax.net.ssl.keyStorePassword", "localtest"); - Email email = new SimpleEmail(); - - email.setHostName("127.0.0.1"); + email.setHostName("localhost"); email.setSmtpPort(port); + email.setFrom("alice@nifi.apache.org"); + email.setSubject("This is a test"); + email.setMsg("MSG-" + i); + email.addTo("bob@nifi.apache.org"); // Enable STARTTLS but ignore the cert email.setStartTLSEnabled(true); email.setStartTLSRequired(true); email.setSSLCheckServerIdentity(false); - - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - } catch (final Throwable t) { - failed[0] = true; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); } } - }); - clientThread.start(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); } + }, 1500, TimeUnit.MILLISECONDS); - // Checks if client experienced Exception - Assert.assertFalse("Client experienced exception", failed[0]); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); - clientThread.stop(); - - Assert.assertFalse("Sending email failed", failed[0]); - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 5); } - @Test(timeout=15000) - public void ValidEmail() throws Exception, EmailException { - final boolean[] failed = {false}; - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); + /** + * + */ + @Test + public void validateTooLargeMessage() throws Exception, EmailException { + int port = NetworkUtils.availablePort(); - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); + TestRunner runner = TestRunners.newTestRunner(ListenSMTP.class); + runner.setProperty(ListenSMTP.SMTP_PORT, String.valueOf(port)); runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); + runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B"); - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); + runner.assertValid(); - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); + int messageCount = 1; + CountDownLatch latch = new CountDownLatch(messageCount); - final int port = listenSmtp.getPort(); + this.executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + runner.run(1, false); + } + }, 0, 500, TimeUnit.MILLISECONDS); - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { + this.executor.schedule(new Runnable() { + @Override + public void run() { + for (int i = 0; i < messageCount; i++) { try { Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); + email.setHostName("localhost"); email.setSmtpPort(port); - email.setStartTLSEnabled(false); email.setFrom("alice@nifi.apache.org"); email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); + email.setMsg("MSG-" + i); email.addTo("bob@nifi.apache.org"); email.send(); - - } catch (final EmailException t) { - failed[0] = true; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + latch.countDown(); } } - }); - clientThread.start(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); } - clientThread.stop(); + }, 1000, TimeUnit.MILLISECONDS); - Assert.assertFalse("Sending email failed", failed[0]); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 1); - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } - } - - @Test(timeout=15000, expected=EmailException.class) - public void ValidEmailTimeOut() throws Exception { - - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); - - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); - runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "50 milliseconds"); - - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - - - Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); - email.setSmtpPort(port); - email.setStartTLSEnabled(false); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - - while (runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS).isEmpty()) { - // force timeout - Thread.sleep(999L); - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); - } - - runner.assertQueueEmpty(); - final List splits = runner.getFlowFilesForRelationship(ListenSMTP.REL_SUCCESS); - splits.get(0).assertAttributeEquals("smtp.from", "alice@nifi.apache.org"); - splits.get(0).assertAttributeEquals("smtp.to", "bob@nifi.apache.org"); - - Thread.sleep(100); - - // shut down the server - listenSmtp.startShutdown(); - } - - @Test(timeout=15000) - public void emailTooLarge() throws Exception { - ListenSMTP listenSmtp = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(listenSmtp); - - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_HOSTNAME, "bermudatriangle"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "256B"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "2"); - runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); - - final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); - final ProcessContext context = runner.getProcessContext(); - - // NOTE: This test routine uses the same strategy used by TestListenAndPutSyslog - // where listenSmtp method calls are used to allow the processor to be started using - // port "0" without triggering a violation of PORT_VALIDATOR - listenSmtp.onScheduled(context); - listenSmtp.initializeSMTPServer(context); - - final int port = listenSmtp.getPort(); - AtomicBoolean finished = new AtomicBoolean(false);; - AtomicBoolean failed = new AtomicBoolean(false); - - try { - final Thread clientThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Email email = new SimpleEmail(); - email.setHostName("127.0.0.1"); - email.setSmtpPort(port); - email.setStartTLSEnabled(false); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Test test test chocolate"); - email.addTo("bob@nifi.apache.org"); - email.send(); - - } catch (final EmailException t) { - failed.set(true); - } - finished.set(true); - } - }); - clientThread.start(); - - while (!finished.get()) { - // process the request. - listenSmtp.onTrigger(context, processSessionFactory); - Thread.sleep(10); - } - clientThread.stop(); - - Assert.assertTrue("Sending email succeeded when it should have failed", failed.get()); - - runner.assertTransferCount(ListenSMTP.REL_SUCCESS, 0); - - runner.assertQueueEmpty(); - } finally { - // shut down the server - listenSmtp.startShutdown(); - } + boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); + runner.shutdown(); + assertTrue(complete); + runner.assertAllFlowFilesTransferred("success", 0); } } \ No newline at end of file