NIFI-7409: Azure managed identity support to Azure Datalake processors

NIFI-7409: review changes
NIFI-7409: ordering import statements
NIFI-7409: changed validateCredentialProperties logic

This closes #4249.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
sjyang18 2020-05-01 18:55:50 +00:00 committed by Peter Turcsanyi
parent 3fec4d8c27
commit 852715aadd
5 changed files with 148 additions and 45 deletions

View File

@ -56,6 +56,16 @@
<version>1.12.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
@ -75,12 +85,6 @@
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.1.1</version>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -16,30 +16,32 @@
*/
package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.lang3.StringUtils;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Set;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@ -85,6 +87,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
@ -110,6 +119,15 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.defaultValue("${azure.filename}")
.build();
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.name("endpoint-suffix").displayName("Endpoint Suffix")
.description("Endpoint Suffix")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.defaultValue("dfs.core.windows.net")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
"Files that have been successfully written to Azure storage are transferred to this relationship")
.build();
@ -118,9 +136,14 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE));
Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
AbstractAzureDataLakeStorageProcessor.SAS_TOKEN,
AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY,
AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX,
AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
AbstractAzureDataLakeStorageProcessor.DIRECTORY,
AbstractAzureDataLakeStorageProcessor.FILE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
@ -134,17 +157,32 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue();
if (StringUtils.isNotBlank(accountName)
&& ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false)
.explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() +
" or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() +
" must be specified, not both")
.build());
final boolean useManagedIdentity = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
final boolean accountKeyIsSet = validationContext.getProperty(ACCOUNT_KEY).isSet();
final boolean sasTokenIsSet = validationContext.getProperty(SAS_TOKEN).isSet();
int credential_config_found = 0;
if(useManagedIdentity) credential_config_found++;
if(accountKeyIsSet) credential_config_found++;
if(sasTokenIsSet) credential_config_found++;
if(credential_config_found == 0){
final String msg = String.format(
"At least one of ['%s', '%s', '%s'] should be set",
ACCOUNT_KEY.getDisplayName(),
SAS_TOKEN.getDisplayName(),
USE_MANAGED_IDENTITY.getDisplayName()
);
results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
} else if(credential_config_found > 1) {
final String msg = String.format(
"Only one of ['%s', '%s', '%s'] should be set",
ACCOUNT_KEY.getDisplayName(),
SAS_TOKEN.getDisplayName(),
USE_MANAGED_IDENTITY.getDisplayName()
);
results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
}
return results;
}
@ -154,7 +192,9 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName);
final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue();
final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
@ -164,6 +204,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
} else if (StringUtils.isNotBlank(sasToken)) {
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient();
} else if(useManagedIdentity){
final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder()
.build();
storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(misCrendential)
.buildClient();
} else {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
@ -181,4 +228,4 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
}
}

View File

@ -16,12 +16,13 @@
*/
package org.apache.nifi.services.azure.storage;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
import java.util.Map;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " +

View File

@ -16,17 +16,18 @@
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestAbstractAzureDataLakeStorage {
@ -57,6 +58,14 @@ public class TestAbstractAzureDataLakeStorage {
runner.assertValid();
}
@Test
public void testValidWhenAccountNameAndUseManagedIdentity() {
runner.removeProperty(ACCOUNT_KEY);
runner.setProperty(USE_MANAGED_IDENTITY, "true");
runner.assertValid();
}
@Test
public void testNotValidWhenNoAccountNameSpecified() {
runner.removeProperty(ACCOUNT_NAME);

View File

@ -27,6 +27,7 @@
<properties>
<azure-storage.version>8.4.0</azure-storage.version>
<jackson.version>2.10.3</jackson.version>
</properties>
<modules>
@ -50,6 +51,47 @@
</exclusion>
</exclusions>
</dependency>
<!-- dependency convergency resolution -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.8</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>