NIFI-2519 This closes #856. aligned threading model with subethastmp

This commit is contained in:
joewitt 2016-08-14 01:10:47 -04:00
parent 48fa76ecff
commit d3b96dcac1
9 changed files with 342 additions and 814 deletions

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.stream.io;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@ -25,50 +24,35 @@ public class LimitingInputStream extends InputStream {
private final InputStream in;
private final long limit;
private long bytesRead = 0;
private final boolean exceptionOnLimit;
private volatile boolean limitReached = false;
/**
* Constructs a limited input stream whereby if the limit is reached all
* subsequent calls to read will return a -1.
* subsequent calls to read will return a -1 and hasLimitReached() will
* indicate true. The limit is inclusive so if all 100 bytes of a 100 byte
* stream are read it will be true, otherwise false.
*
* @param in
* the underlying input stream
* @param limit
* maximum length of bytes to read from underlying input stream
* @param in the underlying input stream
* @param limit maximum length of bytes to read from underlying input stream
*/
public LimitingInputStream(final InputStream in, final long limit) {
this(in, limit, false);
}
/**
* Constructs a limited input stream whereby if the limit is reached all
* subsequent calls to read will return a -1 or EOFexception as configured
*
* @param in
* the underlying input stream
* @param limit
* maximum length of bytes to read from underlying input stream
* @param eofOnLimit
* true if EOF should occur on all read calls once limit reached;
* false if -1 should be returned instead
*/
public LimitingInputStream(final InputStream in, final long limit, final boolean eofOnLimit) {
this.in = in;
this.limit = limit;
exceptionOnLimit = eofOnLimit;
}
private int limitReached() throws IOException {
if (exceptionOnLimit) {
throw new EOFException("Limit of allowed bytes read from input stream reached");
}
public boolean hasReachedLimit() throws IOException {
return limitReached;
}
private int markLimitReached() {
limitReached = true;
return -1;
}
@Override
public int read() throws IOException {
if (bytesRead >= limit) {
return limitReached();
return markLimitReached();
}
final int val = in.read();
@ -81,7 +65,7 @@ public class LimitingInputStream extends InputStream {
@Override
public int read(final byte[] b) throws IOException {
if (bytesRead >= limit) {
return limitReached();
return markLimitReached();
}
final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
@ -96,7 +80,7 @@ public class LimitingInputStream extends InputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (bytesRead >= limit) {
return limitReached();
return markLimitReached();
}
final int maxToRead = (int) Math.min(len, limit - bytesRead);

View File

@ -16,36 +16,33 @@
*/
package org.apache.nifi.stream.io;
import java.io.EOFException;
import java.io.IOException;
import junit.framework.TestCase;
public class LimitingInputStreamTest extends TestCase {
private final static byte[] TEST_BUFFER = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
private final static byte[] TEST_BUFFER = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
public void testReadLimitNotReached() throws IOException {
LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, false);
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50);
long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, TEST_BUFFER.length);
assertFalse(is.hasReachedLimit());
}
is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 50, true);
bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
public void testReadLimitMatched() throws IOException {
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 10);
long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, TEST_BUFFER.length);
assertTrue(is.hasReachedLimit());
}
public void testReadLimitExceeded() throws IOException {
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9);
final long bytesRead = StreamUtils.copy(is, new ByteArrayOutputStream());
assertEquals(bytesRead, 9);
assertTrue(is.hasReachedLimit());
}
public void testReadLimitExceededEof() throws IOException {
final LimitingInputStream is = new LimitingInputStream(new ByteArrayInputStream(TEST_BUFFER), 9, true);
try {
StreamUtils.copy(is, new ByteArrayOutputStream());
fail("Should not get here");
} catch (final EOFException eof) {
}
}
}

View File

