NIFI-12820 Upgraded Email Processors to Jakarta Mail 2

This closes #8428.

- Upgraded from Java Mail 1.4.7 to Jakarta Mail API 2.1.2
- Upgraded Spring Integration from 5.5.20 to 6.2.1
- Upgraded SubEtha SMTP from 3.1.7 to 7.0.1
- Upgraded Greenmail from 1.6.15 to 2.0.1
- Removed usage of Commons Lang3
- Removed usage of Commons IO

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2024-02-19 20:18:09 -06:00 committed by Joseph Witt
parent fcbd7c690b
commit ff4c187f86
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
15 changed files with 456 additions and 712 deletions

View File

@ -25,7 +25,7 @@
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<properties>
<spring.integration.version>5.5.20</spring.integration.version>
<spring.integration.version>6.2.1</spring.integration.version>
<poi.version>5.2.5</poi.version>
</properties>
@ -37,103 +37,95 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-cert</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.6.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-oauth2-provider-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.angus</groupId>
<artifactId>angus-mail</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>subethasmtp</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>${spring.integration.version}</version>
<exclusions>
<!-- Exclude Spring libraries not required for IMAP and POP3 -->
<exclusion>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</exclusion>
<exclusion>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- poi-scratchpad required for TNEF parsing -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-scratchpad</artifactId>
<version>${poi.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>SparseBitSet</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail</artifactId>
<version>1.6.15</version>
<version>2.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -33,14 +33,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.StaticListableBeanFactory;
import org.springframework.integration.mail.AbstractMailReceiver;
import org.springframework.util.Assert;
import javax.mail.Address;
import javax.mail.Message;
import javax.mail.MessagingException;
import jakarta.mail.Address;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -224,17 +223,11 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
}
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return SHARED_RELATIONSHIPS;
}
/**
*
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
this.initializeIfNecessary(context, processSession);
@ -245,9 +238,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
}
}
/**
*
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@ -278,26 +268,17 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> {
String accessToken = oauth2AccessTokenProvider.getAccessDetails().getAccessToken();
return accessToken;
}).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider ->
oauth2AccessTokenProvider.getAccessDetails().getAccessToken()
).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
StringBuilder urlBuilder = new StringBuilder();
try {
urlBuilder.append(URLEncoder.encode(user, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new ProcessException(e);
}
urlBuilder.append(URLEncoder.encode(user, StandardCharsets.UTF_8));
urlBuilder.append(":");
try {
urlBuilder.append(URLEncoder.encode(password, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new ProcessException(e);
}
urlBuilder.append(URLEncoder.encode(password, StandardCharsets.UTF_8));
urlBuilder.append("@");
urlBuilder.append(host);
urlBuilder.append(":");
@ -306,15 +287,15 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
urlBuilder.append(folder);
String protocol = this.getProtocol(processContext);
String finalUrl = protocol + "://" + urlBuilder.toString();
String finalUrl = protocol + "://" + urlBuilder;
// build display-safe URL
int passwordStartIndex = urlBuilder.indexOf(":") + 1;
int passwordEndIndex = urlBuilder.indexOf("@");
urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
this.displayUrl = protocol + "://" + urlBuilder.toString();
this.displayUrl = protocol + "://" + urlBuilder;
if (this.logger.isInfoEnabled()) {
this.logger.info("Connecting to Email server at the following URL: " + this.displayUrl);
this.logger.info("Connecting to Email server at the following URL: {}", this.displayUrl);
}
return finalUrl;
@ -388,7 +369,6 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
if (messages != null) {
for (Object message : messages) {
Assert.isTrue(message instanceof Message, "Message is not an instance of javax.mail.Message");
this.messageQueue.offer((Message) message);
}
}
@ -446,10 +426,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
}
/**
* Will flush the remaining messages when this processor is stopped. The
* flushed messages are disposed via
* {@link #disposeMessage(Message, ProcessContext, ProcessSession)}
* operation
* Will flush the remaining messages when this processor is stopped.
*/
private void flushRemainingMessages(ProcessContext processContext) {
Message emailMessage;

View File

@ -55,16 +55,12 @@ public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> {
static final List<PropertyDescriptor> DESCRIPTORS;
static {
List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.addAll(SHARED_DESCRIPTORS);
_descriptors.add(SHOULD_MARK_READ);
_descriptors.add(USE_SSL);
DESCRIPTORS = Collections.unmodifiableList(_descriptors);
List<PropertyDescriptor> descriptors = new ArrayList<>(SHARED_DESCRIPTORS);
descriptors.add(SHOULD_MARK_READ);
descriptors.add(USE_SSL);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
}
/**
*
*/
@Override
protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) {
ImapMailReceiver receiver = new ImapMailReceiver(this.buildUrl(processContext));
@ -74,17 +70,11 @@ public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> {
return receiver;
}
/**
*
*/
@Override
protected String getProtocol(ProcessContext processContext) {
return processContext.getProperty(USE_SSL).asBoolean() ? "imaps" : "imap";
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processors.email;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -34,25 +32,11 @@ import org.springframework.integration.mail.Pop3MailReceiver;
@Tags({ "Email", "POP3", "Get", "Ingest", "Ingress", "Message", "Consume" })
public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> {
static final List<PropertyDescriptor> DESCRIPTORS;
static {
List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.addAll(SHARED_DESCRIPTORS);
DESCRIPTORS = Collections.unmodifiableList(_descriptors);
}
/**
*
*/
@Override
protected String getProtocol(ProcessContext processContext) {
return "pop3";
}
/**
*
*/
@Override
protected Pop3MailReceiver buildMessageReceiver(ProcessContext context) {
final Pop3MailReceiver receiver = new Pop3MailReceiver(this.buildUrl(context));
@ -60,11 +44,8 @@ public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> {
return receiver;
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
return SHARED_DESCRIPTORS;
}
}

View File

@ -19,23 +19,22 @@ package org.apache.nifi.processors.email;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.activation.DataSource;
import javax.mail.Address;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import jakarta.activation.DataSource;
import jakarta.mail.Address;
import jakarta.mail.BodyPart;
import jakarta.mail.MessagingException;
import jakarta.mail.Multipart;
import jakarta.mail.Session;
import jakarta.mail.internet.MimeBodyPart;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimePart;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -44,19 +43,15 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@SideEffectFree
@ -83,24 +78,12 @@ public class ExtractEmailAttachments extends AbstractProcessor {
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Flowfiles that could not be parsed")
.description("FlowFiles that could not be parsed")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private static final String ATTACHMENT_DISPOSITION = "attachment";
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ATTACHMENTS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
this.descriptors = Collections.unmodifiableList(descriptors);
}
private static final Set<Relationship> relationships = Set.of(REL_ATTACHMENTS, REL_ORIGINAL, REL_FAILURE);
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
@ -115,65 +98,61 @@ public class ExtractEmailAttachments extends AbstractProcessor {
final String requireStrictAddresses = "false";
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
originalFlowFilesList.add(originalFlowFile);
if (parser.hasAttachments()) {
final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
try {
for (final DataSource data : parser.getAttachmentList()) {
FlowFile split = session.create(originalFlowFile);
final Map<String, String> attributes = new HashMap<>();
if (StringUtils.isNotBlank(data.getName())) {
attributes.put(CoreAttributes.FILENAME.key(), data.getName());
}
if (StringUtils.isNotBlank(data.getContentType())) {
attributes.put(CoreAttributes.MIME_TYPE.key(), data.getContentType());
}
String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
split = session.append(split, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
IOUtils.copy(data.getInputStream(), out);
}
});
split = session.putAllAttributes(split, attributes);
attachmentsList.add(split);
}
} catch (FlowFileHandlingException e) {
// Something went wrong
// Removing splits that may have been created
session.remove(attachmentsList);
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e});
invalidFlowFilesList.add(originalFlowFile);
}
}
} catch (Exception e) {
// Another error hit...
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
// Message is invalid or triggered an error during parsing
invalidFlowFilesList.add(originalFlowFile);
}
session.read(originalFlowFile, rawIn -> {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
originalFlowFilesList.add(originalFlowFile);
final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
try {
final List<DataSource> attachments = new ArrayList<>();
parseAttachments(attachments, originalMessage, 0);
for (final DataSource data : attachments) {
FlowFile split = session.create(originalFlowFile);
final Map<String, String> attributes = new HashMap<>();
final String name = data.getName();
if (name != null && !name.isBlank()) {
attributes.put(CoreAttributes.FILENAME.key(), name);
}
final String contentType = data.getContentType();
if (contentType != null && !contentType.isBlank()) {
attributes.put(CoreAttributes.MIME_TYPE.key(), contentType);
}
String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
split = session.append(split, out -> StreamUtils.copy(data.getInputStream(), out));
split = session.putAllAttributes(split, attributes);
attachmentsList.add(split);
}
} catch (FlowFileHandlingException e) {
// Something went wrong
// Removing splits that may have been created
session.remove(attachmentsList);
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", originalFlowFile, e);
invalidFlowFilesList.add(originalFlowFile);
}
} catch (Exception e) {
// Another error hit...
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Could not parse the flowfile {} as an email, treating as failure", originalFlowFile, e);
// Message is invalid or triggered an error during parsing
invalidFlowFilesList.add(originalFlowFile);
}
});
session.transfer(attachmentsList, REL_ATTACHMENTS);
@ -184,21 +163,33 @@ public class ExtractEmailAttachments extends AbstractProcessor {
session.transfer(originalFlowFilesList, REL_ORIGINAL);
if (attachmentsList.size() > 10) {
logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()});
} else if (attachmentsList.size() > 1){
logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList});
logger.info("Split {} into {} files", originalFlowFile, attachmentsList.size());
} else if (attachmentsList.size() > 1) {
logger.info("Split {} into {} files: {}", originalFlowFile, attachmentsList.size(), attachmentsList);
}
}
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
private void parseAttachments(final List<DataSource> attachments, final MimePart parentPart, final int depth) throws MessagingException, IOException {
final String disposition = parentPart.getDisposition();
final Object parentContent = parentPart.getContent();
if (parentContent instanceof Multipart multipart) {
final int count = multipart.getCount();
final int partDepth = depth + 1;
for (int i = 0; i < count; i++) {
final BodyPart bodyPart = multipart.getBodyPart(i);
if (bodyPart instanceof MimeBodyPart mimeBodyPart) {
parseAttachments(attachments, mimeBodyPart, partDepth);
}
}
} else if (ATTACHMENT_DISPOSITION.equalsIgnoreCase(disposition) || depth > 0) {
final DataSource dataSource = parentPart.getDataHandler().getDataSource();
attachments.add(dataSource);
}
}
}

