From 678342089771adbaedcb6921afd17184367109d2 Mon Sep 17 00:00:00 2001 From: Joseph Witt Date: Thu, 4 Jul 2024 19:40:59 -0700 Subject: [PATCH] NIFI-13513 Removed ListenSMTP This closes #9047 Signed-off-by: David Handermann --- .../nifi-email-processors/pom.xml | 5 - .../nifi/processors/email/ListenSMTP.java | 232 ------------------ .../processors/email/smtp/SmtpConsumer.java | 163 ------------ .../org.apache.nifi.processor.Processor | 1 - .../nifi/processors/email/TestListenSMTP.java | 205 ---------------- 5 files changed, 606 deletions(-) delete mode 100644 nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java delete mode 100644 nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java delete mode 100644 nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index a8257aeb8d..4630915792 100644 --- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -64,11 +64,6 @@ angus-mail 2.0.3 - - com.github.davidmoten - subethasmtp - 7.1.1 - org.springframework.integration spring-integration-mail diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java deleted file mode 100644 index 1d388d046c..0000000000 --- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ListenSMTP.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.email.smtp.SmtpConsumer; -import org.apache.nifi.security.util.ClientAuth; -import org.apache.nifi.ssl.RestrictedSSLContextService; -import org.apache.nifi.ssl.SSLContextService; -import org.subethamail.smtp.MessageContext; -import org.subethamail.smtp.MessageHandlerFactory; -import org.subethamail.smtp.server.SMTPServer; - -import javax.net.ssl.SSLContext; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -@Tags({"listen", "email", "smtp"}) -@TriggerSerially -@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, " - + "allowing nifi to listen for incoming email. Note this server does not perform any email " - + "validation. If direct exposure to the internet is sought, it may be a better idea to use " - + "the combination of NiFi and an industrial scale MTA (e.g. Postfix). Threading for this " - + "processor is managed by the underlying smtp server used so the processor need not support " - + "more than one thread.") -@WritesAttributes({ - @WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"), - @WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the " - + "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the " - + "certificates used by an TLS peer"), - @WritesAttribute(attribute = "smtp.src", description = "The source IP and port of the SMTP connection"), - @WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"), - @WritesAttribute(attribute = "smtp.recipient.*", description = "The values used during RCPT TO (i.e. envelope)"), - @WritesAttribute(attribute = "mime.type", description = "Mime type of the message")}) -public class ListenSMTP extends AbstractSessionFactoryProcessor { - - static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder() - .name("SMTP_PORT") - .displayName("Listening Port") - .description("The TCP port the ListenSMTP processor will bind to." - + "NOTE that on Unix derivative operating systems this port must " - + "be higher than 1024 unless NiFi is running as with root user permissions.") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.PORT_VALIDATOR) - .build(); - - static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder() - .name("SMTP_MAXIMUM_CONNECTIONS") - .displayName("Maximum number of SMTP connection") - .description("The maximum number of simultaneous SMTP connections.") - .required(true) - .defaultValue("1") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .build(); - - static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder() - .name("SMTP_TIMEOUT") - .displayName("SMTP connection timeout") - .description("The maximum time to wait for an action of SMTP client.") - .defaultValue("60 seconds") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - - static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder() - .name("SMTP_MAXIMUM_MSG_SIZE") - .displayName("SMTP Maximum Message Size") - .description("The maximum number of bytes the server will accept.") - .required(true) - .defaultValue("20 MB") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Integer.MAX_VALUE)) - .build(); - - static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL_CONTEXT_SERVICE") - .displayName("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " - + "messages will be received over a secure connection.") - .required(false) - .identifiesControllerService(RestrictedSSLContextService.class) - .build(); - - static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("CLIENT_AUTH") - .displayName("Client Auth") - .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") - .required(true) - .allowableValues(ClientAuth.NONE.name(), ClientAuth.REQUIRED.name()) - .dependsOn(SSL_CONTEXT_SERVICE) - .build(); - - protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder() - .name("SMTP_HOSTNAME") - .displayName("SMTP hostname") - .description("The hostname to be embedded into the banner displayed when an " - + "SMTP client connects to the processor TCP port .") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All new messages will be routed as FlowFiles to this relationship") - .build(); - - private final static List PROPERTY_DESCRIPTORS = List.of( - SMTP_PORT, - SMTP_MAXIMUM_CONNECTIONS, - SMTP_TIMEOUT, - SMTP_MAXIMUM_MSG_SIZE, - SSL_CONTEXT_SERVICE, - CLIENT_AUTH, - SMTP_HOSTNAME - ); - - private final static Set RELATIONSHIPS = Set.of(REL_SUCCESS); - - private volatile SMTPServer smtp; - - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - if (smtp == null) { - try { - final SMTPServer server = prepareServer(context, sessionFactory); - server.start(); - getLogger().info("Started SMTP Server on {}", server.getPortAllocated()); - smtp = server; - } catch (final Exception e) { //have to catch exception due to awkward exception handling in subethasmtp - smtp = null; - getLogger().error("Unable to start SMTP server", e); - } - } - context.yield(); //nothing really to do here since threading managed by smtp server sessions - } - - public int getListeningPort() { - return smtp == null ? 0 : smtp.getPortAllocated(); - } - - @OnStopped - public void stop() { - try { - smtp.stop(); - } catch (final Exception e) { - getLogger().error("Failed to stop SMTP Server", e); - } finally { - smtp = null; - } - } - - @Override - public Set getRelationships() { - return RELATIONSHIPS; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTY_DESCRIPTORS; - } - - private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) { - final SMTPServer.Builder smtpServerBuilder = new SMTPServer.Builder(); - - final int port = context.getProperty(SMTP_PORT).asInteger(); - final String host = context.getProperty(SMTP_HOSTNAME).getValue(); - final ComponentLog log = getLogger(); - final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue(); - final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize); - - smtpServerBuilder.messageHandlerFactory(messageHandlerFactory); - smtpServerBuilder.port(port); - smtpServerBuilder.softwareName("Apache NiFi SMTP"); - smtpServerBuilder.maxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger()); - smtpServerBuilder.maxMessageSize(maxMessageSize); - smtpServerBuilder.connectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); - if (context.getProperty(SMTP_HOSTNAME).isSet()) { - smtpServerBuilder.hostName(context.getProperty(SMTP_HOSTNAME).getValue()); - } - - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService == null) { - smtpServerBuilder.hideTLS(); - } else { - smtpServerBuilder.enableTLS(); - - final String clientAuth = context.getProperty(CLIENT_AUTH).getValue(); - final boolean requireClientCertificate = ClientAuth.REQUIRED.getType().equalsIgnoreCase(clientAuth); - - final SSLContext sslContext = sslContextService.createContext(); - smtpServerBuilder.startTlsSocketFactory(sslContext, requireClientCertificate); - } - - return smtpServerBuilder.build(); - } -} diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java deleted file mode 100644 index 7d0aa76f3a..0000000000 --- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email.smtp; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.cert.Certificate; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.exception.FlowFileAccessException; -import org.apache.nifi.processors.email.ListenSMTP; -import org.apache.nifi.stream.io.LimitingInputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.util.StopWatch; -import org.subethamail.smtp.MessageContext; -import org.subethamail.smtp.MessageHandler; -import org.subethamail.smtp.TooMuchDataException; -import org.subethamail.smtp.server.SMTPServer; - -import javax.security.auth.x500.X500Principal; - -/** - * A simple consumer that provides a bridge between 'push' message distribution - * provided by {@link SMTPServer} and NiFi polling scheduler mechanism. - */ -public class SmtpConsumer implements MessageHandler { - - private String from = null; - private final List recipientList = new ArrayList<>(); - private final MessageContext context; - private final ProcessSessionFactory sessionFactory; - private final int port; - private final int maxMessageSize; - private final ComponentLog log; - private final String host; - - public SmtpConsumer( - final MessageContext context, - final ProcessSessionFactory sessionFactory, - final int port, - final String host, - final ComponentLog log, - final int maxMessageSize - ) { - this.context = context; - this.sessionFactory = sessionFactory; - this.port = port; - if (host == null || host.trim().isEmpty()) { - this.host = context.getSMTPServer().getHostName(); - } else { - this.host = host; - } - this.log = log; - this.maxMessageSize = maxMessageSize; - } - - @Override - public String data(final InputStream data) throws IOException { - final ProcessSession processSession = sessionFactory.createSession(); - final StopWatch watch = new StopWatch(); - watch.start(); - try { - FlowFile flowFile = processSession.create(); - final AtomicBoolean limitExceeded = new AtomicBoolean(false); - flowFile = processSession.write(flowFile, (OutputStream out) -> { - final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize); - StreamUtils.copy(lis, out); - if (lis.hasReachedLimit()) { - limitExceeded.set(true); - } - }); - if (limitExceeded.get()) { - throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages"); - } - flowFile = processSession.putAllAttributes(flowFile, extractMessageAttributes()); - watch.stop(); - processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS)); - processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS); - processSession.commitAsync(); - } catch (final FlowFileAccessException | IllegalStateException | IOException e) { - log.error("SMTP data processing failed", e); - throw e; - } - - return null; - } - - @Override - public void from(final String from) { - this.from = from; - } - - @Override - public void recipient(final String recipient) { - if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) { - recipientList.add(recipient); - } - } - - @Override - public void done() { - } - - private Map extractMessageAttributes() { - final Map attributes = new HashMap<>(); - final Certificate[] tlsPeerCertificates = context.getTlsPeerCertificates(); - if (tlsPeerCertificates != null) { - for (int i = 0; i < tlsPeerCertificates.length; i++) { - if (tlsPeerCertificates[i] instanceof final X509Certificate x509Cert) { - attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString()); - attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectX500Principal().getName(X500Principal.RFC1779)); - } - } - } - - SocketAddress address = context.getRemoteAddress(); - if (address != null) { - // will extract and format source address if available - String strAddress = address instanceof InetSocketAddress - ? ((InetSocketAddress) address).getHostString() + ":" + ((InetSocketAddress) address).getPort() - : context.getRemoteAddress().toString(); - attributes.put("smtp.src", strAddress); - } - - final Optional helo = context.getHelo(); - helo.ifPresent(s -> attributes.put("smtp.helo", s)); - - attributes.put("smtp.from", from); - for (int i = 0; i < recipientList.size(); i++) { - attributes.put("smtp.recipient." + i, recipientList.get(i)); - } - attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822"); - return attributes; - } -} diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index c6bf6d8ab6..32ff053fd7 100644 --- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,6 +14,5 @@ # limitations under the License. org.apache.nifi.processors.email.ExtractEmailAttachments org.apache.nifi.processors.email.ExtractEmailHeaders -org.apache.nifi.processors.email.ListenSMTP org.apache.nifi.processors.email.ConsumeIMAP org.apache.nifi.processors.email.ConsumePOP3 diff --git a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java b/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java deleted file mode 100644 index 82e124811d..0000000000 --- a/nifi-extension-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/TestListenSMTP.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.email; - -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.security.util.ClientAuth; -import org.apache.nifi.security.util.SslContextFactory; -import org.apache.nifi.security.util.StandardTlsConfiguration; -import org.apache.nifi.security.util.TemporaryKeyStoreBuilder; -import org.apache.nifi.security.util.TlsConfiguration; -import org.apache.nifi.ssl.RestrictedSSLContextService; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import jakarta.mail.Message; -import jakarta.mail.MessagingException; -import jakarta.mail.Session; -import jakarta.mail.Transport; -import jakarta.mail.internet.InternetAddress; -import jakarta.mail.internet.MimeMessage; -import javax.net.ssl.SSLContext; -import java.net.Socket; -import java.security.GeneralSecurityException; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestListenSMTP { - private static final String SSL_SERVICE_IDENTIFIER = "ssl-context"; - - private static TlsConfiguration tlsConfiguration; - - private static SSLContextService sslContextService; - - private static final int MESSAGES = 2; - - @BeforeAll - public static void setTlsConfiguration() throws GeneralSecurityException { - final TlsConfiguration testTlsConfiguration = new TemporaryKeyStoreBuilder().build(); - - tlsConfiguration = new StandardTlsConfiguration( - testTlsConfiguration.getKeystorePath(), - testTlsConfiguration.getKeystorePassword(), - testTlsConfiguration.getKeyPassword(), - testTlsConfiguration.getKeystoreType(), - testTlsConfiguration.getTruststorePath(), - testTlsConfiguration.getTruststorePassword(), - testTlsConfiguration.getTruststoreType() - ); - - final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration); - sslContextService = mock(RestrictedSSLContextService.class); - when(sslContextService.getIdentifier()).thenReturn(SSL_SERVICE_IDENTIFIER); - when(sslContextService.createContext()).thenReturn(sslContext); - - - when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration); - } - - @Test - public void testListenSMTP() throws MessagingException, InterruptedException { - final TestRunner runner = newTestRunner(); - - runner.run(1, false); - final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort(); - assertPortListening(port); - - final Session session = getSession(port); - for (int i = 0; i < MESSAGES; i++) { - sendMessage(session, i); - } - - runner.shutdown(); - runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, MESSAGES); - } - - @Test - public void testListenSMTPwithTLSCurrentVersion() throws Exception { - final TestRunner runner = newTestRunner(); - - configureSslContextService(runner); - runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, SSL_SERVICE_IDENTIFIER); - runner.setProperty(ListenSMTP.CLIENT_AUTH, ClientAuth.NONE.name()); - runner.assertValid(); - - runner.run(1, false); - - final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort(); - assertPortListening(port); - final Session session = getSessionTls(port, tlsConfiguration.getProtocol()); - - for (int i = 0; i < MESSAGES; i++) { - sendMessage(session, i); - } - - runner.shutdown(); - runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, MESSAGES); - } - - @Test - public void testListenSMTPwithTooLargeMessage() throws InterruptedException { - final TestRunner runner = newTestRunner(); - - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B"); - - runner.run(1, false); - - final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort(); - assertPortListening(port); - - final Session session = getSession(port); - assertThrows(MessagingException.class, () -> sendMessage(session, 0)); - - runner.shutdown(); - runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, 0); - } - - private TestRunner newTestRunner() { - final ListenSMTP processor = new ListenSMTP(); - final TestRunner runner = TestRunners.newTestRunner(processor); - - runner.setProperty(ListenSMTP.SMTP_PORT, "0"); - runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3"); - return runner; - } - - private void assertPortListening(final int port) throws InterruptedException { - final long endTime = System.currentTimeMillis() + 5_000L; - while (System.currentTimeMillis() <= endTime) { - try (final Socket socket = new Socket("localhost", port)) { - if (socket.isConnected()) { - return; - } - } catch (final Exception e) { - Thread.sleep(10L); - } - } - - Assertions.fail(String.format("expected server listening on %s:%d", "localhost", port)); - } - - private Session getSession(final int port) { - final Properties config = new Properties(); - config.put("mail.smtp.host", "localhost"); - config.put("mail.smtp.port", String.valueOf(port)); - config.put("mail.smtp.connectiontimeout", "5000"); - config.put("mail.smtp.timeout", "5000"); - config.put("mail.smtp.writetimeout", "5000"); - final Session session = Session.getInstance(config); - session.setDebug(true); - return session; - } - - private Session getSessionTls(final int port, final String tlsProtocol) { - final Properties config = new Properties(); - config.put("mail.smtp.host", "localhost"); - config.put("mail.smtp.port", String.valueOf(port)); - config.put("mail.smtp.auth", "false"); - config.put("mail.smtp.starttls.enable", "true"); - config.put("mail.smtp.starttls.required", "true"); - config.put("mail.smtp.ssl.trust", "*"); - config.put("mail.smtp.connectiontimeout", "5000"); - config.put("mail.smtp.timeout", "5000"); - config.put("mail.smtp.writetimeout", "5000"); - config.put("mail.smtp.ssl.protocols", tlsProtocol); - - final Session session = Session.getInstance(config); - session.setDebug(true); - return session; - } - - private void sendMessage(final Session session, final int i) throws MessagingException { - final Message email = new MimeMessage(session); - email.setFrom(new InternetAddress("alice@nifi.apache.org")); - email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org")); - email.setSubject("This is a test"); - email.setText("MSG-" + i); - Transport.send(email); - } - - private void configureSslContextService(final TestRunner runner) throws InitializationException { - runner.addControllerService(SSL_SERVICE_IDENTIFIER, sslContextService); - runner.enableControllerService(sslContextService); - } -}