@ -590,6 +590,10 @@ public class MockProcessSession implements ProcessSession {
@Override
public void rollback(final boolean penalize) {
//if we've already committed then rollback is basically a no-op
if(committed){
return;
}
final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
for (final InputStream openInputStream : openStreamCopy) {
try {

View File

@ -1,90 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>4.3.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>4.3.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,28 +17,22 @@
package org.apache.nifi.processors.email;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -47,45 +41,48 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.email.smtp.SmtpConsumer;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.springframework.util.StringUtils;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.server.SMTPServer;
@Tags({"listen", "email", "smtp"})
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, "
+ "allowing nifi to listen for incoming email. Note this server does not perform any email "
+ "validation. If direct exposure to the internet is sought, it may be a better idea to use "
+ "the combination of NiFi and an industrial scale MTA (e.g. Postfix)")
+ "the combination of NiFi and an industrial scale MTA (e.g. Postfix). Threading for this "
+ "processor is managed by the underlying smtp server used so the processor need not support "
+ "more than one thread.")
@WritesAttributes({
@WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"),
@WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " +
"certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " +
"certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the "
+ "certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the "
+ "certificates used by an TLS peer"),
@WritesAttribute(attribute = "smtp.src", description = "The source IP of the SMTP connection"),
@WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.recipient", description = "The value used during RCPT TO (i.e. envelope)"),
@WritesAttribute(attribute = "smtp.recipient.*", description = "The values used during RCPT TO (i.e. envelope)"),
@WritesAttribute(attribute = "mime.type", description = "Mime type of the message")})
public class ListenSMTP extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.name("SMTP_PORT")
.displayName("Listening Port")
.description("The TCP port the ListenSMTP processor will bind to." +
"NOTE that on Unix derivative operating systems this port must " +
"be higher than 1024 unless NiFi is running as with root user permissions.")
.description("The TCP port the ListenSMTP processor will bind to."
+ "NOTE that on Unix derivative operating systems this port must "
+ "be higher than 1024 unless NiFi is running as with root user permissions.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
@ -124,8 +121,8 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be received over a secure connection.")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, "
+ "messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@ -141,20 +138,20 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.name("SMTP_HOSTNAME")
.displayName("SMTP hostname")
.description("The hostname to be embedded into the banner displayed when an " +
"SMTP client connects to the processor TCP port .")
.description("The hostname to be embedded into the banner displayed when an "
+ "SMTP client connects to the processor TCP port .")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All new messages will be routed as FlowFiles to this relationship")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
private final static Set<Relationship> relationships;
private final static Set<Relationship> RELATIONSHIPS;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
@ -165,84 +162,44 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
_propertyDescriptors.add(SMTP_HOSTNAME);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
}
private volatile SMTPServer smtp;
private volatile SmtpConsumer smtpConsumer;
private volatile int maxMessageSize;
/**
*
*/
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession processSession = sessionFactory.createSession();
if (this.smtp == null) {
this.setupSmtpIfNecessary(context, processSession);
}
/*
* Will consume incoming message directly from the wire and into
* FlowFile/Content repository before exiting. This essentially limits
* any potential data loss by allowing SMTPServer thread to actually
* commit NiFi session if all good. However in the event of exception,
* such exception will be propagated back to the email sender via
* "undeliverable message" allowing such user to re-send the message
*/
this.smtpConsumer.consumeUsing((inputDataStream) -> {
FlowFile flowFile = processSession.create();
AtomicInteger size = new AtomicInteger();
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
if (smtp == null) {
try {
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
size.set(IOUtils.copy(new LimitingInputStream(inputDataStream, ListenSMTP.this.maxMessageSize, true), out));
}
});
flowFile = updateFlowFileWithAttributes(flowFile, processSession);
processSession.getProvenanceReporter().receive(flowFile,
"smtp://" + ListenSMTP.this.smtp.getHostName() + ":" + ListenSMTP.this.smtp.getPort() + "/");
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
return size.get();
} catch (Exception e) {
context.yield();
this.getLogger().error("Failed while processing incoming mail. " + e.getMessage(), e);
throw new IllegalStateException("Failed while processing incoming mail. " + e.getMessage(), e);
final SMTPServer server = prepareServer(context, sessionFactory);
server.start();
smtp = server;
} catch (final Exception ex) {//have to catch exception due to awkward exception handling in subethasmtp
smtp = null;
getLogger().error("Unable to start SMTP server due to " + ex.getMessage(), ex);
}
});
}
context.yield();//nothing really to do here since threading managed by smtp server sessions
}
/**
*
*/
@OnStopped
public void stop() {
this.getLogger().info("Stopping SMTPServer");
this.smtp.stop();
this.smtp = null;
this.getLogger().info("SMTPServer stopped");
try {
smtp.stop();
} finally {
smtp = null;
}
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
return RELATIONSHIPS;
}
/**
*
*/
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>();
@ -266,74 +223,30 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
return results;
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
return PROPERTY_DESCRIPTORS;
}
/**
*
*/
private FlowFile updateFlowFileWithAttributes(FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<>();
Certificate[] tlsPeerCertificates = this.smtpConsumer.getMessageContext().getTlsPeerCertificates();
if (tlsPeerCertificates != null) {
for (int i = 0; i < tlsPeerCertificates.length; i++) {
if (tlsPeerCertificates[i] instanceof X509Certificate) {
X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i];
attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName());
}
}
}
attributes.put("smtp.helo", this.smtpConsumer.getMessageContext().getHelo());
attributes.put("smtp.remote.addr", this.smtpConsumer.getMessageContext().getRemoteAddress().toString());
attributes.put("smtp.from", this.smtpConsumer.getFrom());
attributes.put("smtp.recepient", this.smtpConsumer.getRecipient());
attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
return processSession.putAllAttributes(flowFile, attributes);
}
/**
*
*/
private synchronized void setupSmtpIfNecessary(ProcessContext context, ProcessSession processSession) {
if (this.smtp == null) {
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtpServer = this.createServerInstance(context, consumer);
smtpServer.setSoftwareName("Apache NiFi");
smtpServer.setPort(context.getProperty(SMTP_PORT).asInteger());
smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
this.maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue();
smtpServer.setMaxMessageSize(this.maxMessageSize);
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
if (context.getProperty(SMTP_HOSTNAME).isSet()) {
smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
}
this.smtpConsumer = consumer;
this.smtp = smtpServer;
this.smtp.start();
}
}
/**
*
*/
private SMTPServer createServerInstance(ProcessContext context, SmtpConsumer consumer) {
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
SMTPServer smtpServer = sslContextService == null ? new ConsumerAwareSmtpServer(consumer) : new ConsumerAwareSmtpServer(consumer) {
private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final int port = context.getProperty(SMTP_PORT).asInteger();
final String host = context.getProperty(SMTP_HOSTNAME).getValue();
final ComponentLog log = getLogger();
final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue();
//create message handler factory
final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> {
return new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize);
};
//create smtp server
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SMTPServer smtpServer = sslContextService == null ? new SMTPServer(messageHandlerFactory) : new SMTPServer(messageHandlerFactory) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws IOException {
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuth));
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(),socket.getPort(), true));
SSLSocket sslSocket = (SSLSocket) (socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true));
sslSocket.setUseClientMode(false);
if (SSLContextService.ClientAuth.REQUIRED.toString().equals(clientAuth)) {
@ -348,33 +261,14 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
} else {
smtpServer.setHideTLS(true);
}
smtpServer.setSoftwareName("Apache NiFi SMTP");
smtpServer.setPort(port);
smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
smtpServer.setMaxMessageSize(maxMessageSize);
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
if (context.getProperty(SMTP_HOSTNAME).isSet()) {
smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
}
return smtpServer;
}
/**
* Wrapper over {@link SMTPServer} that is aware of the {@link SmtpConsumer}
* to ensure that its stop() operation is called during server stoppage.
*/
private static class ConsumerAwareSmtpServer extends SMTPServer {
/**
*
*/
public ConsumerAwareSmtpServer(SmtpConsumer consumer) {
super(consumer);
}
/**
*
*/
@Override
public synchronized void stop() {
try {
SmtpConsumer consumer = (SmtpConsumer) this.getMessageHandlerFactory();
consumer.stop();
} finally {
super.stop();
}
}
}
}

View File

@ -1,206 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
/**
* A simple consumer that provides a bridge between 'push' message distribution
* provided by {@link SMTPServer} and NiFi polling scheduler mechanism. It
* implements both {@link MessageHandler} and {@link MessageHandlerFactory}
* allowing it to interact directly with {@link SMTPServer}.
*/
class SmtpConsumer implements MessageHandler, MessageHandlerFactory {
private final static int CONSUMER_STOPPED = -1;
private final static int INTERRUPTED = -2;
private final static int ERROR = -9;
private final static int NO_MESSAGE = -8;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final BlockingQueue<InputStream> messageDataQueue = new ArrayBlockingQueue<>(1);
private final BlockingQueue<Integer> consumedMessageSizeQueue = new ArrayBlockingQueue<>(1);
private final AtomicBoolean running = new AtomicBoolean(true);
private volatile MessageContext messageContext;
private volatile String from;
private volatile String recipient;
/**
*
*/
String getFrom() {
return this.from;
}
/**
*
*/
String getRecipient() {
return this.recipient;
}
/**
*
*/
MessageContext getMessageContext() {
return this.messageContext;
}
/**
* This operation will simply attempt to put a poison message to the
* 'consumedMessageSizeQueue' to ensure that in the event this consumer is
* stopped before the message is consumed (see
* {@link #consumeUsing(Function)}), the server thread that is blocking in
* {@link #data(InputStream)} operation can unblock.
*/
// NOTE: the 'synchronize' is only here for API correctness, to ensure that
// stop() and consumeUsing(..) can never be invoked at the same time.
// However within NiFi it can never happen.
synchronized void stop() {
this.running.compareAndSet(true, false);
this.consumedMessageSizeQueue.offer(CONSUMER_STOPPED);
}
/**
* This operation is invoked by the consumer. Implementation of this
* operation creates a synchronous connection with the message producer (see
* {@link #data(InputStream)}) via a pair of queues which guarantees that
* message is fully consumed and disposed by the consumer (provided as
* {@link Function}) before the server closes the data stream.
*/
// NOTE: the 'synchronize' is only here for API correctness, to ensure that
// stop() and consumeUsing(..) can never be invoked at the same time.
// However within NiFi it can never happen.
synchronized void consumeUsing(Function<InputStream, Integer> resultConsumer) {
int messageSize = 0;
try {
InputStream message = this.messageDataQueue.poll(1000, TimeUnit.MILLISECONDS);
if (message != null) {
messageSize = resultConsumer.apply(message);
} else {
messageSize = NO_MESSAGE;
}
} catch (InterruptedException e) {
this.logger.warn("Current thread is interrupted", e);
messageSize = INTERRUPTED;
Thread.currentThread().interrupt();
} finally {
if (messageSize == 0) {
messageSize = ERROR;
}
if (messageSize != NO_MESSAGE) {
this.consumedMessageSizeQueue.offer(messageSize);
}
}
}
/**
* This operation is invoked by the server thread and contains message data
* as {@link InputStream}. Implementation of this operation creates a
* synchronous connection with consumer (see {@link #onMessage(Function)})
* via a pair of queues which guarantees that message is fully consumed and
* disposed by the consumer by the time this operation exits.
*/
@Override
public void data(InputStream data) throws RejectException, TooMuchDataException, IOException {
if (this.running.get()) {
try {
this.messageDataQueue.offer(data, Integer.MAX_VALUE, TimeUnit.SECONDS);
long messageSize = this.consumedMessageSizeQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
String exceptionMessage = null;
if (messageSize == CONSUMER_STOPPED) {
exceptionMessage = "NIFI Consumer was stopped before message was successfully consumed";
} else if (messageSize == INTERRUPTED) {
exceptionMessage = "Consuming thread was interrupted";
} else if (messageSize == ERROR) {
exceptionMessage = "Consuming thread failed while processing 'data' SMTP event.";
}
if (exceptionMessage != null) {
throw new IllegalStateException(exceptionMessage);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Received message of size: " + messageSize);
}
}
} catch (InterruptedException e) {
this.logger.warn("Current thread is interrupted", e);
Thread.currentThread().interrupt();
throw new IllegalStateException("Current thread is interrupted", e);
}
} else {
throw new IllegalStateException("NIFI Consumer was stopped before message was successfully consumed");
}
}
/**
*
*/
@Override
public void from(String from) throws RejectException {
this.from = from;
}
/**
*
*/
@Override
public void recipient(String recipient) throws RejectException {
this.recipient = recipient;
}
/**
*
*/
@Override
public void done() {
// noop
}
/**
*
*/
@Override
public MessageHandler create(MessageContext ctx) {
this.messageContext = ctx;
return this;
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email.smtp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.email.ListenSMTP;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
/**
* A simple consumer that provides a bridge between 'push' message distribution
* provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
*/
public class SmtpConsumer implements MessageHandler {
private String from = null;
private final List<String> recipientList = new ArrayList<>();
private final MessageContext context;
private final ProcessSessionFactory sessionFactory;
private final int port;
private final int maxMessageSize;
private final ComponentLog log;
private final String host;
public SmtpConsumer(
final MessageContext context,
final ProcessSessionFactory sessionFactory,
final int port,
final String host,
final ComponentLog log,
final int maxMessageSize
) {
this.context = context;
this.sessionFactory = sessionFactory;
this.port = port;
if (host == null || host.trim().isEmpty()) {
this.host = context.getSMTPServer().getHostName();
} else {
this.host = host;
}
this.log = log;
this.maxMessageSize = maxMessageSize;
}
String getFrom() {
return from;
}
List<String> getRecipients() {
return Collections.unmodifiableList(recipientList);
}
@Override
public void data(final InputStream data) throws RejectException, TooMuchDataException, IOException {
final ProcessSession processSession = sessionFactory.createSession();
final StopWatch watch = new StopWatch();
watch.start();
try {
FlowFile flowFile = processSession.create();
final AtomicBoolean limitExceeded = new AtomicBoolean(false);
flowFile = processSession.write(flowFile, (OutputStream out) -> {
final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize);
IOUtils.copy(lis, out);
if (lis.hasReachedLimit()) {
limitExceeded.set(true);
}
});
if (limitExceeded.get()) {
throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages");
}
flowFile = processSession.putAllAttributes(flowFile, extractMessageAttributes());
watch.stop();
processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS));
processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS);
processSession.commit();
} catch (FlowFileAccessException | IllegalStateException | RejectException | IOException ex) {
log.error("Unable to fully process input due to " + ex.getMessage(), ex);
throw ex;
} finally {
processSession.rollback(); //make sure this happens no matter what - is safe
}
}
@Override
public void from(final String from) throws RejectException {
this.from = from;
}
@Override
public void recipient(final String recipient) throws RejectException {
if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) {
recipientList.add(recipient);
}
}
@Override
public void done() {
}
private Map<String, String> extractMessageAttributes() {
final Map<String, String> attributes = new HashMap<>();
final Certificate[] tlsPeerCertificates = context.getTlsPeerCertificates();
if (tlsPeerCertificates != null) {
for (int i = 0; i < tlsPeerCertificates.length; i++) {
if (tlsPeerCertificates[i] instanceof X509Certificate) {
X509Certificate x509Cert = (X509Certificate) tlsPeerCertificates[i];
attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectDN().getName());
}
}
}
attributes.put("smtp.helo", context.getHelo());
attributes.put("smtp.remote.addr", context.getRemoteAddress().toString());
attributes.put("smtp.from", from);
for (int i = 0; i < recipientList.size(); i++) {
attributes.put("smtp.recipient." + i, recipientList.get(i));
}
attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
return attributes;
}
}