View File

@ -16,29 +16,29 @@
*/
package org.apache.nifi.processors.email;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.mail.Address;
import javax.mail.Header;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.MimeMessage;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.mail.Address;
import jakarta.mail.BodyPart;
import jakarta.mail.Header;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.Multipart;
import jakarta.mail.Session;
import jakarta.mail.internet.MimeBodyPart;
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimePart;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -55,9 +55,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@ -127,22 +125,11 @@ public class ExtractEmailHeaders extends AbstractProcessor {
.description("Flowfiles that could not be parsed as a RFC-2822 compliant message")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
private static final String ATTACHMENT_DISPOSITION = "attachment";
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
private static final Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CAPTURED_HEADERS);
descriptors.add(STRICT_PARSING);
this.descriptors = Collections.unmodifiableList(descriptors);
}
private static final List<PropertyDescriptor> descriptors = List.of(CAPTURED_HEADERS, STRICT_PARSING);
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
@ -160,68 +147,64 @@ public class ExtractEmailHeaders extends AbstractProcessor {
final List<String> capturedHeadersList = Arrays.asList(context.getProperty(CAPTURED_HEADERS).getValue().toLowerCase().split(":"));
final Map<String, String> attributes = new HashMap<>();
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
MimeMessageParser parser = new MimeMessageParser(originalMessage).parse();
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
if (capturedHeadersList.size() > 0){
Enumeration headers = originalMessage.getAllHeaders();
while (headers.hasMoreElements()) {
Header header = (Header) headers.nextElement();
if (StringUtils.isNotEmpty(header.getValue())
&& capturedHeadersList.contains(header.getName().toLowerCase())) {
attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue());
}
session.read(originalFlowFile, rawIn -> {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
props.put("mail.mime.address.strict", requireStrictAddresses);
Session mailSession = Session.getInstance(props);
MimeMessage originalMessage = new MimeMessage(mailSession, in);
// RFC-2822 determines that a message must have a "From:" header
// if a message lacks the field, it is flagged as invalid
Address[] from = originalMessage.getFrom();
if (from == null) {
throw new MessagingException("Message failed RFC-2822 validation: No Sender");
}
if (!capturedHeadersList.isEmpty()) {
Enumeration headers = originalMessage.getAllHeaders();
while (headers.hasMoreElements()) {
final Header header = (Header) headers.nextElement();
final String headerValue = header.getValue();
if (headerValue != null && !headerValue.isBlank()
&& capturedHeadersList.contains(header.getName().toLowerCase())) {
attributes.put("email.headers." + header.getName().toLowerCase(), header.getValue());
}
}
putAddressListInAttributes(attributes, EMAIL_HEADER_TO, originalMessage.getRecipients(Message.RecipientType.TO));
putAddressListInAttributes(attributes, EMAIL_HEADER_CC, originalMessage.getRecipients(Message.RecipientType.CC));
putAddressListInAttributes(attributes, EMAIL_HEADER_BCC, originalMessage.getRecipients(Message.RecipientType.BCC));
putAddressListInAttributes(attributes, EMAIL_HEADER_FROM, originalMessage.getFrom()); // RFC-2822 specifies "From" as mailbox-list
if (StringUtils.isNotEmpty(originalMessage.getMessageID())) {
attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID());
}
if (originalMessage.getReceivedDate() != null) {
attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString());
}
if (originalMessage.getSentDate() != null) {
attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString());
}
if (StringUtils.isNotEmpty(originalMessage.getSubject())) {
attributes.put(EMAIL_HEADER_SUBJECT, originalMessage.getSubject());
}
// Zeroes EMAIL_ATTACHMENT_COUNT
attributes.put(EMAIL_ATTACHMENT_COUNT, "0");
// But insert correct value if attachments are present
if (parser.hasAttachments()) {
attributes.put(EMAIL_ATTACHMENT_COUNT, String.valueOf(parser.getAttachmentList().size()));
}
} catch (Exception e) {
// Message is invalid or triggered an error during parsing
attributes.clear();
logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
invalidFlowFilesList.add(originalFlowFile);
}
putAddressListInAttributes(attributes, EMAIL_HEADER_TO, originalMessage.getRecipients(Message.RecipientType.TO));
putAddressListInAttributes(attributes, EMAIL_HEADER_CC, originalMessage.getRecipients(Message.RecipientType.CC));
putAddressListInAttributes(attributes, EMAIL_HEADER_BCC, originalMessage.getRecipients(Message.RecipientType.BCC));
putAddressListInAttributes(attributes, EMAIL_HEADER_FROM, originalMessage.getFrom()); // RFC-2822 specifies "From" as mailbox-list
final String messageId = originalMessage.getMessageID();
if (messageId != null && !messageId.isEmpty()) {
attributes.put(EMAIL_HEADER_MESSAGE_ID, originalMessage.getMessageID());
}
if (originalMessage.getReceivedDate() != null) {
attributes.put(EMAIL_HEADER_RECV_DATE, originalMessage.getReceivedDate().toString());
}
if (originalMessage.getSentDate() != null) {
attributes.put(EMAIL_HEADER_SENT_DATE, originalMessage.getSentDate().toString());
}
final String subject = originalMessage.getSubject();
if (subject != null && !subject.isEmpty()) {
attributes.put(EMAIL_HEADER_SUBJECT, subject);
}
final AtomicInteger attachmentsCounter = new AtomicInteger();
countAttachments(attachmentsCounter, originalMessage, 0);
attributes.put(EMAIL_ATTACHMENT_COUNT, Integer.toString(attachmentsCounter.get()));
} catch (Exception e) {
// Message is invalid or triggered an error during parsing
attributes.clear();
logger.error("Could not parse the flowfile {} as an email, treating as failure", originalFlowFile, e);
invalidFlowFilesList.add(originalFlowFile);
}
});
if (attributes.size() > 0) {
if (!attributes.isEmpty()) {
FlowFile updatedFlowFile = session.putAllAttributes(originalFlowFile, attributes);
logger.info("Extracted {} headers into {} file", new Object[]{attributes.size(), updatedFlowFile});
logger.info("Extracted {} headers into {} file", attributes.size(), updatedFlowFile);
processedFlowFilesList.add(updatedFlowFile);
}
@ -232,7 +215,7 @@ public class ExtractEmailHeaders extends AbstractProcessor {
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
return relationships;
}
@Override
@ -245,9 +228,28 @@ public class ExtractEmailHeaders extends AbstractProcessor {
final String attributePrefix,
Address[] addresses) {
if (addresses != null) {
for (int count = 0; count < ArrayUtils.getLength(addresses); count++) {
for (int count = 0; count < addresses.length; count++) {
attributes.put(attributePrefix + "." + count, addresses[count].toString());
}
}
}
private void countAttachments(final AtomicInteger counter, final MimePart parentPart, final int depth) throws MessagingException, IOException {
final String disposition = parentPart.getDisposition();
final Object parentContent = parentPart.getContent();
if (parentContent instanceof Multipart multipart) {
final int count = multipart.getCount();
final int partDepth = depth + 1;
for (int i = 0; i < count; i++) {
final BodyPart bodyPart = multipart.getBodyPart(i);
if (bodyPart instanceof MimeBodyPart mimeBodyPart) {
countAttachments(counter, mimeBodyPart, partDepth);
}
}
} else if (ATTACHMENT_DISPOSITION.equalsIgnoreCase(disposition) || depth > 0) {
counter.getAndIncrement();
}
}
}

View File

@ -17,18 +17,12 @@
package org.apache.nifi.processors.email;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -37,7 +31,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -46,12 +39,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.poi.hmef.Attachment;
import org.apache.poi.hmef.HMEFMessage;
@SupportsBatching
@SideEffectFree
@Tags({"split", "email"})
@ -61,7 +51,6 @@ import org.apache.poi.hmef.HMEFMessage;
@WritesAttribute(attribute = "filename ", description = "The filename of the attachment"),
@WritesAttribute(attribute = "email.tnef.attachment.parent.filename ", description = "The filename of the parent FlowFile"),
@WritesAttribute(attribute = "email.tnef.attachment.parent.uuid", description = "The UUID of the original FlowFile.")})
public class ExtractTNEFAttachments extends AbstractProcessor {
public static final String ATTACHMENT_ORIGINAL_FILENAME = "email.tnef.attachment.parent.filename";
public static final String ATTACHMENT_ORIGINAL_UUID = "email.tnef.attachment.parent.uuid";
@ -79,20 +68,7 @@ public class ExtractTNEFAttachments extends AbstractProcessor {
.description("Each individual flowfile that could not be parsed will be routed to the failure relationship")
.build();
private final static Set<Relationship> RELATIONSHIPS;
private final static List<PropertyDescriptor> DESCRIPTORS;
static {
final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_ATTACHMENTS);
_relationships.add(REL_ORIGINAL);
_relationships.add(REL_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
final List<PropertyDescriptor> _descriptors = new ArrayList<>();
DESCRIPTORS = Collections.unmodifiableList(_descriptors);
}
private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_ATTACHMENTS, REL_ORIGINAL, REL_FAILURE);
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
@ -105,66 +81,50 @@ public class ExtractTNEFAttachments extends AbstractProcessor {
final List<FlowFile> invalidFlowFilesList = new ArrayList<>();
final List<FlowFile> originalFlowFilesList = new ArrayList<>();
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
Properties props = new Properties();
session.read(originalFlowFile, rawIn -> {
try (final InputStream in = new BufferedInputStream(rawIn)) {
// This will trigger an exception in case content is not a TNEF.
final HMEFMessage hmefMessage = new HMEFMessage(in);
HMEFMessage hmefMessage = null;
// Add original FlowFile (may revert later on in case of errors) //
originalFlowFilesList.add(originalFlowFile);
// This will trigger an exception in case content is not a TNEF.
hmefMessage = new HMEFMessage(in);
// Add otiginal flowfile (may revert later on in case of errors) //
originalFlowFilesList.add(originalFlowFile);
if (hmefMessage != null) {
// Attachments isn empty, proceeding.
if (!hmefMessage.getAttachments().isEmpty()) {
final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
try {
for (final Attachment attachment : hmefMessage.getAttachments()) {
FlowFile split = session.create(originalFlowFile);
final Map<String, String> attributes = new HashMap<>();
if (StringUtils.isNotBlank(attachment.getLongFilename())) {
attributes.put(CoreAttributes.FILENAME.key(), attachment.getFilename());
}
String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
// TODO: Extract Mime Type (HMEF doesn't seem to be able to get this info.
split = session.append(split, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(attachment.getContents());
}
});
split = session.putAllAttributes(split, attributes);
attachmentsList.add(split);
}
} catch (FlowFileHandlingException e) {
// Something went wrong
// Removing splits that may have been created
session.remove(attachmentsList);
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", new Object[]{originalFlowFile, e});
invalidFlowFilesList.add(originalFlowFile);
if (!hmefMessage.getAttachments().isEmpty()) {
final String originalFlowFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key());
try {
for (final Attachment attachment : hmefMessage.getAttachments()) {
FlowFile split = session.create(originalFlowFile);
final Map<String, String> attributes = new HashMap<>();
final String attachmentFilename = attachment.getFilename();
if (attachmentFilename != null && !attachmentFilename.isBlank()) {
attributes.put(CoreAttributes.FILENAME.key(), attachmentFilename);
}
String parentUuid = originalFlowFile.getAttribute(CoreAttributes.UUID.key());
attributes.put(ATTACHMENT_ORIGINAL_UUID, parentUuid);
attributes.put(ATTACHMENT_ORIGINAL_FILENAME, originalFlowFileName);
split = session.append(split, out -> out.write(attachment.getContents()));
split = session.putAllAttributes(split, attributes);
attachmentsList.add(split);
}
} catch (FlowFileHandlingException e) {
// Something went wrong
// Removing splits that may have been created
session.remove(attachmentsList);
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Flowfile {} triggered error {} while processing message removing generated FlowFiles from sessions", originalFlowFile, e);
invalidFlowFilesList.add(originalFlowFile);
}
} catch (Exception e) {
// Another error hit...
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Could not parse the flowfile {} as an email, treating as failure", new Object[]{originalFlowFile, e});
// Message is invalid or triggered an error during parsing
invalidFlowFilesList.add(originalFlowFile);
}
} catch (Exception e) {
// Another error hit...
// Removing the original flow from its list
originalFlowFilesList.remove(originalFlowFile);
logger.error("Could not parse {} as an email, treating as failure", originalFlowFile, e);
// Message is invalid or triggered an error during parsing
invalidFlowFilesList.add(originalFlowFile);
}
});
@ -176,13 +136,13 @@ public class ExtractTNEFAttachments extends AbstractProcessor {
session.transfer(originalFlowFilesList, REL_ORIGINAL);
// check if attachments have been extracted
if (attachmentsList.size() != 0) {
if (!attachmentsList.isEmpty()) {
if (attachmentsList.size() > 10) {
// If more than 10, summarise log
logger.info("Split {} into {} files", new Object[]{originalFlowFile, attachmentsList.size()});
logger.info("Split {} into {} files", originalFlowFile, attachmentsList.size());
} else {
// Otherwise be more verbose and list each individual split
logger.info("Split {} into {} files: {}", new Object[]{originalFlowFile, attachmentsList.size(), attachmentsList});
logger.info("Split {} into {} files: {}", originalFlowFile, attachmentsList.size(), attachmentsList);
}
}
}
@ -191,12 +151,5 @@ public class ExtractTNEFAttachments extends AbstractProcessor {
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
}

View File

@ -24,8 +24,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@ -37,24 +35,13 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.email.smtp.SmtpConsumer;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.springframework.util.StringUtils;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandlerFactory;
import org.subethamail.smtp.server.SMTPServer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -134,8 +121,9 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
.name("CLIENT_AUTH")
.displayName("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.required(true)
.allowableValues(ClientAuth.NONE.name(), ClientAuth.REQUIRED.name())
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
protected static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
@ -152,25 +140,17 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
.description("All new messages will be routed as FlowFiles to this relationship")
.build();
private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
private final static List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
SMTP_PORT,
SMTP_MAXIMUM_CONNECTIONS,
SMTP_TIMEOUT,
SMTP_MAXIMUM_MSG_SIZE,
SSL_CONTEXT_SERVICE,
CLIENT_AUTH,
SMTP_HOSTNAME
);
private final static Set<Relationship> RELATIONSHIPS;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(SMTP_PORT);
_propertyDescriptors.add(SMTP_MAXIMUM_CONNECTIONS);
_propertyDescriptors.add(SMTP_TIMEOUT);
_propertyDescriptors.add(SMTP_MAXIMUM_MSG_SIZE);
_propertyDescriptors.add(SSL_CONTEXT_SERVICE);
_propertyDescriptors.add(CLIENT_AUTH);
_propertyDescriptors.add(SMTP_HOSTNAME);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
}
private final static Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
private volatile SMTPServer smtp;
@ -180,28 +160,27 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
try {
final SMTPServer server = prepareServer(context, sessionFactory);
server.start();
getLogger().debug("Started SMTP Server on port " + server.getPort());
getLogger().info("Started SMTP Server on {}", server.getPortAllocated());
smtp = server;
} catch (final Exception ex) {//have to catch exception due to awkward exception handling in subethasmtp
} catch (final Exception e) {//have to catch exception due to awkward exception handling in subethasmtp
smtp = null;
getLogger().error("Unable to start SMTP server due to " + ex.getMessage(), ex);
getLogger().error("Unable to start SMTP server", e);
}
}
context.yield();//nothing really to do here since threading managed by smtp server sessions
}
public int getListeningPort() {
return smtp == null ? 0 : smtp.getPort();
return smtp == null ? 0 : smtp.getPortAllocated();
}
@OnStopped
public void stop() {
try {
smtp.stop();
getLogger().debug("Stopped SMTP server on port " + smtp.getPort());
}catch (Exception ex){
getLogger().error("Error stopping SMTP server: " + ex.getMessage());
}finally {
} catch (final Exception e) {
getLogger().error("Failed to stop SMTP Server", e);
} finally {
smtp = null;
}
}
@ -211,78 +190,43 @@ public class ListenSMTP extends AbstractSessionFactoryProcessor {
return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>();
String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && !StringUtils.hasText(clientAuth)) {
results.add(new ValidationResult.Builder()
.subject(CLIENT_AUTH.getDisplayName())
.explanation(CLIENT_AUTH.getDisplayName() + " must be provided when using " + SSL_CONTEXT_SERVICE.getDisplayName())
.valid(false)
.build());
} else if (sslContextService == null && StringUtils.hasText(clientAuth)) {
results.add(new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName())
.explanation(SSL_CONTEXT_SERVICE.getDisplayName() + " must be provided when selecting " + CLIENT_AUTH.getDisplayName())
.valid(false)
.build());
}
return results;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
private SMTPServer prepareServer(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final SMTPServer.Builder smtpServerBuilder = new SMTPServer.Builder();
final int port = context.getProperty(SMTP_PORT).asInteger();
final String host = context.getProperty(SMTP_HOSTNAME).getValue();
final ComponentLog log = getLogger();
final int maxMessageSize = context.getProperty(SMTP_MAXIMUM_MSG_SIZE).asDataSize(DataUnit.B).intValue();
//create message handler factory
final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> {
return new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize);
};
//create smtp server
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SMTPServer smtpServer = sslContextService == null ? new SMTPServer(messageHandlerFactory) : new SMTPServer(messageHandlerFactory) {
@Override
public SSLSocket createSSLSocket(Socket socket) throws IOException {
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
final String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext = sslContextService.createContext();
final SSLSocketFactory socketFactory = sslContext.getSocketFactory();
final SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(socket, remoteAddress.getHostName(), socket.getPort(), true);
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
sslSocket.setEnabledProtocols(tlsConfiguration.getEnabledProtocols());
final MessageHandlerFactory messageHandlerFactory = (final MessageContext mc) -> new SmtpConsumer(mc, sessionFactory, port, host, log, maxMessageSize);
sslSocket.setUseClientMode(false);
if (ClientAuth.REQUIRED.getType().equals(clientAuth)) {
this.setRequireTLS(true);
sslSocket.setNeedClientAuth(true);
}
return sslSocket;
}
};
if (sslContextService != null) {
smtpServer.setEnableTLS(true);
} else {
smtpServer.setHideTLS(true);
}
smtpServer.setSoftwareName("Apache NiFi SMTP");
smtpServer.setPort(port);
smtpServer.setMaxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
smtpServer.setMaxMessageSize(maxMessageSize);
smtpServer.setConnectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
smtpServerBuilder.messageHandlerFactory(messageHandlerFactory);
smtpServerBuilder.port(port);
smtpServerBuilder.softwareName("Apache NiFi SMTP");
smtpServerBuilder.maxConnections(context.getProperty(SMTP_MAXIMUM_CONNECTIONS).asInteger());
smtpServerBuilder.maxMessageSize(maxMessageSize);
smtpServerBuilder.connectionTimeout(context.getProperty(SMTP_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
if (context.getProperty(SMTP_HOSTNAME).isSet()) {
smtpServer.setHostName(context.getProperty(SMTP_HOSTNAME).getValue());
smtpServerBuilder.hostName(context.getProperty(SMTP_HOSTNAME).getValue());
}
return smtpServer;
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService == null) {
smtpServerBuilder.hideTLS();
} else {
smtpServerBuilder.enableTLS();
final String clientAuth = context.getProperty(CLIENT_AUTH).getValue();
final boolean requireClientCertificate = ClientAuth.REQUIRED.getType().equalsIgnoreCase(clientAuth);
final SSLContext sslContext = sslContextService.createContext();
smtpServerBuilder.startTlsSocketFactory(sslContext, requireClientCertificate);
}
return smtpServerBuilder.build();
}
}

