diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 0a3602db60..2cff8eed68 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -56,6 +56,16 @@ 1.12.0-SNAPSHOT provided + + com.azure + azure-core + 1.5.0 + + + com.azure + azure-identity + 1.0.6 + com.microsoft.azure azure-eventhubs @@ -75,12 +85,6 @@ azure-storage-file-datalake 12.1.1 - - - com.fasterxml.jackson.core - jackson-core - 2.10.3 - org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 40d276c815..af75f99720 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -16,30 +16,32 @@ */ package org.apache.nifi.processors.azure; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.commons.lang3.StringUtils; - -import com.azure.storage.common.StorageSharedKeyCredential; -import com.azure.storage.file.datalake.DataLakeServiceClient; -import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; - -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.Map; +import java.util.Set; + +import com.azure.identity.ManagedIdentityCredential; +import com.azure.identity.ManagedIdentityCredentialBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { @@ -85,6 +87,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder() + .name("use-managed-identity") + .displayName("Use Azure Managed Identity") + .description("Choose whether or not to use the managed identity of Azure VM/VMSS ") + .required(false).defaultValue("false").allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); + public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() .name("filesystem-name").displayName("Filesystem Name") .description("Name of the Azure Storage File System. It is assumed to be already existing.") @@ -110,6 +119,15 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .defaultValue("${azure.filename}") .build(); + public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder() + .name("endpoint-suffix").displayName("Endpoint Suffix") + .description("Endpoint Suffix") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .defaultValue("dfs.core.windows.net") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( "Files that have been successfully written to Azure storage are transferred to this relationship") .build(); @@ -118,9 +136,14 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .build(); private static final List PROPERTIES = Collections.unmodifiableList( - Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, - AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM, - AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE)); + Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, + AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, + AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, + AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY, + AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX, + AbstractAzureDataLakeStorageProcessor.FILESYSTEM, + AbstractAzureDataLakeStorageProcessor.DIRECTORY, + AbstractAzureDataLakeStorageProcessor.FILE)); private static final Set RELATIONSHIPS = Collections.unmodifiableSet( new HashSet<>(Arrays.asList( @@ -134,17 +157,32 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static Collection validateCredentialProperties(final ValidationContext validationContext) { final List results = new ArrayList<>(); - final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue(); - final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue(); - final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue(); - if (StringUtils.isNotBlank(accountName) - && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) { - results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false) - .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() + - " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() + - " must be specified, not both") - .build()); + final boolean useManagedIdentity = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + final boolean accountKeyIsSet = validationContext.getProperty(ACCOUNT_KEY).isSet(); + final boolean sasTokenIsSet = validationContext.getProperty(SAS_TOKEN).isSet(); + + int credential_config_found = 0; + if(useManagedIdentity) credential_config_found++; + if(accountKeyIsSet) credential_config_found++; + if(sasTokenIsSet) credential_config_found++; + + if(credential_config_found == 0){ + final String msg = String.format( + "At least one of ['%s', '%s', '%s'] should be set", + ACCOUNT_KEY.getDisplayName(), + SAS_TOKEN.getDisplayName(), + USE_MANAGED_IDENTITY.getDisplayName() + ); + results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); + } else if(credential_config_found > 1) { + final String msg = String.format( + "Only one of ['%s', '%s', '%s'] should be set", + ACCOUNT_KEY.getDisplayName(), + SAS_TOKEN.getDisplayName(), + USE_MANAGED_IDENTITY.getDisplayName() + ); + results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } return results; } @@ -154,7 +192,9 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue(); final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue(); final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue(); - final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName); + final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue(); + final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix); + final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); DataLakeServiceClient storageClient; if (StringUtils.isNotBlank(accountKey)) { final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, @@ -164,6 +204,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc } else if (StringUtils.isNotBlank(sasToken)) { storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) .buildClient(); + } else if(useManagedIdentity){ + final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder() + .build(); + storageClient = new DataLakeServiceClientBuilder() + .endpoint(endpoint) + .credential(misCrendential) + .buildClient(); } else { throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName())); @@ -181,4 +228,4 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public Set getRelationships() { return RELATIONSHIPS; } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java index 4ac2f07e39..1cfe1a7054 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java @@ -16,12 +16,13 @@ */ package org.apache.nifi.services.azure.storage; +import java.util.Map; + import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup; -import java.util.Map; @Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" }) @CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java index 59800bb03d..960b1fea4f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java @@ -16,17 +16,18 @@ */ package org.apache.nifi.processors.azure.storage; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; public class TestAbstractAzureDataLakeStorage { @@ -57,6 +58,14 @@ public class TestAbstractAzureDataLakeStorage { runner.assertValid(); } + @Test + public void testValidWhenAccountNameAndUseManagedIdentity() { + runner.removeProperty(ACCOUNT_KEY); + runner.setProperty(USE_MANAGED_IDENTITY, "true"); + + runner.assertValid(); + } + @Test public void testNotValidWhenNoAccountNameSpecified() { runner.removeProperty(ACCOUNT_NAME); diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index d4983e1548..c5ecc49a56 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -27,6 +27,7 @@ 8.4.0 + 2.10.3 @@ -50,6 +51,47 @@ + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${jackson.version} + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + ${jackson.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + org.apache.commons + commons-lang3 + 3.9 + + + org.apache.commons + commons-text + 1.8 +