View File

@ -1,266 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.SimpleEmail;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.subethamail.smtp.server.SMTPServer;
public class SmtpConsumerTest {
private volatile ExecutorService executor;
@Before
public void before() {
this.executor = Executors.newCachedThreadPool();
}
@After
public void after() {
this.executor.shutdown();
}
@Test
public void validateServerCanStopWhenConsumerStoppedBeforeConsumingMessage() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
CountDownLatch latch = new CountDownLatch(10);
AtomicReference<Exception> exception = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
consumer.data(mock(InputStream.class));
} catch (Exception e) {
e.printStackTrace();
exception.set(e);
} finally {
latch.countDown();
}
}
}
});
boolean finished = latch.await(2000, TimeUnit.MILLISECONDS);
assertFalse(finished);
this.executor.shutdown();
boolean terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertFalse(terminated);
consumer.stop();
finished = latch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(finished);
terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertTrue(terminated);
}
/*
* This test simply validates that consumeUsing(..) can react properly to
* thread interrupts. That said the condition is impossible in the current
* usage of SmtpConsumer
*/
@Test
public void validateServerCanStopWhenConsumerInterrupted() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
AtomicReference<Thread> thread = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
thread.set(Thread.currentThread());
consumer.consumeUsing((in) -> {
return 0;
});
}
});
this.executor.shutdown();
boolean terminated = this.executor.awaitTermination(200, TimeUnit.MILLISECONDS);
assertFalse(terminated); // the call to consumeUsing(..) is blocking on
// the queue.poll since nothing is there
thread.get().interrupt(); // interrupt thread that executes
// consumeUsing(..)
terminated = this.executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertTrue(terminated);
}
/*
* Emulates any errors that may arise while reading the {@link InputStream}
* delivered as part of the data() call.
*/
@Test
public void validateServerCanStopWhenConsumerErrors() throws Exception {
SmtpConsumer consumer = new SmtpConsumer();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
this.executor.execute(new Runnable() {
@Override
public void run() {
try {
consumer.data(mock(InputStream.class));
} catch (Exception e) {
exception.set(e);
} finally {
latch.countDown();
}
}
});
this.executor.execute(new Runnable() {
@Override
public void run() {
consumer.consumeUsing((in) -> {
throw new RuntimeException("intentional");
});
}
});
// this to ensure that call to data unblocks
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(exception.get() instanceof IllegalStateException);
assertEquals("Consuming thread failed while processing 'data' SMTP event.", exception.get().getMessage());
}
@Test
public void validateServerCanStopWithUnconsumedMessage() throws Exception {
int port = NetworkUtils.availablePort();
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtp = new SMTPServer(consumer);
smtp.setPort(port);
smtp.setSoftwareName("Apache NiFi");
smtp.start();
BlockingQueue<Exception> exceptionQueue = new ArrayBlockingQueue<>(1);
this.executor.execute(new Runnable() {
@Override
public void run() {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("Hello SMTP");
email.addTo("bob@nifi.apache.org");
email.send();
} catch (Exception e) {
exceptionQueue.offer(e);
}
}
});
assertNull(exceptionQueue.poll());
smtp.stop();
Exception ex = exceptionQueue.poll(100, TimeUnit.MILLISECONDS);
assertNotNull(ex);
/*
* This essentially ensures and validates that if NiFi was not able to
* successfully consume message the aftermath of the exception thrown by
* the consumer is propagated to the sender essentially ensuring no data
* loss by allowing sender to resend
*/
assertTrue(ex.getMessage().startsWith("Sending the email to the following server failed"));
}
@Test
public void validateConsumer() throws Exception {
int port = NetworkUtils.availablePort();
SmtpConsumer consumer = new SmtpConsumer();
SMTPServer smtp = new SMTPServer(consumer);
smtp.setPort(port);
smtp.setSoftwareName("Apache NiFi");
smtp.start();
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
email.setSmtpPort(port);
email.setFrom("alice@nifi.apache.org");
email.setSubject("This is a test");
email.setMsg("MSG-" + i);
email.addTo("bob@nifi.apache.org");
email.send();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
});
List<String> messages = new ArrayList<>();
for (AtomicInteger i = new AtomicInteger(); i.get() < messageCount;) {
consumer.consumeUsing((dataInputStream) -> {
i.incrementAndGet();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int size = 0;
try {
size = IOUtils.copy(dataInputStream, bos);
messages.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
return size;
} catch (Exception e) {
e.printStackTrace();
fail();
}
return size;
});
}
boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
assertTrue(complete);
assertTrue(messages.size() == messageCount);
assertTrue(messages.get(0).contains("MSG-0"));
assertTrue(messages.get(1).contains("MSG-1"));
assertTrue(messages.get(2).contains("MSG-2"));
assertTrue(messages.get(3).contains("MSG-3"));
assertTrue(messages.get(4).contains("MSG-4"));
smtp.stop();
}
}