View File

@ -24,13 +24,12 @@ import java.net.SocketAddress;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -38,15 +37,16 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processors.email.ListenSMTP;
import org.apache.nifi.security.cert.StandardPrincipalFormatter;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.subethamail.smtp.MessageContext;
import org.subethamail.smtp.MessageHandler;
import org.subethamail.smtp.RejectException;
import org.subethamail.smtp.TooMuchDataException;
import org.subethamail.smtp.server.SMTPServer;
import javax.security.auth.x500.X500Principal;
/**
* A simple consumer that provides a bridge between 'push' message distribution
* provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
@ -82,16 +82,8 @@ public class SmtpConsumer implements MessageHandler {
this.maxMessageSize = maxMessageSize;
}
String getFrom() {
return from;
}
List<String> getRecipients() {
return Collections.unmodifiableList(recipientList);
}
@Override
public void data(final InputStream data) throws RejectException, TooMuchDataException, IOException {
public String data(final InputStream data) throws IOException {
final ProcessSession processSession = sessionFactory.createSession();
final StopWatch watch = new StopWatch();
watch.start();
@ -100,7 +92,7 @@ public class SmtpConsumer implements MessageHandler {
final AtomicBoolean limitExceeded = new AtomicBoolean(false);
flowFile = processSession.write(flowFile, (OutputStream out) -> {
final LimitingInputStream lis = new LimitingInputStream(data, maxMessageSize);
IOUtils.copy(lis, out);
StreamUtils.copy(lis, out);
if (lis.hasReachedLimit()) {
limitExceeded.set(true);
}
@ -113,19 +105,21 @@ public class SmtpConsumer implements MessageHandler {
processSession.getProvenanceReporter().receive(flowFile, "smtp://" + host + ":" + port + "/", watch.getDuration(TimeUnit.MILLISECONDS));
processSession.transfer(flowFile, ListenSMTP.REL_SUCCESS);
processSession.commitAsync();
} catch (FlowFileAccessException | IllegalStateException | RejectException | IOException ex) {
log.error("Unable to fully process input due to " + ex.getMessage(), ex);
throw ex;
} catch (final FlowFileAccessException | IllegalStateException | IOException e) {
log.error("SMTP data processing failed", e);
throw e;
}
return null;
}
@Override
public void from(final String from) throws RejectException {
public void from(final String from) {
this.from = from;
}
@Override
public void recipient(final String recipient) throws RejectException {
public void recipient(final String recipient) {
if (recipient != null && recipient.length() < 100 && recipientList.size() < 100) {
recipientList.add(recipient);
}
@ -142,7 +136,7 @@ public class SmtpConsumer implements MessageHandler {
for (int i = 0; i < tlsPeerCertificates.length; i++) {
if (tlsPeerCertificates[i] instanceof final X509Certificate x509Cert) {
attributes.put("smtp.certificate." + i + ".serial", x509Cert.getSerialNumber().toString());
attributes.put("smtp.certificate." + i + ".subjectName", StandardPrincipalFormatter.getInstance().getSubject(x509Cert));
attributes.put("smtp.certificate." + i + ".subjectName", x509Cert.getSubjectX500Principal().getName(X500Principal.RFC1779));
}
}
}
@ -156,7 +150,9 @@ public class SmtpConsumer implements MessageHandler {
attributes.put("smtp.src", strAddress);
}
attributes.put("smtp.helo", context.getHelo());
final Optional<String> helo = context.getHelo();
helo.ifPresent(s -> attributes.put("smtp.helo", s));
attributes.put("smtp.from", from);
for (int i = 0; i < recipientList.size(); i++) {
attributes.put("smtp.recipient." + i, recipientList.get(i));
@ -164,5 +160,4 @@ public class SmtpConsumer implements MessageHandler {
attributes.put(CoreAttributes.MIME_TYPE.key(), "message/rfc822");
return attributes;
}
}

View File

@ -17,15 +17,7 @@
package org.apache.nifi.processors.email;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailAttachment;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.MultiPartEmail;
import org.apache.commons.mail.SimpleEmail;
import java.nio.charset.StandardCharsets;
public class GenerateAttachment {
String from;
@ -34,6 +26,10 @@ public class GenerateAttachment {
String message;
String hostName;
private static final String NEWLINE = "\n";
private static final String BOUNDARY = "5A7C0449-336B-4F73-81EF-F176E4DF44B2";
public GenerateAttachment(String from, String to, String subject, String message, String hostName) {
this.from = from;
this.to = to;
@ -42,73 +38,72 @@ public class GenerateAttachment {
this.hostName = hostName;
}
public byte[] SimpleEmail() {
MimeMessage mimeMessage = SimpleEmailMimeMessage();
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
}
return output.toByteArray();
public byte[] simpleMessage() {
return simpleMessage(null);
}
public MimeMessage SimpleEmailMimeMessage() {
Email email = new SimpleEmail();
try {
email.setFrom(from);
email.addTo(to);
email.setSubject(subject);
email.setMsg(message);
email.setHostName(hostName);
email.buildMimeMessage();
} catch (EmailException e) {
e.printStackTrace();
public byte[] simpleMessage(final String recipient) {
final StringBuilder builder = new StringBuilder();
builder.append("MIME-Version: 1.0");
builder.append(NEWLINE);
builder.append("Content-Type: text/plain; charset=utf-8");
builder.append(NEWLINE);
builder.append("From: ");
builder.append(from);
builder.append(NEWLINE);
if (recipient != null) {
builder.append("To: ");
builder.append(recipient);
builder.append(NEWLINE);
}
return email.getMimeMessage();
builder.append("Subject: ");
builder.append(subject);
builder.append(NEWLINE);
builder.append(NEWLINE);
builder.append(message);
return builder.toString().getBytes(StandardCharsets.UTF_8);
}
public byte[] withAttachments(int amount) {
final StringBuilder builder = new StringBuilder();
public byte[] WithAttachments(int amount) {
MultiPartEmail email = new MultiPartEmail();
try {
builder.append("MIME-Version: 1.0");
builder.append(NEWLINE);
email.setFrom(from);
email.addTo(to);
email.setSubject(subject);
email.setMsg(message);
email.setHostName(hostName);
builder.append("Content-Type: multipart/mixed; boundary=\"");
builder.append(BOUNDARY);
builder.append("\"");
builder.append(NEWLINE);
int x = 1;
while (x <= amount) {
// Create an attachment with the pom.xml being used to compile (yay!!!)
EmailAttachment attachment = new EmailAttachment();
attachment.setPath("pom.xml");
attachment.setDisposition(EmailAttachment.ATTACHMENT);
attachment.setDescription("pom.xml");
attachment.setName("pom.xml"+String.valueOf(x));
// attach
email.attach(attachment);
x++;
}
email.buildMimeMessage();
} catch (EmailException e) {
e.printStackTrace();
}
ByteArrayOutputStream output = new ByteArrayOutputStream();
MimeMessage mimeMessage = email.getMimeMessage();
try {
mimeMessage.writeTo(output);
} catch (IOException e) {
e.printStackTrace();
} catch (MessagingException e) {
e.printStackTrace();
builder.append("From: ");
builder.append(from);
builder.append(NEWLINE);
builder.append("To: ");
builder.append(to);
builder.append(NEWLINE);
builder.append("Subject: ");
builder.append(subject);
builder.append(NEWLINE);
builder.append(NEWLINE);
for (int i = 0; i < amount; i++) {
builder.append("--");
builder.append(BOUNDARY);
builder.append(NEWLINE);
builder.append("Content-Type: text/plain; charset=utf-8");
builder.append(NEWLINE);
builder.append("Content-Disposition: attachment; filename=\"pom.xml-%d\"".formatted(i));
builder.append(NEWLINE);
builder.append(NEWLINE);
builder.append("Attachment");
builder.append(i);
builder.append(NEWLINE);
}
return output.toByteArray();
return builder.toString().getBytes(StandardCharsets.UTF_8);
}
}

View File

@ -28,27 +28,24 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.integration.mail.AbstractMailReceiver;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.lang.reflect.Field;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.Session;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import java.util.List;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITestConsumeEmail {
public class TestConsumeEmail {
private GreenMail mockIMAP4Server;
private GreenMail mockPOP3Server;
private GreenMailUser imapUser;
private GreenMailUser popUser;
// Setup mock imap server
@BeforeEach
public void setUp() {
mockIMAP4Server = new GreenMail(ServerSetupTest.IMAP);
@ -77,7 +74,6 @@ public class ITestConsumeEmail {
user.deliver(message);
}
// Start the testing units
@Test
public void testConsumeIMAP4() throws Exception {
@ -157,24 +153,4 @@ public class ITestConsumeEmail {
assertEquals("pop3", consume.getProtocol(runner.getProcessContext()));
}
@Test
public void validateUrl() throws Exception {
Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl");
displayUrlField.setAccessible(true);
AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP();
TestRunner runner = TestRunners.newTestRunner(consume);
runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
runner.setProperty(ConsumeIMAP.PORT, "1234");
runner.setProperty(ConsumeIMAP.USER, "jon");
runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext()));
assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume));
}
}

View File

@ -25,13 +25,10 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestExtractEmailAttachments {
// Setups the fields to be used...
String from = "Alice <alice@nifi.apache.org>";
String to = "bob@nifi.apache.org";
String subject = "Just a test email";
@ -40,13 +37,11 @@ public class TestExtractEmailAttachments {
GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName);
@Test
public void testValidEmailWithAttachments() throws Exception {
public void testValidEmailWithAttachments() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
byte [] withAttachment = attachmentGenerator.WithAttachments(1);
byte [] withAttachment = attachmentGenerator.withAttachments(1);
runner.enqueue(withAttachment);
runner.run();
@ -56,17 +51,15 @@ public class TestExtractEmailAttachments {
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 1);
// Have a look at the attachments...
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
splits.get(0).assertAttributeEquals("filename", "pom.xml1");
splits.get(0).assertAttributeEquals("filename", "pom.xml-0");
}
@Test
public void testValidEmailWithMultipleAttachments() throws Exception {
Random rnd = new Random() ;
public void testValidEmailWithMultipleAttachments() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
int amount = rnd.nextInt(10) + 1;
byte [] withAttachment = attachmentGenerator.WithAttachments(amount);
int amount = 3;
byte [] withAttachment = attachmentGenerator.withAttachments(amount);
runner.enqueue(withAttachment);
runner.run();
@ -74,23 +67,22 @@ public class TestExtractEmailAttachments {
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, amount);
// Have a look at the attachments...
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailAttachments.REL_ATTACHMENTS);
List<String> filenames = new ArrayList<>();
for (int a = 0 ; a < amount ; a++ ) {
filenames.add(splits.get(a).getAttribute("filename").toString());
filenames.add(splits.get(a).getAttribute("filename"));
}
assertTrue(filenames.containsAll(Arrays.asList("pom.xml1", "pom.xml" + amount)));
assertTrue(filenames.containsAll(Arrays.asList("pom.xml-0", "pom.xml-1", "pom.xml-2")));
}
@Test
public void testValidEmailWithoutAttachments() throws Exception {
public void testValidEmailWithoutAttachments() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
// Create the message dynamically
byte [] simpleEmail = attachmentGenerator.SimpleEmail();
byte [] simpleEmail = attachmentGenerator.simpleMessage();
runner.enqueue(simpleEmail);
runner.run();
@ -98,11 +90,10 @@ public class TestExtractEmailAttachments {
runner.assertTransferCount(ExtractEmailAttachments.REL_ORIGINAL, 1);
runner.assertTransferCount(ExtractEmailAttachments.REL_FAILURE, 0);
runner.assertTransferCount(ExtractEmailAttachments.REL_ATTACHMENTS, 0);
}
@Test
public void testInvalidEmail() throws Exception {
public void testInvalidEmail() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailAttachments());
runner.enqueue("test test test chocolate".getBytes());
runner.run();

View File

@ -22,15 +22,9 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
public class TestExtractEmailHeaders {
// Setup the fields to be used...
String from = "Alice <alice@nifi.apache.org>";
String to = "bob@nifi.apache.org";
String subject = "Just a test email";
@ -40,11 +34,10 @@ public class TestExtractEmailHeaders {
GenerateAttachment attachmentGenerator = new GenerateAttachment(from, to, subject, message, hostName);
@Test
public void testValidEmailWithAttachments() throws Exception {
public void testValidEmailWithAttachments() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
// Create the message dynamically
byte [] withAttachment = attachmentGenerator.WithAttachments(1);
byte [] withAttachment = attachmentGenerator.withAttachments(1);
runner.enqueue(withAttachment);
runner.run();
@ -61,12 +54,11 @@ public class TestExtractEmailHeaders {
}
@Test
public void testValidEmailWithoutAttachments() throws Exception {
public void testValidEmailWithoutAttachments() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version");
// Create the message dynamically
byte [] simpleEmail = attachmentGenerator.SimpleEmail();
byte [] simpleEmail = attachmentGenerator.simpleMessage(to);
runner.enqueue(simpleEmail);
runner.run();
@ -89,24 +81,12 @@ public class TestExtractEmailHeaders {
* TO, CC, BCC.
*/
@Test
public void testValidEmailWithNoRecipients() throws Exception {
public void testValidEmailWithNoRecipients() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.CAPTURED_HEADERS, "MIME-Version");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
simpleEmailMimeMessage.removeHeader("To");
simpleEmailMimeMessage.removeHeader("Cc");
simpleEmailMimeMessage.removeHeader("Bcc");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
final byte[] message = attachmentGenerator.simpleMessage();
runner.enqueue(message);
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
@ -128,32 +108,21 @@ public class TestExtractEmailHeaders {
* addresses.
*/
@Test
public void testNonStrictParsingPassesForInvalidAddresses() throws Exception {
public void testNonStrictParsingPassesForInvalidAddresses() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "false");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
final byte[] message = attachmentGenerator.simpleMessage("<>, Joe, \"\" <>");
simpleEmailMimeMessage.setHeader("From", "<bad_email>");
simpleEmailMimeMessage.setHeader("To", "<>, Joe, \"\" <>");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
runner.enqueue(message);
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 1);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 0);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(ExtractEmailHeaders.REL_SUCCESS);
splits.get(0).assertAttributeEquals("email.headers.from.0", "bad_email");
splits.get(0).assertAttributeEquals("email.headers.to.0", "");
splits.get(0).assertAttributeEquals("email.headers.to.1", "Joe");
splits.get(0).assertAttributeEquals("email.headers.to.2", "");
@ -166,23 +135,13 @@ public class TestExtractEmailHeaders {
* addresses.
*/
@Test
public void testStrictParsingFailsForInvalidAddresses() throws Exception {
public void testStrictParsingFailsForInvalidAddresses() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.setProperty(ExtractEmailHeaders.STRICT_PARSING, "true");
MimeMessage simpleEmailMimeMessage = attachmentGenerator.SimpleEmailMimeMessage();
final byte[] message = attachmentGenerator.simpleMessage("<>, Joe, \"\" <>");
simpleEmailMimeMessage.setHeader("From", "<bad_email>");
simpleEmailMimeMessage.setHeader("To", "<>, Joe, <invalid>");
ByteArrayOutputStream messageBytes = new ByteArrayOutputStream();
try {
simpleEmailMimeMessage.writeTo(messageBytes);
} catch (IOException | MessagingException e) {
e.printStackTrace();
}
runner.enqueue(messageBytes.toByteArray());
runner.enqueue(message);
runner.run();
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0);
@ -190,7 +149,7 @@ public class TestExtractEmailHeaders {
}
@Test
public void testInvalidEmail() throws Exception {
public void testInvalidEmail() {
final TestRunner runner = TestRunners.newTestRunner(new ExtractEmailHeaders());
runner.enqueue("test test test chocolate".getBytes());
runner.run();
@ -198,5 +157,4 @@ public class TestExtractEmailHeaders {
runner.assertTransferCount(ExtractEmailHeaders.REL_SUCCESS, 0);
runner.assertTransferCount(ExtractEmailHeaders.REL_FAILURE, 1);
}
}

View File

@ -30,12 +30,12 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.Session;
import jakarta.mail.Transport;
import jakarta.mail.internet.InternetAddress;
import jakarta.mail.internet.MimeMessage;
import javax.net.ssl.SSLContext;
import java.net.Socket;
import java.security.GeneralSecurityException;

View File

@ -1192,7 +1192,6 @@
!AzureGraphUserGroupProviderIT,
!GremlinClientServiceYamlSettingsAndBytecodeIT,
!GremlinClientServiceControllerSettingsIT,
!ITestConsumeEmail#validateUrl,
!PrometheusReportingTaskIT#testNullLabel,
!SnowflakeConnectionPoolIT,
!SnowflakePipeIT,