NIFI-12888 In AbstractEmailProcessor check for expired oauth2 token when determining whether mail receiver needs to be recreated.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8494.
This commit is contained in:
tpalfy 2024-03-11 15:31:11 +01:00 committed by Pierre Villard
parent 0e255674fd
commit 92830af2e9
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5

View File

@ -22,6 +22,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.oauth2.AccessToken;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -198,13 +199,14 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
private volatile boolean shouldSetDeleteFlag;
protected volatile Optional<OAuth2AccessTokenProvider> oauth2AccessTokenProviderOptional;
protected volatile AccessToken oauth2AccessDetails;
@OnScheduled
public void onScheduled(final ProcessContext context) {
if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) {
OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
oauth2AccessTokenProvider.getAccessDetails();
oauth2AccessDetails = oauth2AccessTokenProvider.getAccessDetails();
oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider);
} else {
@ -307,7 +309,7 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
* and is ready to receive messages.
*/
private synchronized void initializeIfNecessary(ProcessContext context, ProcessSession processSession) {
if (this.messageReceiver == null) {
if (this.messageReceiver == null || isOauth2AccessDetailsRefreshed()) {
this.processSession = processSession;
this.messageReceiver = this.buildMessageReceiver(context);
@ -324,6 +326,14 @@ abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends Ab
}
}
private boolean isOauth2AccessDetailsRefreshed() {
boolean oauthDetailsRefreshed = this.oauth2AccessTokenProviderOptional.isPresent()
&&
(this.oauth2AccessDetails == null || !oauth2AccessDetails.equals(this.oauth2AccessTokenProviderOptional.get().getAccessDetails()));
return oauthDetailsRefreshed;
}
/**
* Extracts dynamic properties which typically represent the Java Mail
* properties from the {@link ProcessContext} returning them as instance of