View File

@ -39,25 +39,16 @@ public class TestListenSMTP {
private ScheduledExecutorService executor;
/**
*
*/
@Before
public void before() {
this.executor = Executors.newScheduledThreadPool(2);
}
/**
*
*/
@After
public void after() {
this.executor.shutdown();
}
/**
*
*/
@Test
public void validateSuccessfulInteraction() throws Exception, EmailException {
int port = NetworkUtils.availablePort();
@ -68,21 +59,14 @@ public class TestListenSMTP {
runner.setProperty(ListenSMTP.SMTP_TIMEOUT, "10 seconds");
runner.assertValid();
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
runner.run(5, false);
final int numMessages = 5;
CountDownLatch latch = new CountDownLatch(numMessages);
this.executor.schedule(new Runnable() {
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
for (int i = 0; i < numMessages; i++) {
try {
Email email = new SimpleEmail();
email.setHostName("localhost");
@ -100,17 +84,14 @@ public class TestListenSMTP {
}
}
}
}, 1000, TimeUnit.MILLISECONDS);
}, 1500, TimeUnit.MILLISECONDS);
boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", 5);
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, numMessages);
}
/**
*
*/
@Test
public void validateSuccessfulInteractionWithTls() throws Exception, EmailException {
System.setProperty("mail.smtp.ssl.trust", "*");
@ -134,7 +115,6 @@ public class TestListenSMTP {
runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
runner.enableControllerService(sslContextService);
// and add the SSL context to the runner
runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(ListenSMTP.CLIENT_AUTH, SSLContextService.ClientAuth.NONE.name());
@ -142,13 +122,7 @@ public class TestListenSMTP {
int messageCount = 5;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
runner.run(messageCount, false);
this.executor.schedule(new Runnable() {
@Override
@ -181,12 +155,9 @@ public class TestListenSMTP {
boolean complete = latch.await(5000, TimeUnit.MILLISECONDS);
runner.shutdown();
assertTrue(complete);
runner.assertAllFlowFilesTransferred("success", 5);
runner.assertAllFlowFilesTransferred("success", messageCount);
}
/**
*
*/
@Test
public void validateTooLargeMessage() throws Exception, EmailException {
int port = NetworkUtils.availablePort();
@ -202,12 +173,7 @@ public class TestListenSMTP {
int messageCount = 1;
CountDownLatch latch = new CountDownLatch(messageCount);
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
runner.run(1, false);
}
}, 0, 500, TimeUnit.MILLISECONDS);
runner.run(messageCount, false);
this.executor.schedule(new Runnable() {
@Override