NIFI-7581 Added ControllerService-based credential settings support for ADLS processors

NIFI-7581 Separated Controller Service for providing Azure credentials for ADLS (ADLSCredentialsControllerService) form the one that does the same for Blob storages (AzureStorageCredentialsDetails). (This was done due to the considerable difference in the APIs of the libraries used to connect to both.)
NIFI-7581 Fix: Register controller service in META-INF. Minor fixes.
NIFI-7581 Minor changes (documentation, type etc.)
NIFI-7581 Updated integration tests.
NIFI-7581 Minor changes (renaming).

This closes #4369.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2020-06-26 18:21:47 +02:00 committed by Peter Turcsanyi
parent 8d53f5d0c9
commit 19a84dd3dd
13 changed files with 639 additions and 174 deletions

View File

@ -59,7 +59,7 @@
<dependency> <dependency>
<groupId>com.azure</groupId> <groupId>com.azure</groupId>
<artifactId>azure-core</artifactId> <artifactId>azure-core</artifactId>
<version>1.5.0</version> <version>${azure.core.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.azure</groupId> <groupId>com.azure</groupId>

View File

@ -16,15 +16,15 @@
*/ */
package org.apache.nifi.processors.azure; package org.apache.nifi.processors.azure;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder; import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential; import com.azure.storage.common.StorageSharedKeyCredential;
@ -33,8 +33,6 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext; import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
@ -42,58 +40,20 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import reactor.core.publisher.Mono;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("storage-account-name").displayName("Storage Account Name") .name("adls-credentials-service")
.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " + .displayName("ADLS Credentials")
"attribute. While it does provide for a more flexible flow by allowing the account name to " + .description("Controller Service used to obtain Azure Credentials.")
"be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + .identifiesControllerService(ADLSCredentialsService.class)
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions." +
" Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
"the preferred way is to configure them through a controller service")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true) .required(true)
.sensitive(true).build();
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("storage-account-key").displayName("Storage Account Key")
.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
"There are certain risks in allowing the account key to be stored as a flowfile " +
"attribute. While it does provide for a more flexible flow by allowing the account key to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.sensitive(true).build();
public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder()
.name("storage-sas-token").displayName("SAS Token")
.description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
"There are certain risks in allowing the SAS token to be stored as a flowfile " +
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
"be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name") .name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.") .description("Name of the Azure Storage File System. It is assumed to be already existing.")
@ -119,15 +79,6 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.defaultValue("${azure.filename}") .defaultValue("${azure.filename}")
.build(); .build();
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.name("endpoint-suffix").displayName("Endpoint Suffix")
.description("Endpoint Suffix")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.defaultValue("dfs.core.windows.net")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
"Files that have been successfully written to Azure storage are transferred to this relationship") "Files that have been successfully written to Azure storage are transferred to this relationship")
.build(); .build();
@ -135,66 +86,38 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
"Files that could not be written to Azure storage for some reason are transferred to this relationship") "Files that could not be written to Azure storage for some reason are transferred to this relationship")
.build(); .build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList( private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, ADLS_CREDENTIALS_SERVICE,
AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, FILESYSTEM,
AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, DIRECTORY,
AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY, FILE
AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX, ));
AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
AbstractAzureDataLakeStorageProcessor.DIRECTORY,
AbstractAzureDataLakeStorageProcessor.FILE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
new HashSet<>(Arrays.asList( REL_SUCCESS,
AbstractAzureBlobProcessor.REL_SUCCESS, REL_FAILURE
AbstractAzureBlobProcessor.REL_FAILURE))); )));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
} }
public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final boolean useManagedIdentity = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
final boolean accountKeyIsSet = validationContext.getProperty(ACCOUNT_KEY).isSet();
final boolean sasTokenIsSet = validationContext.getProperty(SAS_TOKEN).isSet();
int credential_config_found = 0;
if(useManagedIdentity) credential_config_found++;
if(accountKeyIsSet) credential_config_found++;
if(sasTokenIsSet) credential_config_found++;
if(credential_config_found == 0){
final String msg = String.format(
"At least one of ['%s', '%s', '%s'] should be set",
ACCOUNT_KEY.getDisplayName(),
SAS_TOKEN.getDisplayName(),
USE_MANAGED_IDENTITY.getDisplayName()
);
results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
} else if(credential_config_found > 1) {
final String msg = String.format(
"Only one of ['%s', '%s', '%s'] should be set",
ACCOUNT_KEY.getDisplayName(),
SAS_TOKEN.getDisplayName(),
USE_MANAGED_IDENTITY.getDisplayName()
);
results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
}
return results;
}
public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap(); final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue(); final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue(); ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes);
final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
final AccessToken accessToken = credentialsDetails.getAccessToken();
final String endpointSuffix = credentialsDetails.getEndpointSuffix();
final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity();
final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix); final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
DataLakeServiceClient storageClient; DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) { if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
@ -204,6 +127,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
} else if (StringUtils.isNotBlank(sasToken)) { } else if (StringUtils.isNotBlank(sasToken)) {
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient(); .buildClient();
} else if (accessToken != null) {
TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildClient();
} else if(useManagedIdentity){ } else if(useManagedIdentity){
final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder() final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder()
.build(); .build();
@ -212,16 +140,10 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.credential(misCrendential) .credential(misCrendential)
.buildClient(); .buildClient();
} else { } else {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", throw new IllegalArgumentException("No valid credentials were provided");
ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
}
return storageClient;
} }
@Override return storageClient;
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = validateCredentialProperties(validationContext);
return results;
} }
@Override @Override

