mirror of https://github.com/apache/nifi.git
NIFI-13513 Removed ListenSMTP
This closes #9047 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
f1fb106bf8
commit
6783420897
|
@ -64,11 +64,6 @@
|
||||||
<artifactId>angus-mail</artifactId>
|
<artifactId>angus-mail</artifactId>
|
||||||
<version>2.0.3</version>
|
<version>2.0.3</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>com.github.davidmoten</groupId>
|
|
||||||
<artifactId>subethasmtp</artifactId>
|
|
||||||
<version>7.1.1</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.integration</groupId>
|
<groupId>org.springframework.integration</groupId>
|
||||||
<artifactId>spring-integration-mail</artifactId>
|
<artifactId>spring-integration-mail</artifactId>
|
||||||
|
|
|
@ -1,232 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.email;
|
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
||||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
|
||||||
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
|
|
||||||
import org.apache.nifi.processor.DataUnit;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
|
||||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.apache.nifi.processors.email.smtp.SmtpConsumer;
|
|
||||||
import org.apache.nifi.security.util.ClientAuth;
|
|
||||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
|
||||||
import org.subethamail.smtp.MessageContext;
|
|
||||||
import org.subethamail.smtp.MessageHandlerFactory;
|
|
||||||
import org.subethamail.smtp.server.SMTPServer;
|
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
@Tags({"listen", "email", "smtp"})
|
|
||||||
@TriggerSerially
|
|
||||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
|
||||||
@CapabilityDescription("This processor implements a lightweight SMTP server to an arbitrary port, "
|
|
||||||
+ "allowing nifi to listen for incoming email. Note this server does not perform any email "
|
|
||||||
+ "validation. If direct exposure to the internet is sought, it may be a better idea to use "
|
|
||||||
+ "the combination of NiFi and an industrial scale MTA (e.g. Postfix). Threading for this "
|
|
||||||
+ "processor is managed by the underlying smtp server used so the processor need not support "
|
|
||||||
+ "more than one thread.")
|
|
||||||
@WritesAttributes({
|
|
||||||
@WritesAttribute(attribute = "smtp.helo", description = "The value used during HELO"),
|
|
||||||
@WritesAttribute(attribute = "smtp.certificates.*.serial", description = "The serial numbers for each of the "
|
|
||||||
+ "certificates used by an TLS peer"),
|
|
||||||
@WritesAttribute(attribute = "smtp.certificates.*.principal", description = "The principal for each of the "
|
|
||||||
+ "certificates used by an TLS peer"),
|
|
||||||
@WritesAttribute(attribute = "smtp.src", description = "The source IP and port of the SMTP connection"),
|
|
||||||
@WritesAttribute(attribute = "smtp.from", description = "The value used during MAIL FROM (i.e. envelope)"),
|
|
||||||
@WritesAttribute(attribute = "smtp.recipient.*", description = "The values used during RCPT TO (i.e. envelope)"),
|
|
||||||
@WritesAttribute(attribute = "mime.type", description = "Mime type of the message")})
|
|
||||||
public class ListenSMTP extends AbstractSessionFactoryProcessor {
|
|
||||||
|
|
||||||
static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
|
|
||||||
.name("SMTP_PORT")
|
|
||||||
.displayName("Listening Port")
|
|
||||||
.description("The TCP port the ListenSMTP processor will bind to."
|
|
||||||
+ "NOTE that on Unix derivative operating systems this port must "
|
|
||||||
+ "be higher than 1024 unless NiFi is running as with root user permissions.")
|
|
||||||
.required(true)
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor SMTP_MAXIMUM_CONNECTIONS = new PropertyDescriptor.Builder()
|
|
||||||
.name("SMTP_MAXIMUM_CONNECTIONS")
|
|
||||||
.displayName("Maximum number of SMTP connection")
|
|
||||||
.description("The maximum number of simultaneous SMTP connections.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("1")
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor SMTP_TIMEOUT = new PropertyDescriptor.Builder()
|
|
||||||
.name("SMTP_TIMEOUT")
|
|
||||||
.displayName("SMTP connection timeout")
|
|
||||||
.description("The maximum time to wait for an action of SMTP client.")
|
|
||||||
.defaultValue("60 seconds")
|
|
||||||
.required(true)
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor SMTP_MAXIMUM_MSG_SIZE = new PropertyDescriptor.Builder()
|
|
||||||
.name("SMTP_MAXIMUM_MSG_SIZE")
|
|
||||||
.displayName("SMTP Maximum Message Size")
|
|
||||||
.description("The maximum number of bytes the server will accept.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("20 MB")
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Integer.MAX_VALUE))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
|
||||||
.name("SSL_CONTEXT_SERVICE")
|
|
||||||
.displayName("SSL Context Service")
|
|
||||||
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, "
|
|
||||||
+ "messages will be received over a secure connection.")
|
|
||||||
.required(false)
|
|
||||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
|
|
||||||
.name("CLIENT_AUTH")
|
|
||||||
.displayName("Client Auth")
|
|
||||||
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
|
|
||||||
.required(true)
|
|
||||||
.allowableValues(ClientAuth.NONE.name(), ClientAuth.REQUIRED.name())
|
|
||||||
.dependsOn(SSL_CONTEXT_SERVICE)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
|
|
||||||
.name("SMTP_HOSTNAME")
|
|
||||||
.displayName("SMTP hostname")
|
|
||||||
.description("The hostname to be embedded into the banner displayed when an "
|
|
||||||
+ "SMTP client connects to the processor TCP port .")
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
|
||||||
.name("success")
|
|
||||||
.description("All new messages will be routed as FlowFiles to this relationship")
|
|
||||||
.build();
|
|
||||||
|
|
||||||
private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
|
|
||||||
SMTP_PORT,
|
|
||||||
SMTP_MAXIMUM_CONNECTIONS,
|
|
||||||
SMTP_TIMEOUT,
|
|
||||||
SMTP_MAXIMUM_MSG_SIZE,
|
|
||||||
SSL_CONTEXT_SERVICE,
|
|
||||||
CLIENT_AUTH,
|
|
||||||
SMTP_HOSTNAME
|
|
||||||
);
|
|
||||||
|
|
||||||
private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
|
|
||||||
|
|
||||||
private volatile SMTPServer smtp;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
|
|
||||||
if (smtp == null) {
|
|
||||||
try {
|
|
||||||
final SMTPServer server = prepareServer(context, sessionFactory);
|
|
||||||
server.start();
|
|
||||||
getLogger().info("Started SMTP Server on {}", server.getPortAllocated());
|
|
||||||
smtp = server;
|
|
||||||
} catch (final Exception e) { //have to catch exception due to awkward exception handling in subethasmtp
|
|
||||||
smtp = null;
|
|
||||||
getLogger().error("Unable to start SMTP server", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
context.yield(); //nothing really to do here since threading managed by smtp server sessions
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getListeningPort() {
|
|
||||||
return smtp == null ? 0 : smtp.getPortAllocated();
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnStopped
|
|
||||||
public void stop() {
|
|
||||||
try {
|
|
||||||
smtp.stop();
|
|
||||||
} catch (final Exception e) {
|
|
||||||
getLogger().error("Failed to stop SMTP Server", e);
|
|
||||||
} finally {
|
|
||||||
smtp = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<Relationship> getRelationships() {
|
|
||||||
return RELATIONSHIPS;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
|
||||||
return PROPERTY_DESCRIPTORS;
|
|
||||||
}
|
|
||||||
|
|
||||||
private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
|
|
||||||
final SMTPServer.Builder smtpServerBuilder = new SMTPServer.Builder();
|
|
||||||
|
|
||||||
final int port = context.getProperty(SMTP_PORT).asInteger();
|
|
||||||
final String host = context.getProperty(SMTP_HOSTNAME).getValue();
|
|
||||||
final ComponentLog log = getLogger();
|
|
||||||
final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue();
|
|
||||||
final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize);
|
|
||||||
|
|
||||||
smtpServerBuilder.messageHandlerFactory(messageHandlerFactory);
|
|
||||||
smtpServerBuilder.port(port);
|
|
||||||
smtpServerBuilder.softwareName("Apache NiFi SMTP");
|
|
||||||
smtpServerBuilder.maxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
|
|
||||||
smtpServerBuilder.maxMessageSize(maxMessageSize);
|
|
||||||
smtpServerBuilder.connectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
|
|
||||||
if (context.getProperty(SMTP_HOSTNAME).isSet()) {
|
|
||||||
smtpServerBuilder.hostName(context.getProperty(SMTP_HOSTNAME).getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
|
||||||
if (sslContextService == null) {
|
|
||||||
smtpServerBuilder.hideTLS();
|
|
||||||
} else {
|
|
||||||
smtpServerBuilder.enableTLS();
|
|
||||||
|
|
||||||
final String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
|
|
||||||
final boolean requireClientCertificate = ClientAuth.REQUIRED.getType().equalsIgnoreCase(clientAuth);
|
|
||||||
|
|
||||||
final SSLContext sslContext = sslContextService.createContext();
|
|
||||||
smtpServerBuilder.startTlsSocketFactory(sslContext, requireClientCertificate);
|
|
||||||
}
|
|
||||||
|
|
||||||
return smtpServerBuilder.build();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,163 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.email.smtp;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.security.cert.Certificate;
|
|
||||||
import java.security.cert.X509Certificate;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
|
||||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
|
||||||
import org.apache.nifi.processors.email.ListenSMTP;
|
|
||||||
import org.apache.nifi.stream.io.LimitingInputStream;
|
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
|
||||||
import org.apache.nifi.util.StopWatch;
|
|
||||||
import org.subethamail.smtp.MessageContext;
|
|
||||||
import org.subethamail.smtp.MessageHandler;
|
|
||||||
import org.subethamail.smtp.TooMuchDataException;
|
|
||||||
import org.subethamail.smtp.server.SMTPServer;
|
|
||||||
|
|
||||||
import javax.security.auth.x500.X500Principal;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A simple consumer that provides a bridge between 'push' message distribution
|
|
||||||
* provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
|
|
||||||
*/
|
|
||||||
public class SmtpConsumer implements MessageHandler {
|
|
||||||
|
|
||||||
private String from = null;
|
|
||||||
private final List<String> recipientList = new ArrayList<>();
|
|
||||||
private final MessageContext context;
|
|
||||||
private final ProcessSessionFactory sessionFactory;
|
|
||||||
private final int port;
|
|
||||||
private final int maxMessageSize;
|
|
||||||
private final ComponentLog log;
|
|
||||||
private final String host;
|
|
||||||
|
|
||||||
public SmtpConsumer(
|
|
||||||
final MessageContext context,
|
|
||||||
final ProcessSessionFactory sessionFactory,
|
|
||||||
final int port,
|
|
||||||
final String host,
|
|
||||||
final ComponentLog log,
|
|
||||||
final int maxMessageSize
|
|
||||||
) {
|
|
||||||
this.context = context;
|
|
||||||
this.sessionFactory = sessionFactory;
|
|
||||||
this.port = port;
|
|
||||||
if (host == null || host.trim().isEmpty()) {
|
|
||||||
this.host = context.getSMTPServer().getHostName();
|
|
||||||
} else {
|
|
||||||
this.host = host;
|
|
||||||
}
|
|
||||||
this.log = log;
|
|
||||||
this.maxMessageSize = maxMessageSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String data(final InputStream data) throws IOException {
|
|
||||||
final ProcessSession processSession = sessionFactory.createSession();
|
|
||||||
final StopWatch watch = new StopWatch();
|
|
||||||
watch.start();
|
|
||||||
try {
|
|
||||||
FlowFile flowFile = processSession.create();
|
|
||||||
final AtomicBoolean limitExceeded = new AtomicBoolean(false);
|
|
||||||
flowFile = processSession.write(flowFile, (OutputStream out) -> {
|
|
||||||
final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize);
|
|
||||||
StreamUtils.copy(lis, out);
|
|
||||||
if (lis.hasReachedLimit()) {
|
|
||||||
limitExceeded.set(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (limitExceeded.get()) {
|
|
||||||
throw new TooMuchDataException("Maximum message size limit reached - client must send smaller messages");
|
|
||||||
}
|
|
||||||
flowFile = processSession.putAllAttributes(flowFile, extractMessageAttributes());
|
|
||||||
watch.stop();
|
|
||||||
processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS));
|
|
||||||
processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS);
|
|
||||||
processSession.commitAsync();
|
|
||||||
} catch (final FlowFileAccessException | IllegalStateException | IOException e) {
|
|
||||||
log.error("SMTP data processing failed", e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void from(final String from) {
|
|
||||||
this.from = from;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void recipient(final String recipient) {
|
|
||||||
if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) {
|
|
||||||
recipientList.add(recipient);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void done() {
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, String> extractMessageAttributes() {
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
|
||||||
final Certificate[] tlsPeerCertificates = context.getTlsPeerCertificates();
|
|
||||||
if (tlsPeerCertificates != null) {
|
|
||||||
for (int i = 0; i < tlsPeerCertificates.length; i++) {
|
|
||||||
if (tlsPeerCertificates[i] instanceof final X509Certificate x509Cert) {
|
|
||||||
attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
|
|
||||||
attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectX500Principal().getName(X500Principal.RFC1779));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SocketAddress address = context.getRemoteAddress();
|
|
||||||
if (address != null) {
|
|
||||||
// will extract and format source address if available
|
|
||||||
String strAddress = address instanceof InetSocketAddress
|
|
||||||
? ((InetSocketAddress) address).getHostString() + ":" + ((InetSocketAddress) address).getPort()
|
|
||||||
: context.getRemoteAddress().toString();
|
|
||||||
attributes.put("smtp.src", strAddress);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Optional<String> helo = context.getHelo();
|
|
||||||
helo.ifPresent(s -> attributes.put("smtp.helo", s));
|
|
||||||
|
|
||||||
attributes.put("smtp.from", from);
|
|
||||||
for (int i = 0; i < recipientList.size(); i++) {
|
|
||||||
attributes.put("smtp.recipient." + i, recipientList.get(i));
|
|
||||||
}
|
|
||||||
attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
|
|
||||||
return attributes;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -14,6 +14,5 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
org.apache.nifi.processors.email.ExtractEmailAttachments
|
org.apache.nifi.processors.email.ExtractEmailAttachments
|
||||||
org.apache.nifi.processors.email.ExtractEmailHeaders
|
org.apache.nifi.processors.email.ExtractEmailHeaders
|
||||||
org.apache.nifi.processors.email.ListenSMTP
|
|
||||||
org.apache.nifi.processors.email.ConsumeIMAP
|
org.apache.nifi.processors.email.ConsumeIMAP
|
||||||
org.apache.nifi.processors.email.ConsumePOP3
|
org.apache.nifi.processors.email.ConsumePOP3
|
||||||
|
|
|
@ -1,205 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.email;
|
|
||||||
|
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
|
||||||
import org.apache.nifi.security.util.ClientAuth;
|
|
||||||
import org.apache.nifi.security.util.SslContextFactory;
|
|
||||||
import org.apache.nifi.security.util.StandardTlsConfiguration;
|
|
||||||
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
|
|
||||||
import org.apache.nifi.security.util.TlsConfiguration;
|
|
||||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import jakarta.mail.Message;
|
|
||||||
import jakarta.mail.MessagingException;
|
|
||||||
import jakarta.mail.Session;
|
|
||||||
import jakarta.mail.Transport;
|
|
||||||
import jakarta.mail.internet.InternetAddress;
|
|
||||||
import jakarta.mail.internet.MimeMessage;
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.security.GeneralSecurityException;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
public class TestListenSMTP {
|
|
||||||
private static final String SSL_SERVICE_IDENTIFIER = "ssl-context";
|
|
||||||
|
|
||||||
private static TlsConfiguration tlsConfiguration;
|
|
||||||
|
|
||||||
private static SSLContextService sslContextService;
|
|
||||||
|
|
||||||
private static final int MESSAGES = 2;
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
public static void setTlsConfiguration() throws GeneralSecurityException {
|
|
||||||
final TlsConfiguration testTlsConfiguration = new TemporaryKeyStoreBuilder().build();
|
|
||||||
|
|
||||||
tlsConfiguration = new StandardTlsConfiguration(
|
|
||||||
testTlsConfiguration.getKeystorePath(),
|
|
||||||
testTlsConfiguration.getKeystorePassword(),
|
|
||||||
testTlsConfiguration.getKeyPassword(),
|
|
||||||
testTlsConfiguration.getKeystoreType(),
|
|
||||||
testTlsConfiguration.getTruststorePath(),
|
|
||||||
testTlsConfiguration.getTruststorePassword(),
|
|
||||||
testTlsConfiguration.getTruststoreType()
|
|
||||||
);
|
|
||||||
|
|
||||||
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
|
|
||||||
sslContextService = mock(RestrictedSSLContextService.class);
|
|
||||||
when(sslContextService.getIdentifier()).thenReturn(SSL_SERVICE_IDENTIFIER);
|
|
||||||
when(sslContextService.createContext()).thenReturn(sslContext);
|
|
||||||
|
|
||||||
|
|
||||||
when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testListenSMTP() throws MessagingException, InterruptedException {
|
|
||||||
final TestRunner runner = newTestRunner();
|
|
||||||
|
|
||||||
runner.run(1, false);
|
|
||||||
final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort();
|
|
||||||
assertPortListening(port);
|
|
||||||
|
|
||||||
final Session session = getSession(port);
|
|
||||||
for (int i = 0; i < MESSAGES; i++) {
|
|
||||||
sendMessage(session, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
runner.shutdown();
|
|
||||||
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, MESSAGES);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testListenSMTPwithTLSCurrentVersion() throws Exception {
|
|
||||||
final TestRunner runner = newTestRunner();
|
|
||||||
|
|
||||||
configureSslContextService(runner);
|
|
||||||
runner.setProperty(ListenSMTP.SSL_CONTEXT_SERVICE, SSL_SERVICE_IDENTIFIER);
|
|
||||||
runner.setProperty(ListenSMTP.CLIENT_AUTH, ClientAuth.NONE.name());
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
runner.run(1, false);
|
|
||||||
|
|
||||||
final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort();
|
|
||||||
assertPortListening(port);
|
|
||||||
final Session session = getSessionTls(port, tlsConfiguration.getProtocol());
|
|
||||||
|
|
||||||
for (int i = 0; i < MESSAGES; i++) {
|
|
||||||
sendMessage(session, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
runner.shutdown();
|
|
||||||
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, MESSAGES);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testListenSMTPwithTooLargeMessage() throws InterruptedException {
|
|
||||||
final TestRunner runner = newTestRunner();
|
|
||||||
|
|
||||||
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_MSG_SIZE, "10 B");
|
|
||||||
|
|
||||||
runner.run(1, false);
|
|
||||||
|
|
||||||
final int port = ((ListenSMTP) runner.getProcessor()).getListeningPort();
|
|
||||||
assertPortListening(port);
|
|
||||||
|
|
||||||
final Session session = getSession(port);
|
|
||||||
assertThrows(MessagingException.class, () -> sendMessage(session, 0));
|
|
||||||
|
|
||||||
runner.shutdown();
|
|
||||||
runner.assertAllFlowFilesTransferred(ListenSMTP.REL_SUCCESS, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
private TestRunner newTestRunner() {
|
|
||||||
final ListenSMTP processor = new ListenSMTP();
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
|
||||||
|
|
||||||
runner.setProperty(ListenSMTP.SMTP_PORT, "0");
|
|
||||||
runner.setProperty(ListenSMTP.SMTP_MAXIMUM_CONNECTIONS, "3");
|
|
||||||
return runner;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertPortListening(final int port) throws InterruptedException {
|
|
||||||
final long endTime = System.currentTimeMillis() + 5_000L;
|
|
||||||
while (System.currentTimeMillis() <= endTime) {
|
|
||||||
try (final Socket socket = new Socket("localhost", port)) {
|
|
||||||
if (socket.isConnected()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (final Exception e) {
|
|
||||||
Thread.sleep(10L);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Assertions.fail(String.format("expected server listening on %s:%d", "localhost", port));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Session getSession(final int port) {
|
|
||||||
final Properties config = new Properties();
|
|
||||||
config.put("mail.smtp.host", "localhost");
|
|
||||||
config.put("mail.smtp.port", String.valueOf(port));
|
|
||||||
config.put("mail.smtp.connectiontimeout", "5000");
|
|
||||||
config.put("mail.smtp.timeout", "5000");
|
|
||||||
config.put("mail.smtp.writetimeout", "5000");
|
|
||||||
final Session session = Session.getInstance(config);
|
|
||||||
session.setDebug(true);
|
|
||||||
return session;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Session getSessionTls(final int port, final String tlsProtocol) {
|
|
||||||
final Properties config = new Properties();
|
|
||||||
config.put("mail.smtp.host", "localhost");
|
|
||||||
config.put("mail.smtp.port", String.valueOf(port));
|
|
||||||
config.put("mail.smtp.auth", "false");
|
|
||||||
config.put("mail.smtp.starttls.enable", "true");
|
|
||||||
config.put("mail.smtp.starttls.required", "true");
|
|
||||||
config.put("mail.smtp.ssl.trust", "*");
|
|
||||||
config.put("mail.smtp.connectiontimeout", "5000");
|
|
||||||
config.put("mail.smtp.timeout", "5000");
|
|
||||||
config.put("mail.smtp.writetimeout", "5000");
|
|
||||||
config.put("mail.smtp.ssl.protocols", tlsProtocol);
|
|
||||||
|
|
||||||
final Session session = Session.getInstance(config);
|
|
||||||
session.setDebug(true);
|
|
||||||
return session;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessage(final Session session, final int i) throws MessagingException {
|
|
||||||
final Message email = new MimeMessage(session);
|
|
||||||
email.setFrom(new InternetAddress("alice@nifi.apache.org"));
|
|
||||||
email.setRecipients(Message.RecipientType.TO, InternetAddress.parse("bob@nifi.apache.org"));
|
|
||||||
email.setSubject("This is a test");
|
|
||||||
email.setText("MSG-" + i);
|
|
||||||
Transport.send(email);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void configureSslContextService(final TestRunner runner) throws InitializationException {
|
|
||||||
runner.addControllerService(SSL_SERVICE_IDENTIFIER, sslContextService);
|
|
||||||
runner.enableControllerService(sslContextService);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue