NIFI-1148 added IMAP/POP3 support added initial set of processors to support consumption of Email via IMAP/IMAPS and POP3 protocols

Signed-off-by: Matt Burgess <mattyb149@apache.org>

NIFI-1148 addressed PR comments from @trixpan

NIFI-1148 addressing PR comments

NIFI-1148 addressed PR comments

This closes #710
This commit is contained in:
Oleg Zhurakousky 2016-07-23 10:42:40 -04:00 committed by Matt Burgess
parent 120d2100a3
commit 9b647cd538
8 changed files with 896 additions and 67 deletions

View File

@ -1,73 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-email-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<artifactId>nifi-email-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.subethamail</groupId>
<artifactId>subethasmtp</artifactId>
<version>3.1.7</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>4.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,407 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.mail.Address;
import javax.mail.Flags;
import javax.mail.Message;
import javax.mail.MessagingException;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.StaticListableBeanFactory;
import org.springframework.integration.mail.AbstractMailReceiver;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
/**
* Base processor for implementing processors to consume messages from Email
* servers using Spring Integration libraries.
*
* @param <T>
* the type of {@link AbstractMailReceiver}.
*/
abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor {
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
.name("host")
.displayName("Host Name")
.description("Network address of Email server (e.g., pop.gmail.com, imap.gmail.com . . .)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("port")
.displayName("Port")
.description("Numeric value identifying Port of Email server (e.g., 993)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
.name("user")
.displayName("User Name")
.description("User Name used for authentication and authorization with Email server.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("password")
.displayName("Password")
.description("Password used for authentication and authorization with Email server.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
.name("folder")
.displayName("Folder")
.description("Email folder to retrieve messages from (e.g., INBOX)")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("INBOX")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("fetch.size")
.displayName("Fetch Size")
.description("Specify the maximum number of Messages to fetch per call to Email Server.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("10")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor SHOULD_DELETE_MESSAGES = new PropertyDescriptor.Builder()
.name("delete.messages")
.displayName("Delete Messages")
.description("Specify whether mail messages should be deleted after retrieval.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship")
.build();
static List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>();
static Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();
/*
* Will ensure that list of PropertyDescriptors is build only once, since
* all other lifecycle methods are invoked multiple times.
*/
static {
SHARED_DESCRIPTORS.add(HOST);
SHARED_DESCRIPTORS.add(PORT);
SHARED_DESCRIPTORS.add(USER);
SHARED_DESCRIPTORS.add(PASSWORD);
SHARED_DESCRIPTORS.add(FOLDER);
SHARED_DESCRIPTORS.add(FETCH_SIZE);
SHARED_DESCRIPTORS.add(SHOULD_DELETE_MESSAGES);
SHARED_RELATIONSHIPS.add(REL_SUCCESS);
}
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected volatile T messageReceiver;
private volatile BlockingQueue<Message> messageQueue;
private volatile String displayUrl;
private volatile ProcessSession processSession;
private volatile boolean shouldSetDeleteFlag;
@OnStopped
public void stop(ProcessContext processContext) {
this.flushRemainingMessages(processContext);
try {
this.messageReceiver.destroy();
this.messageReceiver = null;
} catch (Exception e) {
this.logger.warn("Failure while closing processor", e);
}
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return SHARED_RELATIONSHIPS;
}
/**
*
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
this.initializeIfNecessary(context, processSession);
Message emailMessage = this.receiveMessage();
if (emailMessage != null) {
this.transfer(emailMessage, context, processSession);
}
}
/**
*
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Java Mail property.")
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
.build();
}
/**
* Delegates to sub-classes to build the target receiver as
* {@link AbstractMailReceiver}
*
* @param context
* instance of {@link ProcessContext}
* @return new instance of {@link AbstractMailReceiver}
*/
protected abstract T buildMessageReceiver(ProcessContext context);
/**
* Return the target receivere's mail protocol (e.g., imap, pop etc.)
*/
protected abstract String getProtocol(ProcessContext processContext);
/**
* Builds the url used to connect to the email server.
*/
String buildUrl(ProcessContext processContext) {
String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
String password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
StringBuilder urlBuilder = new StringBuilder();
try {
urlBuilder.append(URLEncoder.encode(user, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new ProcessException(e);
}
urlBuilder.append(":");
try {
urlBuilder.append(URLEncoder.encode(password, "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new ProcessException(e);
}
urlBuilder.append("@");
urlBuilder.append(host);
urlBuilder.append(":");
urlBuilder.append(port);
urlBuilder.append("/");
urlBuilder.append(folder);
String protocol = this.getProtocol(processContext);
String finalUrl = protocol + "://" + urlBuilder.toString();
// build display-safe URL
int passwordStartIndex = urlBuilder.indexOf(":") + 1;
int passwordEndIndex = urlBuilder.indexOf("@");
urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
this.displayUrl = protocol + "://" + urlBuilder.toString();
if (this.logger.isInfoEnabled()) {
this.logger.info("Connecting to Email server at the following URL: " + this.displayUrl);
}
return finalUrl;
}
/**
* Builds and initializes the target message receiver if necessary (if it's
* null). Upon execution of this operation the receiver is fully functional
* and is ready to receive messages.
*/
private synchronized void initializeIfNecessary(ProcessContext context, ProcessSession processSession) {
if (this.messageReceiver == null) {
this.processSession = processSession;
this.messageReceiver = this.buildMessageReceiver(context);
this.shouldSetDeleteFlag = context.getProperty(SHOULD_DELETE_MESSAGES).asBoolean();
int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
this.messageReceiver.setMaxFetchSize(fetchSize);
this.messageReceiver.setJavaMailProperties(this.buildJavaMailProperties(context));
// need to avoid spring warning messages
this.messageReceiver.setBeanFactory(new StaticListableBeanFactory());
this.messageReceiver.afterPropertiesSet();
this.messageQueue = new ArrayBlockingQueue<>(fetchSize);
}
}
/**
* Extracts dynamic properties which typically represent the Java Mail
* properties from the {@link ProcessContext} returnining them as instance
* of {@link Properties}
*/
private Properties buildJavaMailProperties(ProcessContext context) {
Properties javaMailProperties = new Properties();
for (Entry<PropertyDescriptor, String> propertyDescriptorEntry : context.getProperties().entrySet()) {
if (propertyDescriptorEntry.getKey().isDynamic()) {
javaMailProperties.setProperty(propertyDescriptorEntry.getKey().getName(),
propertyDescriptorEntry.getValue());
}
}
return javaMailProperties;
}
/**
* Fills the internal message queue if such queue is empty. This is due to
* the fact that per single session there may be multiple messages retrieved
* from the email server (see FETCH_SIZE).
*/
private synchronized void fillMessageQueueIfNecessary() {
if (this.messageQueue.isEmpty()) {
Object[] messages;
try {
messages = this.messageReceiver.receive();
} catch (MessagingException e) {
String errorMsg = "Failed to receive messages from Email server: [" + e.getClass().getName()
+ " - " + e.getMessage();
this.getLogger().error(errorMsg);
throw new ProcessException(errorMsg, e);
}
if (messages != null) {
for (Object message : messages) {
Assert.isTrue(message instanceof Message, "Message is not an instance of javax.mail.Message");
this.messageQueue.offer((Message) message);
}
}
}
}
/**
* Disposes the message by converting it to a {@link FlowFile} transferring
* it to the REL_SUCCESS relationship.
*/
private void transfer(Message emailMessage, ProcessContext context, ProcessSession processSession) {
long start = System.nanoTime();
FlowFile flowFile = processSession.create();
flowFile = processSession.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
StreamUtils.copy(emailMessage.getInputStream(), out);
} catch (MessagingException e) {
throw new IOException(e);
}
}
});
long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
String fromAddressesString = "";
try {
Address[] fromAddresses = emailMessage.getFrom();
if (fromAddresses != null) {
fromAddressesString = Arrays.asList(fromAddresses).toString();
}
} catch (MessagingException e) {
this.logger.warn("Faild to retrieve 'From' attribute from Message.");
}
processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration);
this.getLogger().info("Successfully received {} from {} in {} millis", new Object[] { flowFile, fromAddressesString, executionDuration });
processSession.transfer(flowFile, REL_SUCCESS);
try {
emailMessage.setFlag(Flags.Flag.DELETED, this.shouldSetDeleteFlag);
} catch (MessagingException e) {
this.logger.warn("Failed to set DELETE Flag on the message", e);
this.getLogger().warn("Failed to set DELETE Flag on the message");
}
}
/**
* Receives message from the internal queue filling up the queue if
* necessary.
*/
private Message receiveMessage() {
Message emailMessage = null;
try {
this.fillMessageQueueIfNecessary();
emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
this.logger.debug("Current thread is interrupted");
}
return emailMessage;
}
/**
* Will flush the remaining messages when this processor is stopped. The
* flushed messages are disposed via
* {@link #disposeMessage(Message, ProcessContext, ProcessSession)}
* operation
*/
private void flushRemainingMessages(ProcessContext processContext) {
Message emailMessage;
try {
while ((emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
this.transfer(emailMessage, processContext, this.processSession);
this.processSession.commit();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
this.logger.debug("Current thread is interrupted");
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.integration.mail.ImapMailReceiver;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes messages from Email Server using IMAP protocol. "
+ "The raw-bytes of each received email message are written as contents of the FlowFile")
@Tags({ "Email", "Imap", "Get", "Ingest", "Ingress", "Message", "Consume" })
public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> {
public static final PropertyDescriptor SHOULD_MARK_READ = new PropertyDescriptor.Builder()
.name("Mark Messages as Read")
.description("Specify if messages should be marked as read after retrieval.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor USE_SSL = new PropertyDescriptor.Builder()
.name("Use SSL")
.description("Specifies if IMAP connection must be obtained via SSL encrypted connection (i.e., IMAPS)")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final List<PropertyDescriptor> DESCRIPTORS;
static {
List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.addAll(SHARED_DESCRIPTORS);
_descriptors.add(SHOULD_MARK_READ);
_descriptors.add(USE_SSL);
DESCRIPTORS = Collections.unmodifiableList(_descriptors);
}
/**
*
*/
@Override
protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) {
ImapMailReceiver receiver = new ImapMailReceiver(this.buildUrl(processContext));
boolean shouldMarkAsRead = processContext.getProperty(SHOULD_MARK_READ).asBoolean();
receiver.setShouldMarkMessagesAsRead(shouldMarkAsRead);
return receiver;
}
/**
*
*/
@Override
protected String getProtocol(ProcessContext processContext) {
return processContext.getProperty(USE_SSL).asBoolean() ? "imaps" : "imap";
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.springframework.integration.mail.Pop3MailReceiver;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes messages from Email Server using POP3 protocol. "
+ "The raw-bytes of each received email message are written as contents of the FlowFile")
@Tags({ "Email", "POP3", "Get", "Ingest", "Ingress", "Message", "Consume" })
public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> {
static final List<PropertyDescriptor> DESCRIPTORS;
static {
List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.addAll(SHARED_DESCRIPTORS);
DESCRIPTORS = Collections.unmodifiableList(_descriptors);
}
/**
*
*/
@Override
protected String getProtocol(ProcessContext processContext) {
return "pop3";
}
/**
*
*/
@Override
protected Pop3MailReceiver buildMessageReceiver(ProcessContext context) {
return new Pop3MailReceiver(this.buildUrl(context));
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
}

View File

@ -15,3 +15,5 @@
org.apache.nifi.processors.email.ExtractEmailAttachments
org.apache.nifi.processors.email.ExtractEmailHeaders
org.apache.nifi.processors.email.ListenSMTP
org.apache.nifi.processors.email.ConsumeIMAP
org.apache.nifi.processors.email.ConsumePOP3

View File

@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ConsumeIMAP</title>
<link rel="stylesheet" href="../../css/component-usage.css"
type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>This Processor consumes email messages via IMAP protocol and sends the content of an email message as content of the Flow File.
Content of the incoming email message is written as raw bytes to the content of the outgoing Flow File.
</p>
<p>Different email providers may require additional Java Mail properties which could be provided as dynamic properties.
For example, below is a sample configuration for GMail:
</p>
<p>
<b>Processor's static properties:</b>
<ul>
<li><b>Host Name</b> - imap.gmail.com</li>
<li><b>Port</b> - 993</li>
<li><b>User Name</b> - <i>[your user name]</i></li>
<li><b>Password</b> - <i>[your password]</i></li>
<li><b>Folder</b> - INBOX</li>
</ul>
<b>Processor's dynamic properties:</b>
<ul>
<li><b>mail.imap.socketFactory.class</b> - javax.net.ssl.SSLSocketFactory</li>
<li><b>mail.imap.socketFactory.fallback</b> - false</li>
<li><b>mail.store.protocol</b> - imaps</li>
</ul>
</p>
<p>
Another useful property is <b>mail.debug</b> which allows Java Mail API to print protocol messages to the console helping you to both understand what's going on as well as debug issues.
</p>
<p>
For the full list of available Java Mail properties please refer to <a href="http://connector.sourceforge.net/doc-files/Properties.html">here</a>
</p>
</body>
</html>

View File

@ -0,0 +1,57 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ConsumePOP3</title>
<link rel="stylesheet" href="../../css/component-usage.css"
type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>This Processor consumes email messages via POP3 protocol and sends the content of an email message as content of the Flow File.
Content of the incoming email message is written as raw bytes to the content of the outgoing Flow File.
</p>
<p>Since different serves may require different Java Mail
properties such properties could be provided via dynamic properties.
For example, below is a sample configuration for GMail:
</p>
<p>
<b>Processor's static properties:</b>
<ul>
<li><b>Host Name</b> - pop.gmail.com</li>
<li><b>Port</b> - 995</li>
<li><b>User Name</b> - <i>[your user name]</i></li>
<li><b>Password</b> - <i>[your password]</i></li>
<li><b>Folder</b> - INBOX</li>
</ul>
<b>Processor's dynamic properties:</b>
<ul>
<li><b>mail.pop3.socketFactory.class</b> - javax.net.ssl.SSLSocketFactory</li>
<li><b>mail.pop3.socketFactory.fallback</b> - false</li>
</ul>
</p>
<p>
Another useful property is <b>mail.debug</b> which allows Java Mail API to print protocol messages to the console helping you to both understand what's going on as well as debug issues.
</p>
<p>
For the full list of available Java Mail properties please refer to <a href="http://connector.sourceforge.net/doc-files/Properties.html">here</a>
</p>
</body>
</html>

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.email;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.mail.Message;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.springframework.integration.mail.AbstractMailReceiver;
import org.springframework.integration.mail.ImapMailReceiver;
public class ConsumeEmailTest {
@Test
public void validateProtocol() {
AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP();
TestRunner runner = TestRunners.newTestRunner(consume);
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
assertEquals("imap", consume.getProtocol(runner.getProcessContext()));
runner = TestRunners.newTestRunner(consume);
runner.setProperty(ConsumeIMAP.USE_SSL, "true");
assertEquals("imaps", consume.getProtocol(runner.getProcessContext()));
consume = new ConsumePOP3();
assertEquals("pop3", consume.getProtocol(runner.getProcessContext()));
}
@Test
public void validateUrl() throws Exception {
Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl");
displayUrlField.setAccessible(true);
AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP();
TestRunner runner = TestRunners.newTestRunner(consume);
runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
runner.setProperty(ConsumeIMAP.PORT, "1234");
runner.setProperty(ConsumeIMAP.USER, "jon");
runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext()));
assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume));
}
@Test
public void validateConsumeIMAP() throws Exception {
TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(0));
runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
runner.setProperty(ConsumeIMAP.PORT, "1234");
runner.setProperty(ConsumeIMAP.USER, "jon");
runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
assertTrue(flowFiles.isEmpty());
runner = TestRunners.newTestRunner(new TestImapProcessor(2));
runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
runner.setProperty(ConsumeIMAP.PORT, "1234");
runner.setProperty(ConsumeIMAP.USER, "jon");
runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
runner.setProperty(ConsumeIMAP.USE_SSL, "false");
runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
runner.run(2);
flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
assertTrue(flowFiles.size() == 2);
MockFlowFile ff = flowFiles.get(0);
ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8));
ff = flowFiles.get(1);
ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8));
}
public static class TestImapProcessor extends ConsumeIMAP {
private final int messagesToGenerate;
TestImapProcessor(int messagesToGenerate) {
this.messagesToGenerate = messagesToGenerate;
}
@Override
protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) {
ImapMailReceiver receiver = mock(ImapMailReceiver.class);
try {
Message[] messages = new Message[this.messagesToGenerate];
for (int i = 0; i < this.messagesToGenerate; i++) {
Message message = mock(Message.class);
when(message.getInputStream()).thenReturn(
new ByteArrayInputStream(("You've Got Mail - " + i).getBytes(StandardCharsets.UTF_8)));
messages[i] = message;
}
when(receiver.receive()).thenReturn(messages);
} catch (Exception e) {
e.printStackTrace();
fail();
}
return receiver;
}
}
}