diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index d5f1ec57ef..e43bfe05ac 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -36,7 +36,7 @@ com.microsoft.azure azure-storage - 5.0.0 + 5.2.0 com.fasterxml.jackson.core diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index f0729e6771..9bb3d33511 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -17,23 +17,52 @@ package org.apache.nifi.processors.azure; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.Azure; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; -public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor { +public abstract class AbstractAzureBlobProcessor extends AbstractProcessor { public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build(); private static final List PROPERTIES = Collections - .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); + .unmodifiableList(Arrays.asList( + Azure.CONTAINER, + Azure.PROP_SAS_TOKEN, + Azure.ACCOUNT_NAME, + Azure.ACCOUNT_KEY, + BLOB)); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList( + AbstractAzureBlobProcessor.REL_SUCCESS, + AbstractAzureBlobProcessor.REL_FAILURE))); @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + return Azure.validateCredentialProperties(validationContext); + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java deleted file mode 100644 index 5812236f40..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure; - -import java.net.URISyntaxException; -import java.security.InvalidKeyException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.Relationship; - -import com.microsoft.azure.storage.CloudStorageAccount; - -public abstract class AbstractAzureProcessor extends AbstractProcessor { - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build(); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); - - protected CloudStorageAccount createStorageConnection(ProcessContext context) { - final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); - final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - return createStorageConnectionFromNameAndKey(accountName, accountKey); - } - - protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) { - final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue(); - return createStorageConnectionFromNameAndKey(accountName, accountKey); - } - - private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) { - final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); - try { - return createStorageAccountFromConnectionString(storageConnectionString); - } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format. - * - * @param storageConnectionString - * Connection string for the storage service or the emulator - * @return The newly created CloudStorageAccount object - * - */ - private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { - CloudStorageAccount storageAccount; - storageAccount = CloudStorageAccount.parse(storageConnectionString); - return storageAccount; - } - - @Override - public Set getRelationships() { - return RELATIONSHIPS; - } - -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java deleted file mode 100644 index 1e0cde31a9..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.util.StandardValidators; - -public final class AzureConstants { - public static final String BLOCK = "Block"; - public static final String PAGE = "Page"; - - public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key") - .description("The storage account key. There are certain risks in allowing the account key to be stored as a flowfile" + - "attribute. While it does provide for a more flexible flow by allowing the account key to " + - "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); - - public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name") - .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" + - "attribute. While it does provide for a more flexible flow by allowing the account name to " + - "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); - - public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container name") - .description("Name of the azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); - - // use HTTPS by default as per MSFT recommendation - public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; - - private AzureConstants() { - // do not instantiate - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index cd08ec1a55..f484f1a660 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -18,10 +18,7 @@ package org.apache.nifi.processors.azure.storage; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -34,15 +31,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; -import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; @@ -57,14 +52,6 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; }) public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { - private static final List PROPERTIES = Collections - .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -74,13 +61,12 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { final long startNanos = System.nanoTime(); - String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); AtomicReference storedException = new AtomicReference<>(); try { - CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); final Map attributes = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index 148f72424b..18669a1d77 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -16,17 +16,15 @@ */ package org.apache.nifi.processors.azure.storage; -import java.io.IOException; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.microsoft.azure.storage.StorageUri; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -37,24 +35,25 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; import org.apache.nifi.processors.azure.storage.utils.BlobInfo; import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.StorageUri; -import com.microsoft.azure.storage.blob.BlobListingDetails; -import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @@ -79,16 +78,27 @@ import com.microsoft.azure.storage.blob.ListBlobItem; "where the previous node left off, without duplicating the data.") public class ListAzureBlobStorage extends AbstractListProcessor { - private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing") + private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + Azure.CONTAINER, + Azure.PROP_SAS_TOKEN, + Azure.ACCOUNT_NAME, + Azure.ACCOUNT_KEY, + PROP_PREFIX)); @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + return Azure.validateCredentialProperties(validationContext); + } + @Override protected Map createAttributes(BlobInfo entity, ProcessContext context) { final Map attributes = new HashMap<>(); @@ -107,15 +117,16 @@ public class ListAzureBlobStorage extends AbstractListProcessor { @Override protected String getPath(final ProcessContext context) { - return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); + return context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue(); } @Override protected boolean isListingResetNecessary(final PropertyDescriptor property) { // re-list if configuration changed, but not when security keys are rolled (not included in the condition) - return PREFIX.equals(property) - || AzureConstants.ACCOUNT_NAME.equals(property) - || AzureConstants.CONTAINER.equals(property); + return PROP_PREFIX.equals(property) + || Azure.ACCOUNT_NAME.equals(property) + || Azure.CONTAINER.equals(property) + || Azure.PROP_SAS_TOKEN.equals(property); } @Override @@ -125,15 +136,14 @@ public class ListAzureBlobStorage extends AbstractListProcessor { @Override protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { - String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); - String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); + String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue(); + String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue(); if (prefix == null) { prefix = ""; } final List listing = new ArrayList<>(); try { - CloudStorageAccount storageAccount = createStorageConnection(context); - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { @@ -142,43 +152,32 @@ public class ListAzureBlobStorage extends AbstractListProcessor { BlobProperties properties = cloudBlob.getProperties(); StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); - Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType()) - .contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength()); + Builder builder = new BlobInfo.Builder() + .primaryUri(uri.getPrimaryUri().toString()) + .contentType(properties.getContentType()) + .contentLanguage(properties.getContentLanguage()) + .etag(properties.getEtag()) + .lastModifiedTime(properties.getLastModified().getTime()) + .length(properties.getLength()); + + if (uri.getSecondaryUri() != null) { + builder.secondaryUri(uri.getSecondaryUri().toString()); + } if (blob instanceof CloudBlockBlob) { - builder.blobType(AzureConstants.BLOCK); + builder.blobType(Azure.BLOCK); } else { - builder.blobType(AzureConstants.PAGE); + builder.blobType(Azure.PAGE); } listing.add(builder.build()); } } - } catch (IllegalArgumentException | URISyntaxException | StorageException e) { - throw (new IOException(e)); + } catch (Throwable t) { + throw new IOException(ExceptionUtils.getRootCause(t)); } return listing; } - private CloudStorageAccount createStorageConnection(ProcessContext context) { - final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); - final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); - try { - CloudStorageAccount storageAccount; - try { - storageAccount = CloudStorageAccount.parse(storageConnectionString); - } catch (IllegalArgumentException | URISyntaxException e) { - getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); - throw e; - } catch (InvalidKeyException e) { - getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); - throw e; - } - return storageAccount; - } catch (InvalidKeyException | URISyntaxException e) { - throw new IllegalArgumentException(e); - } - } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index e1646355ad..65f8f2f478 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -38,9 +38,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; -import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.CloudBlob; @@ -67,14 +66,13 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { final long startNanos = System.nanoTime(); - String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); AtomicReference storedException = new AtomicReference<>(); try { - CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger()); CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlob blob = container.getBlockBlobReference(blobPath); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java new file mode 100644 index 0000000000..4a734ab573 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public final class Azure { + public static final String BLOCK = "Block"; + public static final String PAGE = "Page"; + + public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key") + .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + + "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + + "There are certain risks in allowing the account key to be stored as a flowfile" + + "attribute. While it does provide for a more flexible flow by allowing the account key to " + + "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name") + .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" + + "attribute. While it does provide for a more flexible flow by allowing the account name to " + + "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name") + .description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + + public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder() + .name("SAS String") + .description("Shared Access Signature string, including the leading '?'. Specify either SAS (recommended) or Account Key") + .required(false) + .expressionLanguageSupported(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + // use HTTPS by default as per MSFT recommendation + public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net"; + + private Azure() { + // do not instantiate + } + + public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger) { + final String accountName = context.getProperty(Azure.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + final String accountKey = context.getProperty(Azure.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + final String sasToken = context.getProperty(Azure.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + + CloudBlobClient cloudBlobClient; + + try { + // sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work) + if (StringUtils.isNotBlank(sasToken)) { + String storageConnectionString = String.format(Azure.FORMAT_BASE_URI, accountName); + StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken); + cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds); + } else { + String blobConnString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString); + cloudBlobClient = storageAccount.createCloudBlobClient(); + } + } catch (IllegalArgumentException | URISyntaxException e) { + logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } catch (InvalidKeyException e) { + logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } + + return cloudBlobClient; + } + + public static Collection validateCredentialProperties(ValidationContext validationContext) { + final List results = new ArrayList<>(); + + String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue(); + String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue(); + if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName)) + || (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) { + results.add(new ValidationResult.Builder().subject("Azure Credentials") + .valid(false) + .explanation("either Azure Account Key or Shared Access Signature required, but not both") + .build()); + } + + return results; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java index 636845c3f1..b665eff80a 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java @@ -25,7 +25,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.Properties; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; import org.apache.nifi.util.file.FileUtils; import com.microsoft.azure.storage.CloudStorageAccount; @@ -67,7 +67,7 @@ class AzureTestUtil { } static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); + String storageConnectionString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); return blobClient.getContainerReference(containerName); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java index eeedd3b0ed..4da7106f74 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.nifi.processors.azure.AbstractAzureProcessor; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; +import org.apache.nifi.processors.azure.storage.utils.Azure; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -55,19 +55,19 @@ public class ITFetchAzureBlobStorage { try { runner.setValidateExpressionUsage(true); - runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, containerName); + runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(Azure.CONTAINER, containerName); runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); final Map attributes = new HashMap<>(); attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME); attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME); - attributes.put("azure.blobtype", AzureConstants.BLOCK); + attributes.put("azure.blobtype", Azure.BLOCK); runner.enqueue(new byte[0], attributes); runner.run(); - runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1); List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); for (MockFlowFile flowFile : flowFilesForRelationship) { flowFile.assertContentEquals("0123456789".getBytes()); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java index 6dd9088f37..1bc788ac4e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java @@ -23,7 +23,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.UUID; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -49,9 +49,9 @@ public class ITListAzureBlobStorage { final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage()); try { - runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, containerName); + runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(Azure.CONTAINER, containerName); // requires multiple runs to deal with List processor checking runner.run(3); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java index 5cecdbcbcb..43046db046 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java @@ -22,7 +22,7 @@ import java.security.InvalidKeyException; import java.util.List; import java.util.UUID; -import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.Azure; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -44,9 +44,9 @@ public class ITPutAzureStorageBlob { try { runner.setValidateExpressionUsage(true); - runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); - runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); - runner.setProperty(AzureConstants.CONTAINER, containerName); + runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(Azure.CONTAINER, containerName); runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); runner.enqueue("0123456789".getBytes());