NIFI-11050 In PutEmail added OAuth authorization option. StandardOauth2AccessTokenProvider can be used with refresh_token grant type.

This closes #6845.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2023-01-13 13:23:08 +01:00 committed by Peter Turcsanyi
parent 9067774237
commit 464e0a96ee
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
5 changed files with 302 additions and 3 deletions

View File

@ -43,6 +43,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -51,6 +52,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -73,6 +75,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
@ -109,6 +112,37 @@ public class PutEmail extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final AllowableValue PASSWORD_BASED_AUTHORIZATION_MODE = new AllowableValue(
"password-based-authorization-mode",
"Use Password",
"Use password"
);
public static final AllowableValue OAUTH_AUTHORIZATION_MODE = new AllowableValue(
"oauth-based-authorization-mode",
"Use OAuth2",
"Use OAuth2 to acquire access token"
);
public static final PropertyDescriptor AUTHORIZATION_MODE = new PropertyDescriptor.Builder()
.name("authorization-mode")
.displayName("Authorization Mode")
.description("How to authorize sending email on the user's behalf.")
.required(true)
.allowableValues(PASSWORD_BASED_AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE)
.defaultValue(PASSWORD_BASED_AUTHORIZATION_MODE.getValue())
.build();
public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
.name("oauth2-access-token-provider")
.displayName("OAuth2 Access Token Provider")
.description("OAuth2 service that can provide access tokens.")
.identifiesControllerService(OAuth2AccessTokenProvider.class)
.dependsOn(AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE)
.required(true)
.build();
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
.name("SMTP Username")
.description("Username for the SMTP account")
@ -119,6 +153,7 @@ public class PutEmail extends AbstractProcessor {
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
.name("SMTP Password")
.description("Password for the SMTP account")
.dependsOn(AUTHORIZATION_MODE, PASSWORD_BASED_AUTHORIZATION_MODE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@ -291,6 +326,8 @@ public class PutEmail extends AbstractProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SMTP_HOSTNAME);
properties.add(SMTP_PORT);
properties.add(AUTHORIZATION_MODE);
properties.add(OAUTH2_ACCESS_TOKEN_PROVIDER);
properties.add(SMTP_USERNAME);
properties.add(SMTP_PASSWORD);
properties.add(SMTP_AUTH);
@ -356,10 +393,23 @@ public class PutEmail extends AbstractProcessor {
}
private volatile Pattern attributeNamePattern = null;
private volatile Optional<OAuth2AccessTokenProvider> oauth2AccessTokenProviderOptional;
@OnScheduled
public void onScheduled(final ProcessContext context) {
final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
this.attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
oauth2AccessTokenProvider.getAccessDetails();
oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider);
} else {
oauth2AccessTokenProviderOptional = Optional.empty();
}
}
private void setMessageHeader(final String header, final String value, final Message message) throws MessagingException {
@ -520,6 +570,13 @@ public class PutEmail extends AbstractProcessor {
}
}
oauth2AccessTokenProviderOptional.ifPresent(oAuth2AccessTokenProvider -> {
String accessToken = oAuth2AccessTokenProvider.getAccessDetails().getAccessToken();
properties.setProperty("mail.smtp.password", accessToken);
properties.put("mail.smtp.auth.mechanisms", "XOAUTH2");
});
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
final String mailPropertyValue = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();

View File

