From d3b96dcac1d18c2da3d5cab2238e5de0936f63c6 Mon Sep 17 00:00:00 2001 From: joewitt Date: Sun, 14 Aug 2016 01:10:47 -0400 Subject: [PATCH] NIFI-2519 This closes #856. aligned threading model with subethastmp --- .../nifi/stream/io/LimitingInputStream.java | 46 +-- .../stream/io/LimitingInputStreamTest.java | 23 +- .../apache/nifi/util/MockProcessSession.java | 4 + .../nifi-email-processors/pom.xml | 160 +++++------ .../nifi/processors/email/ListenSMTP.java | 236 +++++----------- .../nifi/processors/email/SmtpConsumer.java | 206 -------------- .../processors/email/smtp/SmtpConsumer.java | 161 +++++++++++ .../processors/email/SmtpConsumerTest.java | 266 ------------------ .../nifi/processors/email/TestListenSMTP.java | 54 +--- 9 files changed, 342 insertions(+), 814 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java create mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java delete mode 100644 nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java index b1ebf2fbcb..70c6a321d8 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.stream.io; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -25,50 +24,35 @@ public class LimitingInputStream extends InputStream { private final InputStream in; private final long limit; private long bytesRead = 0; - private final boolean exceptionOnLimit; + private volatile boolean limitReached = false; /** * Constructs a limited input stream whereby if the limit is reached all - * subsequent calls to read will return a -1. + * subsequent calls to read will return a -1 and hasLimitReached() will + * indicate true. The limit is inclusive so if all 100 bytes of a 100 byte + * stream are read it will be true, otherwise false. * - * @param in - * the underlying input stream - * @param limit - * maximum length of bytes to read from underlying input stream + * @param in the underlying input stream + * @param limit maximum length of bytes to read from underlying input stream */ public LimitingInputStream(final InputStream in, final long limit) { - this(in, limit, false); - } - - /** - * Constructs a limited input stream whereby if the limit is reached all - * subsequent calls to read will return a -1 or EOFexception as configured - * - * @param in - * the underlying input stream - * @param limit - * maximum length of bytes to read from underlying input stream - * @param eofOnLimit - * true if EOF should occur on all read calls once limit reached; - * false if -1 should be returned instead - */ - public LimitingInputStream(final InputStream in, final long limit, final boolean eofOnLimit) { this.in = in; this.limit = limit; - exceptionOnLimit = eofOnLimit; } - private int limitReached() throws IOException { - if (exceptionOnLimit) { - throw new EOFException("Limit of allowed bytes read from input stream reached"); - } + public boolean hasReachedLimit() throws IOException { + return limitReached; + } + + private int markLimitReached() { + limitReached = true; return -1; } @Override public int read() throws IOException { if (bytesRead >= limit) { - return limitReached(); + return markLimitReached(); } final int val = in.read(); @@ -81,7 +65,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(final byte[] b) throws IOException { if (bytesRead >= limit) { - return limitReached(); + return markLimitReached(); } final int maxToRead = (int) Math.min(b.length, limit - bytesRead); @@ -96,7 +80,7 @@ public class LimitingInputStream extends InputStream { @Override public int read(byte[] b, int off, int len) throws IOException { if (bytesRead >= limit) { - return limitReached(); + return markLimitReached(); } final int maxToRead = (int) Math.min(len, limit - bytesRead); diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java index 5bb4362e1a..a884ef23a9 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/LimitingInputStreamTest.java @@ -16,36 +16,33 @@ */ package org.apache.nifi.stream.io; -import java.io.EOFException; import java.io.IOException; import junit.framework.TestCase; public class LimitingInputStreamTest extends TestCase { - private final static byte[] TEST_BUFFER = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + + private final static byte[] TEST_BUFFER = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; public void testReadLimitNotReached() throws IOException { - LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, false); + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50); long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); assertEquals(bytesRead, TEST_BUFFER.length); + assertFalse(is.hasReachedLimit()); + } - is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, true); - bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); + public void testReadLimitMatched() throws IOException { + final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 10); + long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); assertEquals(bytesRead, TEST_BUFFER.length); + assertTrue(is.hasReachedLimit()); } public void testReadLimitExceeded() throws IOException { final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9); final long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream()); assertEquals(bytesRead, 9); + assertTrue(is.hasReachedLimit()); } - public void testReadLimitExceededEof() throws IOException { - final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9, true); - try { - StreamUtils.copy(is, new ByteArrayOutputStream()); - fail("Should not get here"); - } catch (final EOFException eof) { - } - } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 5bc23f9570..87ef4acc54 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -590,6 +590,10 @@ public class MockProcessSession implements ProcessSession { @Override public void rollback(final boolean penalize) { + //if we've already committed then rollback is basically a no-op + if(committed){ + return; + } final List openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List for (final InputStream openInputStream : openStreamCopy) { try { diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index 34125f4263..01d85b859b 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -1,90 +1,84 @@ - + - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - org.apache.nifi - nifi-email-bundle - 1.0.0-SNAPSHOT - + + org.apache.nifi + nifi-email-bundle + 1.0.0-SNAPSHOT + - nifi-email-processors - jar + nifi-email-processors + jar - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-processor-utils - - - javax.mail - mail - - - org.apache.commons - commons-email - 1.4 - - - org.subethamail - subethasmtp - 3.1.7 - - - org.apache.nifi - nifi-ssl-context-service-api - - - org.springframework.integration - spring-integration-mail - 4.3.0.RELEASE - - - org.springframework.retry - spring-retry - - - - - commons-logging - commons-logging - 1.2 - - - org.apache.nifi - nifi-ssl-context-service - test - - - org.apache.nifi - nifi-mock - test - - - - - - org.apache.rat - apache-rat-plugin - - - - - + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + javax.mail + mail + + + org.apache.commons + commons-email + 1.4 + + + org.subethamail + subethasmtp + 3.1.7 + + + org.apache.nifi + nifi-ssl-context-service-api + + + org.springframework.integration + spring-integration-mail + 4.3.0.RELEASE + + + org.springframework.retry + spring-retry + + + + + commons-logging + commons-logging + 1.2 + + + org.apache.nifi + nifi-ssl-context-service + test + + + org.apache.nifi + nifi-mock + test + + diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java index 52d448168b..760db134d9 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java @@ -17,28 +17,22 @@ package org.apache.nifi.processors.email; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; -import java.security.cert.Certificate; -import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -47,45 +41,48 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.email.smtp.SmtpConsumer; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.stream.io.LimitingInputStream; import org.springframework.util.StringUtils; +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandlerFactory; import org.subethamail.smtp.server.SMTPServer; @Tags({"listen", "email", "smtp"}) +@TriggerSerially @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " + "allowing nifi to listen for incoming email. Note this server does not perform any email " + "validation. If direct exposure to the internet is sought, it may be a better idea to use " - + "the combination of NiFi and an industrial scale MTA (e.g. Postfix)") + + "the combination of NiFi and an industrial scale MTA (e.g. Postfix). Threading for this " + + "processor is managed by the underlying smtp server used so the processor need not support " + + "more than one thread.") @WritesAttributes({ @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + - "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + - "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " + + "certificates used by an TLS peer"), + @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " + + "certificates used by an TLS peer"), @WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection"), @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.recipient", description = "The value used during RCPT TO (i.e. envelope)"), + @WritesAttribute(attribute = "smtp.recipient.*", description = "The values used during RCPT TO (i.e. envelope)"), @WritesAttribute(attribute = "mime.type", description = "Mime type of the message")}) public class ListenSMTP extends AbstractSessionFactoryProcessor { + static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() .name("SMTP_PORT") .displayName("Listening Port") - .description("The TCP port the ListenSMTP processor will bind to." + - "NOTE that on Unix derivative operating systems this port must " + - "be higher than 1024 unless NiFi is running as with root user permissions.") + .description("The TCP port the ListenSMTP processor will bind to." + + "NOTE that on Unix derivative operating systems this port must " + + "be higher than 1024 unless NiFi is running as with root user permissions.") .required(true) .expressionLanguageSupported(false) .addValidator(StandardValidators.PORT_VALIDATOR) @@ -124,8 +121,8 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL_CONTEXT_SERVICE") .displayName("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + - "messages will be received over a secure connection.") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + + "messages will be received over a secure connection.") .required(false) .identifiesControllerService(SSLContextService.class) .build(); @@ -141,20 +138,20 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() .name("SMTP_HOSTNAME") .displayName("SMTP hostname") - .description("The hostname to be embedded into the banner displayed when an " + - "SMTP client connects to the processor TCP port .") + .description("The hostname to be embedded into the banner displayed when an " + + "SMTP client connects to the processor TCP port .") .expressionLanguageSupported(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - static final Relationship REL_SUCCESS = new Relationship.Builder() + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All new messages will be routed as FlowFiles to this relationship") .build(); - private final static List propertyDescriptors; + private final static List PROPERTY_DESCRIPTORS; - private final static Set relationships; + private final static Set RELATIONSHIPS; static { List _propertyDescriptors = new ArrayList<>(); @@ -165,84 +162,44 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { _propertyDescriptors.add(SSL_CONTEXT_SERVICE); _propertyDescriptors.add(CLIENT_AUTH); _propertyDescriptors.add(SMTP_HOSTNAME); - propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); _relationships.add(REL_SUCCESS); - relationships = Collections.unmodifiableSet(_relationships); + RELATIONSHIPS = Collections.unmodifiableSet(_relationships); } private volatile SMTPServer smtp; - private volatile SmtpConsumer smtpConsumer; - - private volatile int maxMessageSize; - - /** - * - */ @Override - public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { - ProcessSession processSession = sessionFactory.createSession(); - if (this.smtp == null) { - this.setupSmtpIfNecessary(context, processSession); - } - - /* - * Will consume incoming message directly from the wire and into - * FlowFile/Content repository before exiting. This essentially limits - * any potential data loss by allowing SMTPServer thread to actually - * commit NiFi session if all good. However in the event of exception, - * such exception will be propagated back to the email sender via - * "undeliverable message" allowing such user to re-send the message - */ - this.smtpConsumer.consumeUsing((inputDataStream) -> { - FlowFile flowFile = processSession.create(); - AtomicInteger size = new AtomicInteger(); + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + if (smtp == null) { try { - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - size.set(IOUtils.copy(new LimitingInputStream(inputDataStream, ListenSMTP.this.maxMessageSize, true), out)); - } - }); - flowFile = updateFlowFileWithAttributes(flowFile, processSession); - - processSession.getProvenanceReporter().receive(flowFile, - "smtp://" + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/"); - processSession.transfer(flowFile, REL_SUCCESS); - processSession.commit(); - return size.get(); - } catch (Exception e) { - context.yield(); - this.getLogger().error("Failed while processing incoming mail. " + e.getMessage(), e); - throw new IllegalStateException("Failed while processing incoming mail. " + e.getMessage(), e); + final SMTPServer server = prepareServer(context, sessionFactory); + server.start(); + smtp = server; + } catch (final Exception ex) {//have to catch exception due to awkward exception handling in subethasmtp + smtp = null; + getLogger().error("Unable to start SMTP server due to " + ex.getMessage(), ex); } - }); + } + context.yield();//nothing really to do here since threading managed by smtp server sessions } - /** - * - */ @OnStopped public void stop() { - this.getLogger().info("Stopping SMTPServer"); - this.smtp.stop(); - this.smtp = null; - this.getLogger().info("SMTPServer stopped"); + try { + smtp.stop(); + } finally { + smtp = null; + } } - /** - * - */ @Override public Set getRelationships() { - return relationships; + return RELATIONSHIPS; } - /** - * - */ @Override protected Collection customValidate(ValidationContext validationContext) { List results = new ArrayList<>(); @@ -266,74 +223,30 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { return results; } - /** - * - */ @Override protected List getSupportedPropertyDescriptors() { - return propertyDescriptors; + return PROPERTY_DESCRIPTORS; } - /** - * - */ - private FlowFile updateFlowFileWithAttributes(FlowFile flowFile, ProcessSession processSession) { - Map attributes = new HashMap<>(); - Certificate[] tlsPeerCertificates = this.smtpConsumer.getMessageContext().getTlsPeerCertificates(); - if (tlsPeerCertificates != null) { - for (int i = 0; i < tlsPeerCertificates.length; i++) { - if (tlsPeerCertificates[i] instanceof X509Certificate) { - X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i]; - attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); - attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName()); - } - } - } - - attributes.put("smtp.helo", this.smtpConsumer.getMessageContext().getHelo()); - attributes.put("smtp.remote.addr", this.smtpConsumer.getMessageContext().getRemoteAddress().toString()); - attributes.put("smtp.from", this.smtpConsumer.getFrom()); - attributes.put("smtp.recepient", this.smtpConsumer.getRecipient()); - attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); - return processSession.putAllAttributes(flowFile, attributes); - } - - /** - * - */ - private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) { - if (this.smtp == null) { - SmtpConsumer consumer = new SmtpConsumer(); - SMTPServer smtpServer = this.createServerInstance(context, consumer); - smtpServer.setSoftwareName("Apache NiFi"); - smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger()); - smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - this.maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); - smtpServer.setMaxMessageSize(this.maxMessageSize); - smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - if (context.getProperty(SMTP_HOSTNAME).isSet()) { - smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); - } - - this.smtpConsumer = consumer; - this.smtp = smtpServer; - this.smtp.start(); - } - } - - /** - * - */ - private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) { - SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - SMTPServer smtpServer = sslContextService == null ? new ConsumerAwareSmtpServer(consumer) : new ConsumerAwareSmtpServer(consumer) { + private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + final int port = context.getProperty(SMTP_PORT).asInteger(); + final String host = context.getProperty(SMTP_HOSTNAME).getValue(); + final ComponentLog log = getLogger(); + final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); + //create message handler factory + final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> { + return new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize); + }; + //create smtp server + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SMTPServer smtpServer = sslContextService == null ? new SMTPServer(messageHandlerFactory) : new SMTPServer(messageHandlerFactory) { @Override public SSLSocket createSSLSocket(Socket socket) throws IOException { InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth)); SSLSocketFactory socketFactory = sslContext.getSocketFactory(); - SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true)); + SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true)); sslSocket.setUseClientMode(false); if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) { @@ -348,33 +261,14 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor { } else { smtpServer.setHideTLS(true); } + smtpServer.setSoftwareName("Apache NiFi SMTP"); + smtpServer.setPort(port); + smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); + smtpServer.setMaxMessageSize(maxMessageSize); + smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + if (context.getProperty(SMTP_HOSTNAME).isSet()) { + smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue()); + } return smtpServer; } - - /** - * Wrapper over {@link SMTPServer} that is aware of the {@link SmtpConsumer} - * to ensure that its stop() operation is called during server stoppage. - */ - private static class ConsumerAwareSmtpServer extends SMTPServer { - - /** - * - */ - public ConsumerAwareSmtpServer(SmtpConsumer consumer) { - super(consumer); - } - - /** - * - */ - @Override - public synchronized void stop() { - try { - SmtpConsumer consumer = (SmtpConsumer) this.getMessageHandlerFactory(); - consumer.stop(); - } finally { - super.stop(); - } - } - } } diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java deleted file mode 100644 index 2063c0da45..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/SmtpConsumer.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email; - -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.subethamail.smtp.MessageContext; -import org.subethamail.smtp.MessageHandler; -import org.subethamail.smtp.MessageHandlerFactory; -import org.subethamail.smtp.RejectException; -import org.subethamail.smtp.TooMuchDataException; -import org.subethamail.smtp.server.SMTPServer; - -/** - * A simple consumer that provides a bridge between 'push' message distribution - * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. It - * implements both {@link MessageHandler} and {@link MessageHandlerFactory} - * allowing it to interact directly with {@link SMTPServer}. - */ -class SmtpConsumer implements MessageHandler, MessageHandlerFactory { - - private final static int CONSUMER_STOPPED = -1; - - private final static int INTERRUPTED = -2; - - private final static int ERROR = -9; - - private final static int NO_MESSAGE = -8; - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final BlockingQueue messageDataQueue = new ArrayBlockingQueue<>(1); - - private final BlockingQueue consumedMessageSizeQueue = new ArrayBlockingQueue<>(1); - - private final AtomicBoolean running = new AtomicBoolean(true); - - private volatile MessageContext messageContext; - - private volatile String from; - - private volatile String recipient; - - - /** - * - */ - String getFrom() { - return this.from; - } - - /** - * - */ - String getRecipient() { - return this.recipient; - } - - /** - * - */ - MessageContext getMessageContext() { - return this.messageContext; - } - - /** - * This operation will simply attempt to put a poison message to the - * 'consumedMessageSizeQueue' to ensure that in the event this consumer is - * stopped before the message is consumed (see - * {@link #consumeUsing(Function)}), the server thread that is blocking in - * {@link #data(InputStream)} operation can unblock. - */ - // NOTE: the 'synchronize' is only here for API correctness, to ensure that - // stop() and consumeUsing(..) can never be invoked at the same time. - // However within NiFi it can never happen. - synchronized void stop() { - this.running.compareAndSet(true, false); - this.consumedMessageSizeQueue.offer(CONSUMER_STOPPED); - } - - /** - * This operation is invoked by the consumer. Implementation of this - * operation creates a synchronous connection with the message producer (see - * {@link #data(InputStream)}) via a pair of queues which guarantees that - * message is fully consumed and disposed by the consumer (provided as - * {@link Function}) before the server closes the data stream. - */ - // NOTE: the 'synchronize' is only here for API correctness, to ensure that - // stop() and consumeUsing(..) can never be invoked at the same time. - // However within NiFi it can never happen. - synchronized void consumeUsing(Function resultConsumer) { - int messageSize = 0; - try { - InputStream message = this.messageDataQueue.poll(1000, TimeUnit.MILLISECONDS); - if (message != null) { - messageSize = resultConsumer.apply(message); - } else { - messageSize = NO_MESSAGE; - } - } catch (InterruptedException e) { - this.logger.warn("Current thread is interrupted", e); - messageSize = INTERRUPTED; - Thread.currentThread().interrupt(); - } finally { - if (messageSize == 0) { - messageSize = ERROR; - } - if (messageSize != NO_MESSAGE) { - this.consumedMessageSizeQueue.offer(messageSize); - } - } - } - - /** - * This operation is invoked by the server thread and contains message data - * as {@link InputStream}. Implementation of this operation creates a - * synchronous connection with consumer (see {@link #onMessage(Function)}) - * via a pair of queues which guarantees that message is fully consumed and - * disposed by the consumer by the time this operation exits. - */ - @Override - public void data(InputStream data) throws RejectException, TooMuchDataException, IOException { - if (this.running.get()) { - try { - this.messageDataQueue.offer(data, Integer.MAX_VALUE, TimeUnit.SECONDS); - long messageSize = this.consumedMessageSizeQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); - String exceptionMessage = null; - if (messageSize == CONSUMER_STOPPED) { - exceptionMessage = "NIFI Consumer was stopped before message was successfully consumed"; - } else if (messageSize == INTERRUPTED) { - exceptionMessage = "Consuming thread was interrupted"; - } else if (messageSize == ERROR) { - exceptionMessage = "Consuming thread failed while processing 'data' SMTP event."; - } - if (exceptionMessage != null) { - throw new IllegalStateException(exceptionMessage); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Received message of size: " + messageSize); - } - } - } catch (InterruptedException e) { - this.logger.warn("Current thread is interrupted", e); - Thread.currentThread().interrupt(); - throw new IllegalStateException("Current thread is interrupted", e); - } - } else { - throw new IllegalStateException("NIFI Consumer was stopped before message was successfully consumed"); - } - } - - /** - * - */ - @Override - public void from(String from) throws RejectException { - this.from = from; - } - - /** - * - */ - @Override - public void recipient(String recipient) throws RejectException { - this.recipient = recipient; - } - - /** - * - */ - @Override - public void done() { - // noop - } - - /** - * - */ - @Override - public MessageHandler create(MessageContext ctx) { - this.messageContext = ctx; - return this; - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java new file mode 100644 index 0000000000..4dad3bc004 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email.smtp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processors.email.ListenSMTP; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.util.StopWatch; + +import org.subethamail.smtp.MessageContext; +import org.subethamail.smtp.MessageHandler; +import org.subethamail.smtp.RejectException; +import org.subethamail.smtp.TooMuchDataException; +import org.subethamail.smtp.server.SMTPServer; + +/** + * A simple consumer that provides a bridge between 'push' message distribution + * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. + */ +public class SmtpConsumer implements MessageHandler { + + private String from = null; + private final List recipientList = new ArrayList<>(); + private final MessageContext context; + private final ProcessSessionFactory sessionFactory; + private final int port; + private final int maxMessageSize; + private final ComponentLog log; + private final String host; + + public SmtpConsumer( + final MessageContext context, + final ProcessSessionFactory sessionFactory, + final int port, + final String host, + final ComponentLog log, + final int maxMessageSize + ) { + this.context = context; + this.sessionFactory = sessionFactory; + this.port = port; + if (host == null || host.trim().isEmpty()) { + this.host = context.getSMTPServer().getHostName(); + } else { + this.host = host; + } + this.log = log; + this.maxMessageSize = maxMessageSize; + } + + String getFrom() { + return from; + } + + List getRecipients() { + return Collections.unmodifiableList(recipientList); + } + + @Override + public void data(final InputStream data) throws RejectException, TooMuchDataException, IOException { + final ProcessSession processSession = sessionFactory.createSession(); + final StopWatch watch = new StopWatch(); + watch.start(); + try { + FlowFile flowFile = processSession.create(); + final AtomicBoolean limitExceeded = new AtomicBoolean(false); + flowFile = processSession.write(flowFile, (OutputStream out) -> { + final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize); + IOUtils.copy(lis, out); + if (lis.hasReachedLimit()) { + limitExceeded.set(true); + } + }); + if (limitExceeded.get()) { + throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages"); + } + flowFile = processSession.putAllAttributes(flowFile, extractMessageAttributes()); + watch.stop(); + processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS)); + processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS); + processSession.commit(); + } catch (FlowFileAccessException | IllegalStateException | RejectException | IOException ex) { + log.error("Unable to fully process input due to " + ex.getMessage(), ex); + throw ex; + } finally { + processSession.rollback(); //make sure this happens no matter what - is safe + } + } + + @Override + public void from(final String from) throws RejectException { + this.from = from; + } + + @Override + public void recipient(final String recipient) throws RejectException { + if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) { + recipientList.add(recipient); + } + } + + @Override + public void done() { + } + + private Map extractMessageAttributes() { + final Map attributes = new HashMap<>(); + final Certificate[] tlsPeerCertificates = context.getTlsPeerCertificates(); + if (tlsPeerCertificates != null) { + for (int i = 0; i < tlsPeerCertificates.length; i++) { + if (tlsPeerCertificates[i] instanceof X509Certificate) { + X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i]; + attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); + attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName()); + } + } + } + + attributes.put("smtp.helo", context.getHelo()); + attributes.put("smtp.remote.addr", context.getRemoteAddress().toString()); + attributes.put("smtp.from", from); + for (int i = 0; i < recipientList.size(); i++) { + attributes.put("smtp.recipient." + i, recipientList.get(i)); + } + attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); + return attributes; + } + +} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java deleted file mode 100644 index 4a2ad80fe6..0000000000 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/SmtpConsumerTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.mail.Email; -import org.apache.commons.mail.SimpleEmail; -import org.apache.nifi.remote.io.socket.NetworkUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.subethamail.smtp.server.SMTPServer; - -public class SmtpConsumerTest { - - private volatile ExecutorService executor; - - @Before - public void before() { - this.executor = Executors.newCachedThreadPool(); - } - - @After - public void after() { - this.executor.shutdown(); - } - - @Test - public void validateServerCanStopWhenConsumerStoppedBeforeConsumingMessage() throws Exception { - SmtpConsumer consumer = new SmtpConsumer(); - CountDownLatch latch = new CountDownLatch(10); - AtomicReference exception = new AtomicReference<>(); - this.executor.execute(new Runnable() { - @Override - public void run() { - for (int i = 0; i < 10; i++) { - try { - consumer.data(mock(InputStream.class)); - } catch (Exception e) { - e.printStackTrace(); - exception.set(e); - } finally { - latch.countDown(); - } - } - } - }); - boolean finished = latch.await(2000, TimeUnit.MILLISECONDS); - assertFalse(finished); - this.executor.shutdown(); - boolean terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); - assertFalse(terminated); - - consumer.stop(); - finished = latch.await(1000, TimeUnit.MILLISECONDS); - assertTrue(finished); - terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); - assertTrue(terminated); - } - - /* - * This test simply validates that consumeUsing(..) can react properly to - * thread interrupts. That said the condition is impossible in the current - * usage of SmtpConsumer - */ - @Test - public void validateServerCanStopWhenConsumerInterrupted() throws Exception { - SmtpConsumer consumer = new SmtpConsumer(); - AtomicReference thread = new AtomicReference<>(); - - this.executor.execute(new Runnable() { - @Override - public void run() { - thread.set(Thread.currentThread()); - consumer.consumeUsing((in) -> { - return 0; - }); - } - }); - - this.executor.shutdown(); - boolean terminated = this.executor.awaitTermination(200, TimeUnit.MILLISECONDS); - assertFalse(terminated); // the call to consumeUsing(..) is blocking on - // the queue.poll since nothing is there - - thread.get().interrupt(); // interrupt thread that executes - // consumeUsing(..) - terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS); - assertTrue(terminated); - } - - /* - * Emulates any errors that may arise while reading the {@link InputStream} - * delivered as part of the data() call. - */ - @Test - public void validateServerCanStopWhenConsumerErrors() throws Exception { - SmtpConsumer consumer = new SmtpConsumer(); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exception = new AtomicReference<>(); - this.executor.execute(new Runnable() { - @Override - public void run() { - try { - consumer.data(mock(InputStream.class)); - } catch (Exception e) { - exception.set(e); - } finally { - latch.countDown(); - } - } - }); - - this.executor.execute(new Runnable() { - @Override - public void run() { - consumer.consumeUsing((in) -> { - throw new RuntimeException("intentional"); - }); - } - }); - - // this to ensure that call to data unblocks - assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); - assertTrue(exception.get() instanceof IllegalStateException); - assertEquals("Consuming thread failed while processing 'data' SMTP event.", exception.get().getMessage()); - } - - @Test - public void validateServerCanStopWithUnconsumedMessage() throws Exception { - int port = NetworkUtils.availablePort(); - SmtpConsumer consumer = new SmtpConsumer(); - SMTPServer smtp = new SMTPServer(consumer); - smtp.setPort(port); - smtp.setSoftwareName("Apache NiFi"); - smtp.start(); - - BlockingQueue exceptionQueue = new ArrayBlockingQueue<>(1); - this.executor.execute(new Runnable() { - @Override - public void run() { - try { - Email email = new SimpleEmail(); - email.setHostName("localhost"); - email.setSmtpPort(port); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("Hello SMTP"); - email.addTo("bob@nifi.apache.org"); - email.send(); - } catch (Exception e) { - exceptionQueue.offer(e); - } - } - }); - assertNull(exceptionQueue.poll()); - smtp.stop(); - Exception ex = exceptionQueue.poll(100, TimeUnit.MILLISECONDS); - assertNotNull(ex); - /* - * This essentially ensures and validates that if NiFi was not able to - * successfully consume message the aftermath of the exception thrown by - * the consumer is propagated to the sender essentially ensuring no data - * loss by allowing sender to resend - */ - assertTrue(ex.getMessage().startsWith("Sending the email to the following server failed")); - } - - @Test - public void validateConsumer() throws Exception { - int port = NetworkUtils.availablePort(); - SmtpConsumer consumer = new SmtpConsumer(); - SMTPServer smtp = new SMTPServer(consumer); - smtp.setPort(port); - smtp.setSoftwareName("Apache NiFi"); - smtp.start(); - - int messageCount = 5; - CountDownLatch latch = new CountDownLatch(messageCount); - this.executor.execute(new Runnable() { - @Override - public void run() { - for (int i = 0; i < messageCount; i++) { - try { - Email email = new SimpleEmail(); - email.setHostName("localhost"); - email.setSmtpPort(port); - email.setFrom("alice@nifi.apache.org"); - email.setSubject("This is a test"); - email.setMsg("MSG-" + i); - email.addTo("bob@nifi.apache.org"); - email.send(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - latch.countDown(); - } - } - } - }); - - List messages = new ArrayList<>(); - for (AtomicInteger i = new AtomicInteger(); i.get() < messageCount;) { - consumer.consumeUsing((dataInputStream) -> { - i.incrementAndGet(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int size = 0; - try { - size = IOUtils.copy(dataInputStream, bos); - messages.add(new String(bos.toByteArray(), StandardCharsets.UTF_8)); - return size; - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - return size; - }); - } - - boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); - assertTrue(complete); - assertTrue(messages.size() == messageCount); - assertTrue(messages.get(0).contains("MSG-0")); - assertTrue(messages.get(1).contains("MSG-1")); - assertTrue(messages.get(2).contains("MSG-2")); - assertTrue(messages.get(3).contains("MSG-3")); - assertTrue(messages.get(4).contains("MSG-4")); - smtp.stop(); - } -} diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java index c317b0c767..f17978fc08 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java @@ -39,25 +39,16 @@ public class TestListenSMTP { private ScheduledExecutorService executor; - /** - * - */ @Before public void before() { this.executor = Executors.newScheduledThreadPool(2); } - /** - * - */ @After public void after() { this.executor.shutdown(); } - /** - * - */ @Test public void validateSuccessfulInteraction() throws Exception, EmailException { int port = NetworkUtils.availablePort(); @@ -68,21 +59,14 @@ public class TestListenSMTP { runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds"); runner.assertValid(); - - int messageCount = 5; - CountDownLatch latch = new CountDownLatch(messageCount); - - this.executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - runner.run(1, false); - } - }, 0, 500, TimeUnit.MILLISECONDS); + runner.run(5, false); + final int numMessages = 5; + CountDownLatch latch = new CountDownLatch(numMessages); this.executor.schedule(new Runnable() { @Override public void run() { - for (int i = 0; i < messageCount; i++) { + for (int i = 0; i < numMessages; i++) { try { Email email = new SimpleEmail(); email.setHostName("localhost"); @@ -100,17 +84,14 @@ public class TestListenSMTP { } } } - }, 1000, TimeUnit.MILLISECONDS); + }, 1500, TimeUnit.MILLISECONDS); boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); runner.shutdown(); assertTrue(complete); - runner.assertAllFlowFilesTransferred("success", 5); + runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages); } - /** - * - */ @Test public void validateSuccessfulInteractionWithTls() throws Exception, EmailException { System.setProperty("mail.smtp.ssl.trust", "*"); @@ -134,7 +115,6 @@ public class TestListenSMTP { runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); runner.enableControllerService(sslContextService); - // and add the SSL context to the runner runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name()); @@ -142,13 +122,7 @@ public class TestListenSMTP { int messageCount = 5; CountDownLatch latch = new CountDownLatch(messageCount); - - this.executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - runner.run(1, false); - } - }, 0, 500, TimeUnit.MILLISECONDS); + runner.run(messageCount, false); this.executor.schedule(new Runnable() { @Override @@ -181,12 +155,9 @@ public class TestListenSMTP { boolean complete = latch.await(5000, TimeUnit.MILLISECONDS); runner.shutdown(); assertTrue(complete); - runner.assertAllFlowFilesTransferred("success", 5); + runner.assertAllFlowFilesTransferred("success", messageCount); } - /** - * - */ @Test public void validateTooLargeMessage() throws Exception, EmailException { int port = NetworkUtils.availablePort(); @@ -202,12 +173,7 @@ public class TestListenSMTP { int messageCount = 1; CountDownLatch latch = new CountDownLatch(messageCount); - this.executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - runner.run(1, false); - } - }, 0, 500, TimeUnit.MILLISECONDS); + runner.run(messageCount, false); this.executor.schedule(new Runnable() { @Override @@ -237,4 +203,4 @@ public class TestListenSMTP { assertTrue(complete); runner.assertAllFlowFilesTransferred("success", 0); } -} \ No newline at end of file +}