mirror of https://github.com/apache/nifi.git
NIFI-2779 - Add processor to GetEmail Supporting Exchange Web Services
This closes: #1326 Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
parent
86fb67d55c
commit
b7cdc6b382
|
@ -1793,3 +1793,26 @@ under a 3-Clause BSD style license:
|
||||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
|
||||||
|
The binary distribution of this product bundles 'EWS JAVA API' which is available
|
||||||
|
under an MIT style license. For details see https://github.com/OfficeDev/ews-java-api.
|
||||||
|
|
||||||
|
Copyright (c) 2012 Microsoft Corporation
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
THE SOFTWARE.
|
|
@ -1285,6 +1285,14 @@ The following binary components are provided under the Eclipse Public License 1.
|
||||||
(EPL 1.0) JaCoCo Java Code Coverage Library ( org.jacoco ) http://www.eclemma.org/jacoco
|
(EPL 1.0) JaCoCo Java Code Coverage Library ( org.jacoco ) http://www.eclemma.org/jacoco
|
||||||
(EPLv1.0) Clojure (org.clojure:clojure:1.8.0 - http://clojure.org)
|
(EPLv1.0) Clojure (org.clojure:clojure:1.8.0 - http://clojure.org)
|
||||||
|
|
||||||
|
************************
|
||||||
|
The MIT License
|
||||||
|
************************
|
||||||
|
|
||||||
|
The following binary components are provided under the MIT License. See project link for details.
|
||||||
|
|
||||||
|
(MIT License) EWS Java API (com.microsoft.ews-java-api:ews-java-api:2.0 - https://github.com/OfficeDev/ews-java-api)
|
||||||
|
|
||||||
*****************
|
*****************
|
||||||
Mozilla Public License v2.0
|
Mozilla Public License v2.0
|
||||||
*****************
|
*****************
|
||||||
|
|
|
@ -254,3 +254,26 @@
|
||||||
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||||
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||||
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
The binary distribution of this product bundles 'EWS JAVA API' which is available
|
||||||
|
under an MIT style license. For details see https://github.com/OfficeDev/ews-java-api.
|
||||||
|
|
||||||
|
Copyright (c) 2012 Microsoft Corporation
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
THE SOFTWARE.
|
|
@ -33,6 +33,20 @@ The following binary components are provided under the Apache Software License v
|
||||||
Apache Commons Logging
|
Apache Commons Logging
|
||||||
Copyright 2003-2016 The Apache Software Foundation
|
Copyright 2003-2016 The Apache Software Foundation
|
||||||
|
|
||||||
|
(ASLv2) Apache HttpComponents
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Apache HttpClient
|
||||||
|
Copyright 1999-2015 The Apache Software Foundation
|
||||||
|
|
||||||
|
Apache HttpCore
|
||||||
|
Copyright 2005-2015 The Apache Software Foundation
|
||||||
|
|
||||||
|
Apache HttpMime
|
||||||
|
Copyright 1999-2013 The Apache Software Foundation
|
||||||
|
|
||||||
|
This project contains annotations derived from JCIP-ANNOTATIONS
|
||||||
|
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
|
||||||
|
|
||||||
(ASLv2) Spring Framework
|
(ASLv2) Spring Framework
|
||||||
The following NOTICE information applies:
|
The following NOTICE information applies:
|
||||||
Spring Framework
|
Spring Framework
|
||||||
|
@ -67,6 +81,11 @@ The following binary components are provided under the Apache Software License v
|
||||||
CurvesAIP is BSD-licensed software ( https://github.com/virtuald/curvesapi/)
|
CurvesAIP is BSD-licensed software ( https://github.com/virtuald/curvesapi/)
|
||||||
Copyright (c) 2005, Graph Builder
|
Copyright (c) 2005, Graph Builder
|
||||||
|
|
||||||
|
(ASLv2) Joda Time
|
||||||
|
The following NOTICE information applies:
|
||||||
|
This product includes software developed by
|
||||||
|
Joda.org (http://www.joda.org/).
|
||||||
|
|
||||||
|
|
||||||
************************
|
************************
|
||||||
Common Development and Distribution License 1.1
|
Common Development and Distribution License 1.1
|
||||||
|
@ -84,6 +103,17 @@ The following binary components are provided under the Common Development and Di
|
||||||
|
|
||||||
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
|
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
|
||||||
|
|
||||||
|
************************
|
||||||
|
The MIT License
|
||||||
|
************************
|
||||||
|
|
||||||
|
The following binary components are provided under the MIT License. See project link for details.
|
||||||
|
|
||||||
|
(MIT License) EWS Java API
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Microsoft Corporation
|
||||||
|
Copyright (c) 2012
|
||||||
|
|
||||||
*****************
|
*****************
|
||||||
Public Domain
|
Public Domain
|
||||||
*****************
|
*****************
|
||||||
|
|
|
@ -34,6 +34,10 @@
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-processor-utils</artifactId>
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>javax.mail</groupId>
|
||||||
|
<artifactId>mail</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
<artifactId>commons-email</artifactId>
|
<artifactId>commons-email</artifactId>
|
||||||
|
@ -70,6 +74,11 @@
|
||||||
<artifactId>findbugs-annotations</artifactId>
|
<artifactId>findbugs-annotations</artifactId>
|
||||||
<version>1.3.9-1</version>
|
<version>1.3.9-1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.microsoft.ews-java-api</groupId>
|
||||||
|
<artifactId>ews-java-api</artifactId>
|
||||||
|
<version>2.0</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||||
|
|
|
@ -0,0 +1,519 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.email;
|
||||||
|
|
||||||
|
import microsoft.exchange.webservices.data.autodiscover.IAutodiscoverRedirectionUrl;
|
||||||
|
import microsoft.exchange.webservices.data.core.ExchangeService;
|
||||||
|
import microsoft.exchange.webservices.data.core.PropertySet;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.misc.ExchangeVersion;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.property.BodyType;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.property.WellKnownFolderName;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.search.FolderTraversal;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.search.LogicalOperator;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.search.SortDirection;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.service.ConflictResolutionMode;
|
||||||
|
import microsoft.exchange.webservices.data.core.enumeration.service.DeleteMode;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.folder.Folder;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.item.EmailMessage;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.item.Item;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.schema.EmailMessageSchema;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.schema.FolderSchema;
|
||||||
|
import microsoft.exchange.webservices.data.core.service.schema.ItemSchema;
|
||||||
|
import microsoft.exchange.webservices.data.credential.ExchangeCredentials;
|
||||||
|
import microsoft.exchange.webservices.data.credential.WebCredentials;
|
||||||
|
import microsoft.exchange.webservices.data.property.complex.FileAttachment;
|
||||||
|
import microsoft.exchange.webservices.data.search.FindFoldersResults;
|
||||||
|
import microsoft.exchange.webservices.data.search.FindItemsResults;
|
||||||
|
import microsoft.exchange.webservices.data.search.FolderView;
|
||||||
|
import microsoft.exchange.webservices.data.search.ItemView;
|
||||||
|
import microsoft.exchange.webservices.data.search.filter.SearchFilter;
|
||||||
|
import org.apache.commons.mail.EmailAttachment;
|
||||||
|
import org.apache.commons.mail.EmailException;
|
||||||
|
import org.apache.commons.mail.HtmlEmail;
|
||||||
|
import org.apache.commons.mail.MultiPartEmail;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.mail.Address;
|
||||||
|
import javax.mail.Flags;
|
||||||
|
import javax.mail.Message;
|
||||||
|
import javax.mail.MessagingException;
|
||||||
|
import javax.mail.internet.MimeMessage;
|
||||||
|
import javax.mail.util.ByteArrayDataSource;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||||
|
@CapabilityDescription("Consumes messages from Microsoft Exchange using Exchange Web Services. "
|
||||||
|
+ "The raw-bytes of each received email message are written as contents of the FlowFile")
|
||||||
|
@Tags({ "Email", "EWS", "Exchange", "Get", "Ingest", "Ingress", "Message", "Consume" })
|
||||||
|
public class ConsumeEWS extends AbstractProcessor {
|
||||||
|
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
|
||||||
|
.name("user")
|
||||||
|
.displayName("User Name")
|
||||||
|
.description("User Name used for authentication and authorization with Email server.")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||||
|
.name("password")
|
||||||
|
.displayName("Password")
|
||||||
|
.description("Password used for authentication and authorization with Email server.")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.sensitive(true)
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
|
||||||
|
.name("folder")
|
||||||
|
.displayName("Folder")
|
||||||
|
.description("Email folder to retrieve messages from (e.g., INBOX)")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.defaultValue("INBOX")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("fetch.size")
|
||||||
|
.displayName("Fetch Size")
|
||||||
|
.description("Specify the maximum number of Messages to fetch per call to Email Server.")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.defaultValue("10")
|
||||||
|
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor SHOULD_DELETE_MESSAGES = new PropertyDescriptor.Builder()
|
||||||
|
.name("delete.messages")
|
||||||
|
.displayName("Delete Messages")
|
||||||
|
.description("Specify whether mail messages should be deleted after retrieval.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("connection.timeout")
|
||||||
|
.displayName("Connection timeout")
|
||||||
|
.description("The amount of time to wait to connect to Email server")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.defaultValue("30 sec")
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor EXCHANGE_VERSION = new PropertyDescriptor.Builder()
|
||||||
|
.name("mail-ews-version")
|
||||||
|
.displayName("Exchange Version")
|
||||||
|
.description("What version of Exchange Server the server is running.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(ExchangeVersion.values())
|
||||||
|
.defaultValue(ExchangeVersion.Exchange2010_SP2.name())
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor EWS_URL = new PropertyDescriptor.Builder()
|
||||||
|
.name("ews-url")
|
||||||
|
.displayName("EWS URL")
|
||||||
|
.description("URL of the EWS Endpoint. Required if Autodiscover is false.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor USE_AUTODISCOVER = new PropertyDescriptor.Builder()
|
||||||
|
.name("ews-autodiscover")
|
||||||
|
.displayName("Auto Discover URL")
|
||||||
|
.description("Whether or not to use the Exchange email address to Autodiscover the EWS endpoint URL.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true","false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor SHOULD_MARK_READ = new PropertyDescriptor.Builder()
|
||||||
|
.name("ews-mark-as-read")
|
||||||
|
.displayName("Mark Messages as Read")
|
||||||
|
.description("Specify if messages should be marked as read after retrieval.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
|
.name("success")
|
||||||
|
.description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final protected List<PropertyDescriptor> DESCRIPTORS;
|
||||||
|
|
||||||
|
final protected Set<Relationship> RELATIONSHIPS;
|
||||||
|
|
||||||
|
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
|
||||||
|
|
||||||
|
protected volatile BlockingQueue<Message> messageQueue;
|
||||||
|
|
||||||
|
protected volatile String displayUrl;
|
||||||
|
|
||||||
|
protected volatile ProcessSession processSession;
|
||||||
|
|
||||||
|
protected volatile boolean shouldSetDeleteFlag;
|
||||||
|
|
||||||
|
protected volatile String folderName;
|
||||||
|
protected volatile int fetchSize;
|
||||||
|
|
||||||
|
public ConsumeEWS(){
|
||||||
|
final Set<Relationship> relationshipSet = new HashSet<>();
|
||||||
|
relationshipSet.add(REL_SUCCESS);
|
||||||
|
|
||||||
|
RELATIONSHIPS = relationshipSet;
|
||||||
|
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||||
|
|
||||||
|
descriptors.add(USER);
|
||||||
|
descriptors.add(PASSWORD);
|
||||||
|
descriptors.add(FOLDER);
|
||||||
|
descriptors.add(FETCH_SIZE);
|
||||||
|
descriptors.add(SHOULD_DELETE_MESSAGES);
|
||||||
|
descriptors.add(CONNECTION_TIMEOUT);
|
||||||
|
descriptors.add(EXCHANGE_VERSION);
|
||||||
|
descriptors.add(EWS_URL);
|
||||||
|
descriptors.add(USE_AUTODISCOVER);
|
||||||
|
descriptors.add(SHOULD_MARK_READ);
|
||||||
|
|
||||||
|
DESCRIPTORS = descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return RELATIONSHIPS;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return DESCRIPTORS;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
|
||||||
|
if(this.messageQueue == null){
|
||||||
|
int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||||
|
this.messageQueue = new ArrayBlockingQueue<>(fetchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.folderName = context.getProperty(FOLDER).getValue();
|
||||||
|
|
||||||
|
Message emailMessage = this.receiveMessage(context);
|
||||||
|
if (emailMessage != null) {
|
||||||
|
this.transfer(emailMessage, context, processSession);
|
||||||
|
} else {
|
||||||
|
//No new messages found, yield the processor
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ExchangeService initializeIfNecessary(ProcessContext context) throws ProcessException {
|
||||||
|
ExchangeVersion ver = ExchangeVersion.valueOf(context.getProperty(EXCHANGE_VERSION).getValue());
|
||||||
|
ExchangeService service = new ExchangeService(ver);
|
||||||
|
|
||||||
|
final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
|
||||||
|
service.setTimeout(Integer.parseInt(timeoutInMillis));
|
||||||
|
|
||||||
|
String userEmail = context.getProperty(USER).getValue();
|
||||||
|
String password = context.getProperty(PASSWORD).getValue();
|
||||||
|
|
||||||
|
ExchangeCredentials credentials = new WebCredentials(userEmail, password);
|
||||||
|
service.setCredentials(credentials);
|
||||||
|
|
||||||
|
Boolean useAutodiscover = context.getProperty(USE_AUTODISCOVER).asBoolean();
|
||||||
|
if(useAutodiscover){
|
||||||
|
try {
|
||||||
|
service.autodiscoverUrl(userEmail, new RedirectionUrlCallback());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ProcessException("Failure setting Autodiscover URL from email address.", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
String ewsURL = context.getProperty(EWS_URL).getValue();
|
||||||
|
try {
|
||||||
|
service.setUrl(new URI(ewsURL));
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new ProcessException("Failure setting EWS URL.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return service;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.description("Specifies the value for '" + propertyDescriptorName + "' Java Mail property.")
|
||||||
|
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the target receivere's mail protocol (e.g., imap, pop etc.)
|
||||||
|
*/
|
||||||
|
protected String getProtocol(ProcessContext processContext) {
|
||||||
|
return "ews";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fills the internal message queue if such queue is empty. This is due to
|
||||||
|
* the fact that per single session there may be multiple messages retrieved
|
||||||
|
* from the email server (see FETCH_SIZE).
|
||||||
|
*/
|
||||||
|
protected void fillMessageQueueIfNecessary(ProcessContext context) throws ProcessException {
|
||||||
|
if (this.messageQueue.isEmpty()) {
|
||||||
|
ExchangeService service = this.initializeIfNecessary(context);
|
||||||
|
boolean deleteOnRead = context.getProperty(SHOULD_DELETE_MESSAGES).getValue().equals("true");
|
||||||
|
boolean markAsRead = context.getProperty(SHOULD_MARK_READ).getValue().equals("true");
|
||||||
|
|
||||||
|
try {
|
||||||
|
//Get Folder
|
||||||
|
Folder folder = getFolder(service);
|
||||||
|
|
||||||
|
ItemView view = new ItemView(messageQueue.remainingCapacity());
|
||||||
|
view.getOrderBy().add(ItemSchema.DateTimeReceived, SortDirection.Ascending);
|
||||||
|
|
||||||
|
SearchFilter sf = new SearchFilter.SearchFilterCollection(LogicalOperator.And, new SearchFilter.IsEqualTo(EmailMessageSchema.IsRead, false));
|
||||||
|
FindItemsResults<Item> findResults = service.findItems(folder.getId(), sf, view);
|
||||||
|
|
||||||
|
if(findResults == null || findResults.getItems().size()== 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
service.loadPropertiesForItems(findResults, PropertySet.FirstClassProperties);
|
||||||
|
|
||||||
|
for (Item item : findResults) {
|
||||||
|
EmailMessage ewsMessage = (EmailMessage) item;
|
||||||
|
messageQueue.add(parseMessage(ewsMessage));
|
||||||
|
|
||||||
|
if(deleteOnRead){
|
||||||
|
ewsMessage.delete(DeleteMode.HardDelete);
|
||||||
|
} else if(markAsRead){
|
||||||
|
ewsMessage.setIsRead(true);
|
||||||
|
ewsMessage.update(ConflictResolutionMode.AlwaysOverwrite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
service.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ProcessException("Failed retrieving new messages from EWS.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Folder getFolder(ExchangeService service) {
|
||||||
|
Folder folder;
|
||||||
|
if(folderName.equals("INBOX")){
|
||||||
|
try {
|
||||||
|
folder = Folder.bind(service, WellKnownFolderName.Inbox);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ProcessException("Failed to bind Inbox Folder on EWS Server", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
FolderView view = new FolderView(10);
|
||||||
|
view.setTraversal(FolderTraversal.Deep);
|
||||||
|
SearchFilter searchFilter = new SearchFilter.IsEqualTo(FolderSchema.DisplayName, folderName);
|
||||||
|
try {
|
||||||
|
FindFoldersResults foldersResults = service.findFolders(WellKnownFolderName.Root,searchFilter, view);
|
||||||
|
ArrayList<Folder> folderIds = foldersResults.getFolders();
|
||||||
|
if(folderIds.size() > 1){
|
||||||
|
throw new ProcessException("More than 1 folder found with the name " + folderName);
|
||||||
|
}
|
||||||
|
|
||||||
|
folder = Folder.bind(service, folderIds.get(0).getId());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ProcessException("Search for Inbox Subfolder failed.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return folder;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MimeMessage parseMessage(EmailMessage item) throws Exception {
|
||||||
|
EmailMessage ewsMessage = item;
|
||||||
|
final String bodyText = ewsMessage.getBody().toString();
|
||||||
|
|
||||||
|
MultiPartEmail mm;
|
||||||
|
|
||||||
|
if(ewsMessage.getBody().getBodyType() == BodyType.HTML){
|
||||||
|
mm = new HtmlEmail().setHtmlMsg(bodyText);
|
||||||
|
} else {
|
||||||
|
mm = new MultiPartEmail();
|
||||||
|
mm.setMsg(bodyText);
|
||||||
|
}
|
||||||
|
mm.setHostName("NiFi-EWS");
|
||||||
|
//from
|
||||||
|
mm.setFrom(ewsMessage.getFrom().getAddress());
|
||||||
|
//to recipients
|
||||||
|
ewsMessage.getToRecipients().forEach(x->{
|
||||||
|
try {
|
||||||
|
mm.addTo(x.getAddress());
|
||||||
|
} catch (EmailException e) {
|
||||||
|
throw new ProcessException("Failed to add TO recipient.", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
//cc recipients
|
||||||
|
ewsMessage.getCcRecipients().forEach(x->{
|
||||||
|
try {
|
||||||
|
mm.addCc(x.getAddress());
|
||||||
|
} catch (EmailException e) {
|
||||||
|
throw new ProcessException("Failed to add CC recipient.", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
//subject
|
||||||
|
mm.setSubject(ewsMessage.getSubject());
|
||||||
|
//sent date
|
||||||
|
mm.setSentDate(ewsMessage.getDateTimeSent());
|
||||||
|
//add message headers
|
||||||
|
ewsMessage.getInternetMessageHeaders().forEach(x-> mm.addHeader(x.getName(), x.getValue()));
|
||||||
|
|
||||||
|
//Any attachments
|
||||||
|
if(ewsMessage.getHasAttachments()){
|
||||||
|
ewsMessage.getAttachments().forEach(x->{
|
||||||
|
try {
|
||||||
|
FileAttachment file = (FileAttachment)x;
|
||||||
|
file.load();
|
||||||
|
|
||||||
|
ByteArrayDataSource bds = new ByteArrayDataSource(file.getContent(), file.getContentType());
|
||||||
|
|
||||||
|
mm.attach(bds,file.getName(), "", EmailAttachment.ATTACHMENT);
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
mm.buildMimeMessage();
|
||||||
|
return mm.getMimeMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disposes the message by converting it to a {@link FlowFile} transferring
|
||||||
|
* it to the REL_SUCCESS relationship.
|
||||||
|
*/
|
||||||
|
private void transfer(Message emailMessage, ProcessContext context, ProcessSession processSession) {
|
||||||
|
long start = System.nanoTime();
|
||||||
|
FlowFile flowFile = processSession.create();
|
||||||
|
|
||||||
|
flowFile = processSession.append(flowFile, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(final OutputStream out) throws IOException {
|
||||||
|
try {
|
||||||
|
emailMessage.writeTo(out);
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||||
|
|
||||||
|
String fromAddressesString = "";
|
||||||
|
try {
|
||||||
|
Address[] fromAddresses = emailMessage.getFrom();
|
||||||
|
if (fromAddresses != null) {
|
||||||
|
fromAddressesString = Arrays.asList(fromAddresses).toString();
|
||||||
|
}
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
this.logger.warn("Faild to retrieve 'From' attribute from Message.");
|
||||||
|
}
|
||||||
|
|
||||||
|
processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration);
|
||||||
|
this.getLogger().info("Successfully received {} from {} in {} millis", new Object[]{flowFile, fromAddressesString, executionDuration});
|
||||||
|
processSession.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
|
try {
|
||||||
|
emailMessage.setFlag(Flags.Flag.DELETED, this.shouldSetDeleteFlag);
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
this.logger.warn("Failed to set DELETE Flag on the message, data duplication may occur.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receives message from the internal queue filling up the queue if
|
||||||
|
* necessary.
|
||||||
|
*/
|
||||||
|
protected Message receiveMessage(ProcessContext context) {
|
||||||
|
Message emailMessage = null;
|
||||||
|
try {
|
||||||
|
this.fillMessageQueueIfNecessary(context);
|
||||||
|
emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
context.yield();
|
||||||
|
this.logger.error("Failed retrieving messages from EWS.", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
this.logger.debug("Current thread is interrupted");
|
||||||
|
}
|
||||||
|
return emailMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@OnStopped
|
||||||
|
public void stop(ProcessContext processContext) {
|
||||||
|
this.flushRemainingMessages(processContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Will flush the remaining messages when this processor is stopped.
|
||||||
|
*/
|
||||||
|
protected void flushRemainingMessages(ProcessContext processContext) {
|
||||||
|
Message emailMessage;
|
||||||
|
try {
|
||||||
|
while ((emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
|
||||||
|
this.transfer(emailMessage, processContext, this.processSession);
|
||||||
|
this.processSession.commit();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
this.logger.debug("Current thread is interrupted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class RedirectionUrlCallback implements IAutodiscoverRedirectionUrl {
|
||||||
|
public boolean autodiscoverRedirectionUrlValidationCallback(
|
||||||
|
String redirectionUrl) {
|
||||||
|
return redirectionUrl.toLowerCase().startsWith("https://");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,3 +18,4 @@ org.apache.nifi.processors.email.ExtractEmailHeaders
|
||||||
org.apache.nifi.processors.email.ListenSMTP
|
org.apache.nifi.processors.email.ListenSMTP
|
||||||
org.apache.nifi.processors.email.ConsumeIMAP
|
org.apache.nifi.processors.email.ConsumeIMAP
|
||||||
org.apache.nifi.processors.email.ConsumePOP3
|
org.apache.nifi.processors.email.ConsumePOP3
|
||||||
|
org.apache.nifi.processors.email.ConsumeEWS
|
Loading…
Reference in New Issue