mirror of https://github.com/apache/nifi.git
NIFI-11107 In ConsumeIMAP and ConsumePOP3 added support for OAUTH based authorization.
This closes #6900. Reviewed-by: Nandor Soma Abonyi <abonyisoma@gmail.com> Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
6877e84931
commit
56d8879e91
|
@ -39,6 +39,11 @@
|
||||||
<artifactId>nifi-utils</artifactId>
|
<artifactId>nifi-utils</artifactId>
|
||||||
<version>1.20.0-SNAPSHOT</version>
|
<version>1.20.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-oauth2-provider-api</artifactId>
|
||||||
|
<version>1.20.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>javax.mail</groupId>
|
<groupId>javax.mail</groupId>
|
||||||
<artifactId>mail</artifactId>
|
<artifactId>mail</artifactId>
|
||||||
|
|
|
@ -16,10 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.email;
|
package org.apache.nifi.processors.email;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
@ -43,6 +46,7 @@ import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
@ -56,7 +60,17 @@ import java.util.concurrent.TimeUnit;
|
||||||
* @param <T> the type of {@link AbstractMailReceiver}.
|
* @param <T> the type of {@link AbstractMailReceiver}.
|
||||||
*/
|
*/
|
||||||
abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor {
|
abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor {
|
||||||
|
public static final AllowableValue PASSWORD_BASED_AUTHORIZATION_MODE = new AllowableValue(
|
||||||
|
"password-based-authorization-mode",
|
||||||
|
"Use Password",
|
||||||
|
"Use password"
|
||||||
|
);
|
||||||
|
|
||||||
|
public static final AllowableValue OAUTH_AUTHORIZATION_MODE = new AllowableValue(
|
||||||
|
"oauth-based-authorization-mode",
|
||||||
|
"Use OAuth2",
|
||||||
|
"Use OAuth2 to acquire access token"
|
||||||
|
);
|
||||||
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
|
||||||
.name("host")
|
.name("host")
|
||||||
.displayName("Host Name")
|
.displayName("Host Name")
|
||||||
|
@ -73,6 +87,22 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor AUTHORIZATION_MODE = new PropertyDescriptor.Builder()
|
||||||
|
.name("authorization-mode")
|
||||||
|
.displayName("Authorization Mode")
|
||||||
|
.description("How to authorize sending email on the user's behalf.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(PASSWORD_BASED_AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE)
|
||||||
|
.defaultValue(PASSWORD_BASED_AUTHORIZATION_MODE.getValue())
|
||||||
|
.build();
|
||||||
|
public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
|
||||||
|
.name("oauth2-access-token-provider")
|
||||||
|
.displayName("OAuth2 Access Token Provider")
|
||||||
|
.description("OAuth2 service that can provide access tokens.")
|
||||||
|
.identifiesControllerService(OAuth2AccessTokenProvider.class)
|
||||||
|
.dependsOn(AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
|
||||||
.name("user")
|
.name("user")
|
||||||
.displayName("User Name")
|
.displayName("User Name")
|
||||||
|
@ -85,6 +115,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
.name("password")
|
.name("password")
|
||||||
.displayName("Password")
|
.displayName("Password")
|
||||||
.description("Password used for authentication and authorization with Email server.")
|
.description("Password used for authentication and authorization with Email server.")
|
||||||
|
.dependsOn(AUTHORIZATION_MODE, PASSWORD_BASED_AUTHORIZATION_MODE)
|
||||||
.required(true)
|
.required(true)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
@ -143,6 +174,8 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
static {
|
static {
|
||||||
SHARED_DESCRIPTORS.add(HOST);
|
SHARED_DESCRIPTORS.add(HOST);
|
||||||
SHARED_DESCRIPTORS.add(PORT);
|
SHARED_DESCRIPTORS.add(PORT);
|
||||||
|
SHARED_DESCRIPTORS.add(AUTHORIZATION_MODE);
|
||||||
|
SHARED_DESCRIPTORS.add(OAUTH2_ACCESS_TOKEN_PROVIDER);
|
||||||
SHARED_DESCRIPTORS.add(USER);
|
SHARED_DESCRIPTORS.add(USER);
|
||||||
SHARED_DESCRIPTORS.add(PASSWORD);
|
SHARED_DESCRIPTORS.add(PASSWORD);
|
||||||
SHARED_DESCRIPTORS.add(FOLDER);
|
SHARED_DESCRIPTORS.add(FOLDER);
|
||||||
|
@ -165,6 +198,21 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
|
|
||||||
private volatile boolean shouldSetDeleteFlag;
|
private volatile boolean shouldSetDeleteFlag;
|
||||||
|
|
||||||
|
protected volatile Optional<OAuth2AccessTokenProvider> oauth2AccessTokenProviderOptional;
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
|
||||||
|
OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
|
||||||
|
|
||||||
|
oauth2AccessTokenProvider.getAccessDetails();
|
||||||
|
|
||||||
|
oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider);
|
||||||
|
} else {
|
||||||
|
oauth2AccessTokenProviderOptional = Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@OnStopped
|
@OnStopped
|
||||||
public void stop(ProcessContext processContext) {
|
public void stop(ProcessContext processContext) {
|
||||||
this.flushRemainingMessages(processContext);
|
this.flushRemainingMessages(processContext);
|
||||||
|
@ -229,7 +277,13 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
|
String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
|
||||||
String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
|
String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
|
||||||
String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
|
String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
|
||||||
String password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
|
|
||||||
|
String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> {
|
||||||
|
String accessToken = oauth2AccessTokenProvider.getAccessDetails().getAccessToken();
|
||||||
|
|
||||||
|
return accessToken;
|
||||||
|
}).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue());
|
||||||
|
|
||||||
String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
|
String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
StringBuilder urlBuilder = new StringBuilder();
|
StringBuilder urlBuilder = new StringBuilder();
|
||||||
|
@ -304,9 +358,14 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
|
||||||
propertyDescriptorEntry.getValue());
|
propertyDescriptorEntry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout";
|
String protocol = this.getProtocol(context);
|
||||||
|
|
||||||
|
String propertyName = protocol.equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout";
|
||||||
final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
|
final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS));
|
||||||
javaMailProperties.setProperty(propertyName, timeoutInMillis);
|
javaMailProperties.setProperty(propertyName, timeoutInMillis);
|
||||||
|
|
||||||
|
oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider -> javaMailProperties.put("mail." + protocol + ".auth.mechanisms", "XOAUTH2"));
|
||||||
|
|
||||||
return javaMailProperties;
|
return javaMailProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue