From f30c8169abe8705911b0cdac18aa90f3a217dc78 Mon Sep 17 00:00:00 2001 From: Andrew Grande Date: Tue, 4 Apr 2017 14:57:21 -0400 Subject: [PATCH] NIFI-1833 - Addressed issues from PR review. Addressed dependency issues from the review. Addressed a checkstyle issue. Review: reworded the descriptions. Review: implemented the reset condition logic. Review: dropped static qualifier from method signatures, not required really Review: removed sys.out, inlined a single method to get access to the ProcessContext.getName() Switched to HTTPS as per MSFT recommendation. Some DRY. Dropped cruft. Addressing review suggestions from 4/5 Review: documentation improvements Review: documentation improvements This closes #1636. Signed-off-by: Bryan Rosander --- .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 2 +- .../nifi-azure-processors/pom.xml | 1 + .../azure/AbstractAzureBlobProcessor.java | 6 +- .../azure/AbstractAzureProcessor.java | 12 ++- .../nifi/processors/azure/AzureConstants.java | 3 + .../azure/storage/FetchAzureBlobStorage.java | 20 +++-- .../azure/storage/ListAzureBlobStorage.java | 73 ++++++++----------- .../azure/storage/PutAzureBlobStorage.java | 40 +++++----- .../azure/storage/utils/BlobInfo.java | 2 +- .../additionalDetails.html | 39 ++++++++++ .../additionalDetails.html | 39 ++++++++++ .../additionalDetails.html | 39 ++++++++++ .../azure/storage/AbstractAzureIT.java | 35 ++++----- .../storage/ITFetchAzureBlobStorage.java | 3 +- 14 files changed, 205 insertions(+), 109 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml index f75bb7f9b0..e6c3c9b0ec 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -38,7 +38,7 @@ org.apache.nifi - nifi-standard-services-api-nar + nifi-standard-nar nar diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 8330bcc7b3..9b4f28bfb3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -85,6 +85,7 @@ org.apache.nifi nifi-standard-processors ${project.version} + provided diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index 82eae123a4..2026711468 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -27,13 +27,13 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); - - public static final List properties = Collections + + private static final List PROPERTIES = Collections .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); @Override protected List getSupportedPropertyDescriptors() { - return properties; + return PROPERTIES; } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java index 5ab1f8bfbf..c95ee99832 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java @@ -32,8 +32,8 @@ import com.microsoft.azure.storage.CloudStorageAccount; public abstract class AbstractAzureProcessor extends AbstractProcessor { - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build(); - protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build(); public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); protected CloudStorageAccount createStorageConnection(ProcessContext context) { @@ -49,7 +49,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { } private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) { - final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey); + final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); try { return createStorageAccountFromConnectionString(storageConnectionString); } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) { @@ -65,13 +65,11 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { * @return The newly created CloudStorageAccount object * */ - protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { + private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { CloudStorageAccount storageAccount; try { storageAccount = CloudStorageAccount.parse(storageConnectionString); - } catch (IllegalArgumentException | URISyntaxException e) { - throw e; - } catch (InvalidKeyException e) { + } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) { throw e; } return storageAccount; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java index eaa234caa2..9a51030174 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java @@ -32,6 +32,9 @@ public final class AzureConstants { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + // use HTTPS by default as per MSFT recommendation + public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + private AzureConstants() { // do not instantiate } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index 2229cfd097..163a962729 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.azure.storage; import java.io.IOException; -import java.io.OutputStream; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; @@ -31,13 +30,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AzureConstants; @@ -49,12 +48,14 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile") +@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class }) @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched") }) public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { - public static final List PROPERTIES = Collections + + private static final List PROPERTIES = Collections .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); @Override @@ -84,14 +85,11 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { // TODO - we may be able do fancier things with ranges and // distribution of download over threads, investigate - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream os) throws IOException { - try { - blob.download(os); - } catch (StorageException e) { - throw new IOException(e); - } + flowFile = session.write(flowFile, os -> { + try { + blob.download(os); + } catch (StorageException e) { + throw new IOException(e); } }); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index f4a793b83d..f8a6c4d8c8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -46,12 +46,10 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; import org.apache.nifi.processors.standard.AbstractListProcessor; import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageUri; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @@ -60,14 +58,18 @@ import com.microsoft.azure.storage.blob.ListBlobItem; @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ FetchAzureBlobStorage.class }) +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class }) @CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage") @InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), - @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), - @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), - @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"), +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), + @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), + @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), + @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), + @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), + @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), + @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), + @WritesAttribute(attribute = "lang", description = "Language code for the content"), @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) @Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.") @@ -76,7 +78,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor { private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true).required(false).build(); - public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); @Override protected List getSupportedPropertyDescriptors() { @@ -106,8 +108,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor { @Override protected boolean isListingResetNecessary(final PropertyDescriptor property) { - // TODO - implement - return false; + // re-list if configuration changed, but not when security keys are rolled (not included in the condition) + return PREFIX.equals(property) + || AzureConstants.ACCOUNT_NAME.equals(property) + || AzureConstants.CONTAINER.equals(property); } @Override @@ -128,10 +132,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor { CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); CloudBlobContainer container = blobClient.getContainerReference(containerName); - BlobRequestOptions blobRequestOptions = null; - OperationContext operationContext = null; - - for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) { + for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { if (blob instanceof CloudBlob) { CloudBlob cloudBlob = (CloudBlob) blob; BlobProperties properties = cloudBlob.getProperties(); @@ -154,40 +155,26 @@ public class ListAzureBlobStorage extends AbstractListProcessor { return listing; } - protected static CloudStorageAccount createStorageConnection(ProcessContext context) { + protected CloudStorageAccount createStorageConnection(ProcessContext context) { final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey); + final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); try { - return createStorageAccountFromConnectionString(storageConnectionString); + + CloudStorageAccount storageAccount; + try { + storageAccount = CloudStorageAccount.parse(storageConnectionString); + } catch (IllegalArgumentException | URISyntaxException e) { + getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); + throw e; + } catch (InvalidKeyException e) { + getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); + throw e; + } + return storageAccount; } catch (InvalidKeyException | URISyntaxException e) { throw new IllegalArgumentException(e); } } - /** - * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format. - * - * @param storageConnectionString - * Connection string for the storage service or the emulator - * @return The newly created CloudStorageAccount object - * - */ - private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { - - CloudStorageAccount storageAccount; - try { - storageAccount = CloudStorageAccount.parse(storageConnectionString); - } catch (IllegalArgumentException | URISyntaxException e) { - System.out.println("\nConnection string specifies an invalid URI."); - System.out.println("Please confirm the connection string is in the Azure connection string format."); - throw e; - } catch (InvalidKeyException e) { - System.out.println("\nConnection string specifies an invalid key."); - System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid."); - throw e; - } - return storageAccount; - } - } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index 1327a0b2b5..e03bc258df 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -35,7 +35,6 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AzureConstants; @@ -50,13 +49,12 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; @SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class }) @CapabilityDescription("Puts content into an Azure Storage Blob") @InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), - @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), - @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) + @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")}) public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -80,21 +78,23 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { final Map attributes = new HashMap<>(); long length = flowFile.getSize(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - final InputStream in = new BufferedInputStream(rawIn); - try { - blob.upload(in, length); - BlobProperties properties = blob.getProperties(); - attributes.put("azure.container", containerName); - attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); - attributes.put("azure.etag", properties.getEtag()); - attributes.put("azure.length", String.valueOf(length)); - attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); - } catch (StorageException | URISyntaxException e) { - throw new IOException(e); - } + session.read(flowFile, rawIn -> { + InputStream in = rawIn; + if (!(in instanceof BufferedInputStream)) { + // do not double-wrap + in = new BufferedInputStream(rawIn); + } + + try { + blob.upload(in, length); + BlobProperties properties = blob.getProperties(); + attributes.put("azure.container", containerName); + attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); + attributes.put("azure.etag", properties.getEtag()); + attributes.put("azure.length", String.valueOf(length)); + attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); } }); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java index d429878b6d..6907d945f3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java @@ -159,7 +159,7 @@ public class BlobInfo implements Comparable, Serializable, ListableEnt return etag.compareTo(o.etag); } - protected BlobInfo(final Builder builder) { + private BlobInfo(final Builder builder) { this.primaryUri = builder.primaryUri; this.secondaryUri = builder.secondaryUri; this.contentType = builder.contentType; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000000..b4b8e3b55b --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ + + + + + + FetchAzureBlobStorage Processor + + + + + +

Apache NiFi Azure Processors

+ +

Important Security Note

+

+ There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. +

+

+ Return to a previous page +

+ + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000000..76e8775ef8 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ + + + + + + ListAzureBlobStorage Processor + + + + + +

Apache NiFi Azure Processors

+ +

Important Security Note

+

+ There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. +

+

+ Return to a previous page +

+ + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000000..0a7ff3586f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ + + + + + + PutAzureBlobStorage Processor + + + + + +

Apache NiFi Azure Processors

+ +

Important Security Note

+

+ There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. +

+

+ Return to a previous page +

+ + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java index 34702eb96f..91a8c73826 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java @@ -16,7 +16,17 @@ */ package org.apache.nifi.processors.azure.storage; -import static org.junit.Assert.fail; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -25,19 +35,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.Properties; -import org.apache.nifi.util.file.FileUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; -import com.microsoft.azure.storage.blob.ListBlobItem; -import com.microsoft.azure.storage.table.CloudTable; -import com.microsoft.azure.storage.table.CloudTableClient; +import static org.junit.Assert.fail; public abstract class AbstractAzureIT { protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; @@ -90,17 +88,10 @@ public abstract class AbstractAzureIT { } protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); + String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); return blobClient.getContainerReference(TEST_CONTAINER_NAME); } - protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); - CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); - CloudTableClient tableClient = storageAccount.createCloudTableClient(); - return tableClient.getTableReference(TEST_TABLE_NAME); - } - } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java index 1e8a8f702b..7dc8830ec9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.processors.azure.AbstractAzureProcessor; import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -51,7 +52,7 @@ public class ITFetchAzureBlobStorage extends AbstractAzureIT { runner.enqueue(new byte[0], attributes); runner.run(); - runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); for (MockFlowFile flowFile : flowFilesForRelationship) { flowFile.assertContentEquals("0123456789".getBytes());