View File

@ -49,8 +49,13 @@ public final class AzureStorageUtils {
public static final String BLOCK = "Block"; public static final String BLOCK = "Block";
public static final String PAGE = "Page"; public static final String PAGE = "Page";
public static final String STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME = "storage-account-name";
public static final String STORAGE_ACCOUNT_KEY_PROPERTY_DESCRIPTOR_NAME = "storage-account-key";
public static final String STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME = "storage-sas-token";
public static final String STORAGE_ENDPOINT_SUFFIX_PROPERTY_DESCRIPTOR_NAME = "storage-endpoint-suffix";
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder() public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("storage-account-key") .name(STORAGE_ACCOUNT_KEY_PROPERTY_DESCRIPTOR_NAME)
.displayName("Storage Account Key") .displayName("Storage Account Key")
.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
@ -73,7 +78,7 @@ public final class AzureStorageUtils {
"In addition, the provenance repositories may be put on encrypted disk partitions."; "In addition, the provenance repositories may be put on encrypted disk partitions.";
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("storage-account-name") .name(STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME)
.displayName("Storage Account Name") .displayName("Storage Account Name")
.description(ACCOUNT_NAME_BASE_DESCRIPTION + .description(ACCOUNT_NAME_BASE_DESCRIPTION +
" Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " + " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
@ -87,11 +92,11 @@ public final class AzureStorageUtils {
.build(); .build();
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder() public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.name("storage-endpoint-suffix") .name(STORAGE_ENDPOINT_SUFFIX_PROPERTY_DESCRIPTOR_NAME)
.displayName("Common Storage Account Endpoint Suffix") .displayName("Common Storage Account Endpoint Suffix")
.description( .description(
"Storage accounts in public Azure always use a common FQDN suffix. " + "Storage accounts in public Azure always use a common FQDN suffix. " +
"Override this endpoint suffix with a different suffix in certain circumsances (like Azure Stack or non-public Azure regions). " + "Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions). " +
"The preferred way is to configure them through a controller service specified in the Storage Credentials property. " + "The preferred way is to configure them through a controller service specified in the Storage Credentials property. " +
"The controller service can provide a common/shared configuration for multiple/all Azure processors. Furthermore, the credentials " + "The controller service can provide a common/shared configuration for multiple/all Azure processors. Furthermore, the credentials " +
"can also be looked up dynamically with the 'Lookup' version of the service.") "can also be looked up dynamically with the 'Lookup' version of the service.")
@ -111,7 +116,7 @@ public final class AzureStorageUtils {
.build(); .build();
public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
.name("storage-sas-token") .name(STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME)
.displayName("SAS Token") .displayName("SAS Token")
.description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " + .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
"There are certain risks in allowing the SAS token to be stored as a flowfile " + "There are certain risks in allowing the SAS token to be stored as a flowfile " +

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
* Provides credentials details for ADLS
*
* @see AbstractControllerService
*/
@Tags({"azure", "microsoft", "cloud", "storage", "adls", "credentials"})
@CapabilityDescription("Defines credentials for ADLS processors.")
public class ADLSCredentialsControllerService extends AbstractControllerService implements ADLSCredentialsService {
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.ACCOUNT_NAME)
.description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION)
.required(true)
.build();
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX)
.displayName("Endpoint Suffix")
.description(
"Storage accounts in public Azure always use a common FQDN suffix. " +
"Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).")
.required(true)
.defaultValue("dfs.core.windows.net")
.build();
public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("storage-use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ACCOUNT_NAME,
ENDPOINT_SUFFIX,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
USE_MANAGED_IDENTITY
));
private ConfigurationContext context;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
boolean accountKeySet = StringUtils.isNotBlank(validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue());
boolean sasTokenSet = StringUtils.isNotBlank(validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue());
boolean useManagedIdentitySet = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if (!onlyOneSet(accountKeySet, sasTokenSet, useManagedIdentitySet)) {
StringJoiner options = new StringJoiner(", ")
.add(AzureStorageUtils.ACCOUNT_KEY.getDisplayName())
.add(AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
.add(USE_MANAGED_IDENTITY.getDisplayName());
results.add(new ValidationResult.Builder().subject(this.getClass().getSimpleName())
.valid(false)
.explanation("one and only one of [" + options + "] should be set")
.build());
}
return results;
}
private boolean onlyOneSet(Boolean... checks) {
long nrOfSet = Arrays.stream(checks)
.filter(check -> check)
.count();
return nrOfSet == 1;
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
this.context = context;
}
@Override
public ADLSCredentialsDetails getCredentialsDetails(Map<String, String> attributes) {
ADLSCredentialsDetails.Builder credentialsBuilder = ADLSCredentialsDetails.Builder.newBuilder();
setValue(credentialsBuilder, ACCOUNT_NAME, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountName);
setValue(credentialsBuilder, AzureStorageUtils.ACCOUNT_KEY, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountKey);
setValue(credentialsBuilder, AzureStorageUtils.PROP_SAS_TOKEN, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setSasToken);
setValue(credentialsBuilder, ENDPOINT_SUFFIX, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setEndpointSuffix);
setValue(credentialsBuilder, USE_MANAGED_IDENTITY, PropertyValue::asBoolean, ADLSCredentialsDetails.Builder::setUseManagedIdentity);
return credentialsBuilder.build();
}
private <T> void setValue(
ADLSCredentialsDetails.Builder credentialsBuilder,
PropertyDescriptor propertyDescriptor, Function<PropertyValue, T> getPropertyValue,
BiConsumer<ADLSCredentialsDetails.Builder, T> setBuilderValue
) {
PropertyValue property = context.getProperty(propertyDescriptor);
if (property.isSet()) {
T value = getPropertyValue.apply(property);
setBuilderValue.accept(credentialsBuilder, value);
}
}
}

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService

View File

@ -23,6 +23,9 @@ import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -36,6 +39,17 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
protected String fileSystemName; protected String fileSystemName;
protected DataLakeFileSystemClient fileSystemClient; protected DataLakeFileSystemClient fileSystemClient;
@Override
protected void setUpCredentials() throws Exception {
ADLSCredentialsService service = new ADLSCredentialsControllerService();
runner.addControllerService("ADLSCredentials", service);
runner.setProperty(service, ADLSCredentialsControllerService.ACCOUNT_NAME, getAccountName());
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.enableControllerService(service);
runner.setProperty(AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE, "ADLSCredentials");
}
@Before @Before
public void setUpAzureDataLakeStorageIT() { public void setUpAzureDataLakeStorageIT() {
fileSystemName = String.format("%s-%s", FILESYSTEM_NAME_PREFIX, UUID.randomUUID()); fileSystemName = String.format("%s-%s", FILESYSTEM_NAME_PREFIX, UUID.randomUUID());

View File

@ -68,9 +68,13 @@ public abstract class AbstractAzureStorageIT {
protected TestRunner runner; protected TestRunner runner;
@Before @Before
public void setUpAzureStorageIT() { public void setUpAzureStorageIT() throws Exception {
runner = TestRunners.newTestRunner(getProcessorClass()); runner = TestRunners.newTestRunner(getProcessorClass());
setUpCredentials();
}
protected void setUpCredentials() throws Exception {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName()); runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
} }

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN; import static org.mockito.Mockito.mock;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY; import static org.mockito.Mockito.when;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Before; import org.junit.Before;
@ -34,57 +34,20 @@ public class TestAbstractAzureDataLakeStorage {
private TestRunner runner; private TestRunner runner;
@Before @Before
public void setUp() { public void setUp() throws Exception {
// test the property validation in the abstract class via the put processor // test the property validation in the abstract class via the put processor
runner = TestRunners.newTestRunner(PutAzureDataLakeStorage.class); runner = TestRunners.newTestRunner(PutAzureDataLakeStorage.class);
runner.setProperty(ACCOUNT_NAME, "accountName"); ADLSCredentialsService credentialsService = mock(ADLSCredentialsService.class);
runner.setProperty(ACCOUNT_KEY, "accountKey"); when(credentialsService.getIdentifier()).thenReturn("credentials_service");
runner.addControllerService("credentials_service", credentialsService);
runner.enableControllerService(credentialsService);
runner.setProperty(FILESYSTEM, "filesystem"); runner.setProperty(FILESYSTEM, "filesystem");
runner.setProperty(DIRECTORY, "directory"); runner.setProperty(DIRECTORY, "directory");
runner.setProperty(FILE, "file"); runner.setProperty(FILE, "file");
} runner.setProperty(ADLS_CREDENTIALS_SERVICE, "credentials_service");
@Test
public void testValidWhenAccountNameAndAccountKeySpecified() {
runner.assertValid();
}
@Test
public void testValidWhenAccountNameAndSasTokenSpecified() {
runner.removeProperty(ACCOUNT_KEY);
runner.setProperty(SAS_TOKEN, "sasToken");
runner.assertValid();
}
@Test
public void testValidWhenAccountNameAndUseManagedIdentity() {
runner.removeProperty(ACCOUNT_KEY);
runner.setProperty(USE_MANAGED_IDENTITY, "true");
runner.assertValid();
}
@Test
public void testNotValidWhenNoAccountNameSpecified() {
runner.removeProperty(ACCOUNT_NAME);
runner.assertNotValid();
}
@Test
public void testNotValidWhenNoAccountKeyNorSasTokenSpecified() {
runner.removeProperty(ACCOUNT_KEY);
runner.assertNotValid();
}
@Test
public void testNotValidWhenBothAccountKeyAndSasTokenSpecified() {
runner.setProperty(SAS_TOKEN, "sasToken");
runner.assertNotValid();
} }
@Test @Test

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestADLSCredentialsControllerService {
public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service";
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = "AccountKey";
private static final String SAS_TOKEN_VALUE = "SasToken";
public static final String END_POINT_SUFFIX_VALUE = "end.point.suffix";
private TestRunner runner;
private ADLSCredentialsControllerService credentialsService;
@Before
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new ADLSCredentialsControllerService();
runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService);
}
@Test
public void testNotValidBecauseAccountNameMissing() {
configureAccountKey();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseNoCredentialsIsSet() {
configureAccountName();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseBothAccountKeyAndSasTokenSpecified() {
configureAccountName();
configureAccountKey();
configureSasToken();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseBothAccountKeyAndUseManagedIdentitySpecified() {
configureAccountName();
configureAccountKey();
configureUseManagedIdentity();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseBothSasTokenAndUseManagedIdentitySpecified() {
configureAccountName();
configureSasToken();
configureUseManagedIdentity();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseAllCredentialsSpecified() {
configureAccountName();
configureAccountKey();
configureSasToken();
configureUseManagedIdentity();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidWithEmptyEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, ADLSCredentialsControllerService.ENDPOINT_SUFFIX, "");
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidWithWhitespaceEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, ADLSCredentialsControllerService.ENDPOINT_SUFFIX, " ");
runner.assertNotValid(credentialsService);
}
@Test
public void testValidWithAccountNameAndAccountKey() {
configureAccountName();
configureAccountKey();
runner.assertValid(credentialsService);
}
@Test
public void testValidWithAccountNameAndSasToken() {
configureAccountName();
configureSasToken();
runner.assertValid(credentialsService);
}
@Test
public void testValidWithAccountNameAndUseManagedIdentity() {
configureAccountName();
configureUseManagedIdentity();
runner.assertValid(credentialsService);
}
@Test
public void testGetCredentialsDetailsWithAccountKey() throws Exception {
// GIVEN
configureAccountName();
configureAccountKey();
runner.enableControllerService(credentialsService);
// WHEN
ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>());
// THEN
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(ACCOUNT_KEY_VALUE, actual.getAccountKey());
assertNull(actual.getSasToken());
assertFalse(actual.getUseManagedIdentity());
assertNotNull(actual.getEndpointSuffix());
}
@Test
public void testGetCredentialsDetailsWithSasToken() throws Exception {
// GIVEN
configureAccountName();
configureSasToken();
runner.enableControllerService(credentialsService);
// WHEN
ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>());
// THEN
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(SAS_TOKEN_VALUE, actual.getSasToken());
assertNull(actual.getAccountKey());
assertFalse(actual.getUseManagedIdentity());
assertNotNull(actual.getEndpointSuffix());
}
@Test
public void testGetCredentialsDetailsWithUseManagedIdentity() throws Exception {
// GIVEN
configureAccountName();
configureUseManagedIdentity();
runner.enableControllerService(credentialsService);
// WHEN
ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>());
// THEN
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertTrue(actual.getUseManagedIdentity());
assertNull(actual.getAccountKey());
assertNull(actual.getSasToken());
assertNotNull(actual.getEndpointSuffix());
}
@Test
public void testGetCredentialsDetailsWithSetEndpointSuffix() throws Exception {
// GIVEN
configureAccountName();
configureAccountKey();
configureEndpointSuffix();
runner.enableControllerService(credentialsService);
// WHEN
ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>());
// THEN
assertEquals(END_POINT_SUFFIX_VALUE, actual.getEndpointSuffix());
}
private void configureAccountName() {
runner.setProperty(credentialsService, ADLSCredentialsControllerService.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
}
private void configureAccountKey() {
runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
}
private void configureSasToken() {
runner.setProperty(credentialsService, AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
}
private void configureUseManagedIdentity() {
runner.setProperty(credentialsService, ADLSCredentialsControllerService.USE_MANAGED_IDENTITY, "true");
}
private void configureEndpointSuffix() {
runner.setProperty(credentialsService, ADLSCredentialsControllerService.ENDPOINT_SUFFIX, END_POINT_SUFFIX_VALUE);
}
}

View File

@ -28,6 +28,11 @@
<groupId>com.microsoft.azure</groupId> <groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId> <artifactId>azure-storage</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>${azure.core.version}</version>
</dependency>
<!-- overriding jackson-core in azure-storage --> <!-- overriding jackson-core in azure-storage -->
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
public class ADLSCredentialsDetails {
private final String accountName;
private final String accountKey;
private final String sasToken;
private final String endpointSuffix;
private final AccessToken accessToken;
private final boolean useManagedIdentity;
public ADLSCredentialsDetails(
String accountName,
String accountKey,
String sasToken,
String endpointSuffix,
AccessToken accessToken,
boolean useManagedIdentity
) {
this.accountName = accountName;
this.accountKey = accountKey;
this.sasToken = sasToken;
this.endpointSuffix = endpointSuffix;
this.accessToken = accessToken;
this.useManagedIdentity = useManagedIdentity;
}
public String getAccountName() {
return accountName;
}
public String getEndpointSuffix() {
return endpointSuffix;
}
public String getAccountKey() {
return accountKey;
}
public String getSasToken() {
return sasToken;
}
public AccessToken getAccessToken() {
return accessToken;
}
public boolean getUseManagedIdentity() {
return useManagedIdentity;
}
public static class Builder {
private String accountName;
private String accountKey;
private String sasToken;
private String endpointSuffix;
private AccessToken accessToken;
private boolean useManagedIdentity;
private Builder() {}
public static Builder newBuilder() {
return new Builder();
}
public Builder setAccountName(String accountName) {
this.accountName = accountName;
return this;
}
public Builder setAccountKey(String accountKey) {
this.accountKey = accountKey;
return this;
}
public Builder setSasToken(String sasToken) {
this.sasToken = sasToken;
return this;
}
public Builder setEndpointSuffix(String endpointSuffix) {
this.endpointSuffix = endpointSuffix;
return this;
}
public Builder setAccessToken(AccessToken accessToken) {
this.accessToken = accessToken;
return this;
}
public Builder setUseManagedIdentity(boolean useManagedIdentity) {
this.useManagedIdentity = useManagedIdentity;
return this;
}
public ADLSCredentialsDetails build() {
return new ADLSCredentialsDetails(accountName, accountKey, sasToken, endpointSuffix, accessToken, useManagedIdentity);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.ControllerService;
import java.util.Map;
/**
*
* Service interface to provide ADLS credentials details.
*/
public interface ADLSCredentialsService extends ControllerService {
/**
* Get ADLSCredentialsDetails object which contains the Storage Account Name and one or more credentials type
* @param attributes FlowFile attributes (typically)
* @return ADLSCredentialsDetails object
*/
ADLSCredentialsDetails getCredentialsDetails(Map<String, String> attributes);
}

View File

@ -27,6 +27,7 @@
<properties> <properties>
<azure-storage.version>8.4.0</azure-storage.version> <azure-storage.version>8.4.0</azure-storage.version>
<azure.core.version>1.5.0</azure.core.version>
<jackson.version>2.10.3</jackson.version> <jackson.version>2.10.3</jackson.version>
</properties> </properties>