@ -0,0 +1,111 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PutEmail</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>OAuth Authorization Mode</h1>
<p>
PutEmail can use OAuth2. The exact way may depend on the email provider.<br><br>
</p>
<h2>OAuth with Gmail</h2>
<h3>Configure Gmail OAuth Client</h3>
<p>
The Gmail OAuth client can be used to send email on behalf of multiple different gmail accounts so this needs to be done once.
<ol>
<li>In the Google Development Console <a href="https://support.google.com/googleapi/answer/6251787">Create a project</a> (if you don't have one yet)</li>
<li><a href="https://console.cloud.google.com/apis/credentials/consent">Configure OAuth consent</a></li>
<li>
<a href="https://console.cloud.google.com/apis/credentials/oauthclient">Create OAuth client</a>. Select <b>Desktop app</b> as <b>Application type</b>.
When the client has been created, take note of the Client ID and Client secret values as they will be needed later.
</li>
</ol>
</p>
<h3>Retrieve Token for NiFi</h3>
<p>
Tokens are provided once the owner of the Gmail account consented to the previously created client to send emails on their behalf.
Consequently, this needs to be done for every gmail account.
<ol>
<li>Go to the following web page:
<pre>https://accounts.google.com/o/oauth2/auth?redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&scope=https%3A%2F%2Fmail.google.com&client_id=<i>CLIENT_ID</i></pre>
Replace CLIENT_ID at the end to your Client ID.
</li>
<li>You may need to select the Google Account for which you want to consent. Click <b>Continue</b> twice.</li>
<li>
A page will appear with an Authorisation code that will have a message at the bottom like this:
<pre>
<b>Authorisation code</b>
<br />
Please copy this code, switch to your application and paste it there:
<br />
<i>AUTHORISATION_CODE</i>
</pre>
</li>
<li>
Execute the following command from terminal to fetch the access and refresh tokens.<br>
In case the curl command returns an error, please try again from step 1.<br>
<pre>curl https://accounts.google.com/o/oauth2/token -d grant_type=authorization_code -d redirect_uri="urn:ietf:wg:oauth:2.0:oob" -d client_id=<i>CLIENT_ID</i> -d client_secret=<i>CLIENT_SECRET</i> -d code=<i>AUTHORISATION_CODE</i></pre>
Replace CLIENT_ID, CLIENT_SECRET and AUTHORISATION_CODE to your values.
</li>
<li>
The curl command results a json file which contains the access token and refresh token:
<pre>
{
"access_token": "ACCESS_TOKEN",
"expires_in": 3599,
"refresh_token": "REFRESH_TOKEN",
"scope": "https://mail.google.com/",
"token_type": "Bearer"
}
</pre>
</li>
</ol>
<h3>Configure Token in NiFi</h3>
<ol>
<li>
On the PutEmail processor in the <b>Authorization Mode</b> property select <b>Use OAuth2</b>.
</li>
<li>
In the <b>OAuth2 Access Token Provider</b> property select/create a StandardOauth2AccessTokenProvider controller service.
</li>
<li>
On the StandardOauth2AccessTokenProvider controller service in the <b>Grant Type</b> property select <b>Refresh Token</b>.
</li>
<li>
In the <b>Refresh Token</b> property enter the REFRESH_TOKEN returned by the curl command.
</li>
<li>
In the <b>Authorization Server URL</b> enter <pre>https://accounts.google.com/o/oauth2/token</pre>
</li>
<li>Also fill in the <b>Client ID</b> and <b>Client secret</b> properties.</li>
</ol>
</p>
</body>
</html>

View File

@ -26,6 +26,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -44,6 +45,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutEmail {
@ -137,6 +141,40 @@ public class TestPutEmail {
assertNull(message.getRecipients(RecipientType.CC));
}
@Test
public void testOAuth() throws Exception {
// GIVEN
String oauthServiceID = "oauth-access-token-provider";
String access_token = "access_token_123";
OAuth2AccessTokenProvider oauthService = mock(OAuth2AccessTokenProvider.class, RETURNS_DEEP_STUBS);
when(oauthService.getIdentifier()).thenReturn(oauthServiceID);
when(oauthService.getAccessDetails().getAccessToken()).thenReturn(access_token);
runner.addControllerService(oauthServiceID, oauthService);
runner.enableControllerService(oauthService);
runner.setProperty(PutEmail.SMTP_HOSTNAME, "unimportant");
runner.setProperty(PutEmail.FROM, "unimportant");
runner.setProperty(PutEmail.TO, "unimportant");
runner.setProperty(PutEmail.MESSAGE, "unimportant");
runner.setProperty(PutEmail.AUTHORIZATION_MODE, PutEmail.OAUTH_AUTHORIZATION_MODE);
runner.setProperty(PutEmail.OAUTH2_ACCESS_TOKEN_PROVIDER, oauthServiceID);
// WHEN
runner.enqueue("Unimportant flowfile content".getBytes());
runner.run();
// THEN
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS);
assertEquals(1, processor.getMessages().size(), "Expected a single message to be sent");
Message message = processor.getMessages().get(0);
assertEquals("XOAUTH2", message.getSession().getProperty("mail.smtp.auth.mechanisms"));
assertEquals("access_token_123", message.getSession().getProperty("mail.smtp.password"));
}
@Test
public void testOutgoingMessageWithOptionalProperties() throws Exception {
// verifies that optional attributes are set on the outgoing Message correctly

View File

@ -83,13 +83,19 @@ public class StandardOauth2AccessTokenProvider extends AbstractControllerService
public static AllowableValue RESOURCE_OWNER_PASSWORD_CREDENTIALS_GRANT_TYPE = new AllowableValue(
"password",
"User Password",
"Resource Owner Password Credentials Grant. Used to access resources available to users. Requires username and password and usually Client ID and Client Secret"
"Resource Owner Password Credentials Grant. Used to access resources available to users. Requires username and password and usually Client ID and Client Secret."
);
public static AllowableValue CLIENT_CREDENTIALS_GRANT_TYPE = new AllowableValue(
"client_credentials",
"Client Credentials",
"Client Credentials Grant. Used to access resources available to clients. Requires Client ID and Client Secret"
"Client Credentials Grant. Used to access resources available to clients. Requires Client ID and Client Secret."
);
public static AllowableValue REFRESH_TOKEN_GRANT_TYPE = new AllowableValue(
"refresh_token",
"Refresh Token",
"Refresh Token Grant. Used to get fresh access tokens based on a previously acquired refresh token. Requires Client ID and Client Secret (apart from Refresh Token)."
);
public static final PropertyDescriptor GRANT_TYPE = new PropertyDescriptor.Builder()
@ -97,7 +103,7 @@ public class StandardOauth2AccessTokenProvider extends AbstractControllerService
.displayName("Grant Type")
.description("The OAuth2 Grant Type to be used when acquiring an access token.")
.required(true)
.allowableValues(RESOURCE_OWNER_PASSWORD_CREDENTIALS_GRANT_TYPE, CLIENT_CREDENTIALS_GRANT_TYPE)
.allowableValues(RESOURCE_OWNER_PASSWORD_CREDENTIALS_GRANT_TYPE, CLIENT_CREDENTIALS_GRANT_TYPE, REFRESH_TOKEN_GRANT_TYPE)
.defaultValue(RESOURCE_OWNER_PASSWORD_CREDENTIALS_GRANT_TYPE.getValue())
.build();
@ -121,6 +127,17 @@ public class StandardOauth2AccessTokenProvider extends AbstractControllerService
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor REFRESH_TOKEN = new PropertyDescriptor.Builder()
.name("refresh-token")
.displayName("Refresh Token")
.description("Refresh Token.")
.dependsOn(GRANT_TYPE, REFRESH_TOKEN_GRANT_TYPE)
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
.name("client-id")
.displayName("Client ID")
@ -178,6 +195,7 @@ public class StandardOauth2AccessTokenProvider extends AbstractControllerService
GRANT_TYPE,
USERNAME,
PASSWORD,
REFRESH_TOKEN,
CLIENT_ID,
CLIENT_SECRET,
SCOPE,
@ -225,6 +243,16 @@ public class StandardOauth2AccessTokenProvider extends AbstractControllerService
clientSecret = context.getProperty(CLIENT_SECRET).getValue();
scope = context.getProperty(SCOPE).getValue();
if (context.getProperty(REFRESH_TOKEN).isSet()) {
String refreshToken = context.getProperty(REFRESH_TOKEN).evaluateAttributeExpressions().getValue();
AccessToken accessDetailsWithRefreshTokenOnly = new AccessToken();
accessDetailsWithRefreshTokenOnly.setRefreshToken(refreshToken);
accessDetailsWithRefreshTokenOnly.setExpiresIn(-1);
this.accessDetails = accessDetailsWithRefreshTokenOnly;
}
refreshWindowSeconds = context.getProperty(REFRESH_WINDOW).asTimePeriod(TimeUnit.SECONDS);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.oauth2;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
@ -180,6 +181,70 @@ public class StandardOauth2AccessTokenProviderTest {
runner.assertNotValid(testSubject);
}
@Test
public void testInvalidWhenRefreshTokenGrantTypeSetWithoutRefreshToken() throws Exception {
// GIVEN
Processor processor = new NoOpProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("testSubject", testSubject);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.GRANT_TYPE, StandardOauth2AccessTokenProvider.REFRESH_TOKEN_GRANT_TYPE);
// THEN
runner.assertNotValid(testSubject);
}
@Test
public void testValidWhenRefreshTokenGrantTypeSetWithRefreshToken() throws Exception {
// GIVEN
Processor processor = new NoOpProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("testSubject", testSubject);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.GRANT_TYPE, StandardOauth2AccessTokenProvider.REFRESH_TOKEN_GRANT_TYPE);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.REFRESH_TOKEN, "refresh_token");
// THEN
runner.assertValid(testSubject);
}
@Test
public void testAcquireNewTokenWhenGrantTypeIsRefreshToken() throws Exception {
// GIVEN
String refreshToken = "refresh_token_123";
String accessToken = "access_token_123";
Processor processor = new NoOpProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("testSubject", testSubject);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.GRANT_TYPE, StandardOauth2AccessTokenProvider.REFRESH_TOKEN_GRANT_TYPE);
runner.setProperty(testSubject, StandardOauth2AccessTokenProvider.REFRESH_TOKEN, refreshToken);
runner.enableControllerService(testSubject);
Response response = buildResponse(HTTP_OK, "{\"access_token\":\"" + accessToken + "\"}");
when(mockHttpClient.newCall(any(Request.class)).execute()).thenReturn(response);
// WHEN
String actualAccessToken = testSubject.getAccessDetails().getAccessToken();
// THEN
verify(mockHttpClient, atLeast(1)).newCall(requestCaptor.capture());
FormBody capturedRequestBody = (FormBody) requestCaptor.getValue().body();
assertEquals("grant_type", capturedRequestBody.encodedName(0));
assertEquals("refresh_token", capturedRequestBody.encodedValue(0));
assertEquals("refresh_token", capturedRequestBody.encodedName(1));
assertEquals("refresh_token_123", capturedRequestBody.encodedValue(1));
assertEquals(accessToken, actualAccessToken);
}
@Test
public void testValidWhenClientAuthenticationStrategyIsValid() throws Exception {
// GIVEN