diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index f823e6af94..f75bb7f9b0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -35,6 +35,12 @@
nifi-azure-processors
1.2.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 9049b3f106..8330bcc7b3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -1,14 +1,14 @@
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
4.0.0
@@ -31,11 +31,35 @@ language governing permissions and limitations under the License. -->
org.apache.nifi
nifi-utils
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+ provided
+
+
+ org.apache.avro
+ avro
+
com.microsoft.azure
azure-eventhubs
0.9.0
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.8.6
+
+
+
+ com.microsoft.azure
+ azure-storage
+ 5.0.0
+
org.apache.nifi
nifi-mock
@@ -57,5 +81,10 @@ language governing permissions and limitations under the License. -->
${powermock.version}
test
+
+ org.apache.nifi
+ nifi-standard-processors
+ ${project.version}
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
new file mode 100644
index 0000000000..82eae123a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
+
+ public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
+
+ public static final List properties = Collections
+ .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
new file mode 100644
index 0000000000..5ab1f8bfbf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+
+public abstract class AbstractAzureProcessor extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
+ protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();
+ public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ protected CloudStorageAccount createStorageConnection(ProcessContext context) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+ return createStorageConnectionFromNameAndKey(accountName, accountKey);
+ }
+
+ protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+ return createStorageConnectionFromNameAndKey(accountName, accountKey);
+ }
+
+ private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
+ final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ try {
+ return createStorageAccountFromConnectionString(storageConnectionString);
+ } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
+ *
+ * @param storageConnectionString
+ * Connection string for the storage service or the emulator
+ * @return The newly created CloudStorageAccount object
+ *
+ */
+ protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+ CloudStorageAccount storageAccount;
+ try {
+ storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ throw e;
+ } catch (InvalidKeyException e) {
+ throw e;
+ }
+ return storageAccount;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
new file mode 100644
index 0000000000..eaa234caa2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureConstants {
+ public static final String BLOCK = "Block";
+ public static final String PAGE = "Page";
+
+ public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+ public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+
+ private AzureConstants() {
+ // do not instantiate
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
new file mode 100644
index 0000000000..2229cfd097
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
+})
+public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
+ public static final List PROPERTIES = Collections
+ .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+ String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ final Map attributes = new HashMap<>();
+ final CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+ // TODO - we may be able do fancier things with ranges and
+ // distribution of download over threads, investigate
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream os) throws IOException {
+ try {
+ blob.download(os);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ });
+
+ long length = blob.getProperties().getLength();
+ attributes.put("azure.length", String.valueOf(length));
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
new file mode 100644
index 0000000000..f4a793b83d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.processors.standard.AbstractListProcessor;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageUri;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+
+@TriggerSerially
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ FetchAzureBlobStorage.class })
+@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+ @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+ @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+ @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
+ @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
+ + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
+public class ListAzureBlobStorage extends AbstractListProcessor {
+
+ private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true).required(false).build();
+
+ public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected Map createAttributes(BlobInfo entity, ProcessContext context) {
+ final Map attributes = new HashMap<>();
+ attributes.put("azure.etag", entity.getEtag());
+ attributes.put("azure.primaryUri", entity.getPrimaryUri());
+ attributes.put("azure.secondaryUri", entity.getSecondaryUri());
+ attributes.put("azure.blobname", entity.getName());
+ attributes.put("azure.blobtype", entity.getBlobType());
+ attributes.put("azure.length", String.valueOf(entity.getLength()));
+ attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
+ attributes.put("mime.type", entity.getContentType());
+ attributes.put("lang", entity.getContentLanguage());
+
+ return attributes;
+ }
+
+ @Override
+ protected String getPath(final ProcessContext context) {
+ return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ protected boolean isListingResetNecessary(final PropertyDescriptor property) {
+ // TODO - implement
+ return false;
+ }
+
+ @Override
+ protected Scope getStateScope(final ProcessContext context) {
+ return Scope.CLUSTER;
+ }
+
+ @Override
+ protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+ String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+ if (prefix == null) {
+ prefix = "";
+ }
+ final List listing = new ArrayList<>();
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ BlobRequestOptions blobRequestOptions = null;
+ OperationContext operationContext = null;
+
+ for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
+ if (blob instanceof CloudBlob) {
+ CloudBlob cloudBlob = (CloudBlob) blob;
+ BlobProperties properties = cloudBlob.getProperties();
+ StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+
+ Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType())
+ .contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength());
+
+ if (blob instanceof CloudBlockBlob) {
+ builder.blobType(AzureConstants.BLOCK);
+ } else {
+ builder.blobType(AzureConstants.PAGE);
+ }
+ listing.add(builder.build());
+ }
+ }
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
+ throw (new IOException(e));
+ }
+ return listing;
+ }
+
+ protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+ final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ try {
+ return createStorageAccountFromConnectionString(storageConnectionString);
+ } catch (InvalidKeyException | URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
+ *
+ * @param storageConnectionString
+ * Connection string for the storage service or the emulator
+ * @return The newly created CloudStorageAccount object
+ *
+ */
+ private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+
+ CloudStorageAccount storageAccount;
+ try {
+ storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ System.out.println("\nConnection string specifies an invalid URI.");
+ System.out.println("Please confirm the connection string is in the Azure connection string format.");
+ throw e;
+ } catch (InvalidKeyException e) {
+ System.out.println("\nConnection string specifies an invalid key.");
+ System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid.");
+ throw e;
+ }
+ return storageAccount;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
new file mode 100644
index 0000000000..1327a0b2b5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
+@CapabilityDescription("Puts content into an Azure Storage Blob")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
+ @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+ @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+ @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
+ @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+ @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
+
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+
+ String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+ final Map attributes = new HashMap<>();
+ long length = flowFile.getSize();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ final InputStream in = new BufferedInputStream(rawIn);
+ try {
+ blob.upload(in, length);
+ BlobProperties properties = blob.getProperties();
+ attributes.put("azure.container", containerName);
+ attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
+ attributes.put("azure.etag", properties.getEtag());
+ attributes.put("azure.length", String.valueOf(length));
+ attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+ });
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
+ getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
new file mode 100644
index 0000000000..d429878b6d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.io.Serializable;
+
+import org.apache.nifi.processors.standard.util.ListableEntity;
+
+public class BlobInfo implements Comparable, Serializable, ListableEntity {
+ private static final long serialVersionUID = 1L;
+
+ private final String primaryUri;
+ private final String secondaryUri;
+ private final String contentType;
+ private final String contentLanguage;
+ private final String etag;
+ private final long lastModifiedTime;
+ private final long length;
+ private final String blobType;
+
+ public static long getSerialversionuid() {
+ return serialVersionUID;
+ }
+
+ public String getPrimaryUri() {
+ return primaryUri;
+ }
+
+ public String getSecondaryUri() {
+ return secondaryUri;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public String getContentLanguage() {
+ return contentLanguage;
+ }
+
+ public String getEtag() {
+ return etag;
+ }
+
+ public long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public String getBlobType() {
+ return blobType;
+ }
+
+ public static final class Builder {
+ private String primaryUri;
+ private String secondaryUri;
+ private String contentType;
+ private String contentLanguage;
+ private String etag;
+ private long lastModifiedTime;
+ private long length;
+ private String blobType;
+
+ public Builder primaryUri(String primaryUri) {
+ this.primaryUri = primaryUri;
+ return this;
+ }
+
+ public Builder secondaryUri(String secondaryUri) {
+ this.secondaryUri = secondaryUri;
+ return this;
+ }
+
+ public Builder contentType(String contentType) {
+ this.contentType = contentType;
+ return this;
+ }
+
+ public Builder contentLanguage(String contentLanguage) {
+ this.contentLanguage = contentLanguage;
+ return this;
+ }
+
+ public Builder etag(String etag) {
+ this.etag = etag;
+ return this;
+ }
+
+ public Builder lastModifiedTime(long lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ return this;
+ }
+
+ public Builder length(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public Builder blobType(String blobType) {
+ this.blobType = blobType;
+ return this;
+ }
+
+ public BlobInfo build() {
+ return new BlobInfo(this);
+ }
+
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((etag == null) ? 0 : etag.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BlobInfo other = (BlobInfo) obj;
+ if (etag == null) {
+ if (other.etag != null) {
+ return false;
+ }
+ } else if (!etag.equals(other.etag)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int compareTo(BlobInfo o) {
+ return etag.compareTo(o.etag);
+ }
+
+ protected BlobInfo(final Builder builder) {
+ this.primaryUri = builder.primaryUri;
+ this.secondaryUri = builder.secondaryUri;
+ this.contentType = builder.contentType;
+ this.contentLanguage = builder.contentLanguage;
+ this.etag = builder.etag;
+ this.lastModifiedTime = builder.lastModifiedTime;
+ this.length = builder.length;
+ this.blobType = builder.blobType;
+ }
+
+ @Override
+ public String getName() {
+ String primaryUri = getPrimaryUri();
+ return primaryUri.substring(primaryUri.lastIndexOf('/') + 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getPrimaryUri();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return getLastModifiedTime();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 178e52c984..84b3300f4c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
-org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
\ No newline at end of file
+org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
+org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
+org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
+org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
new file mode 100644
index 0000000000..34702eb96f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+
+public abstract class AbstractAzureIT {
+ protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+ public static final String TEST_CONTAINER_NAME = "nifitest";
+
+ private static final Properties CONFIG;
+ protected static final String TEST_BLOB_NAME = "testing";
+ protected static final String TEST_TABLE_NAME = "testing";
+
+ static {
+ final FileInputStream fis;
+ CONFIG = new Properties();
+ try {
+ fis = new FileInputStream(CREDENTIALS_FILE);
+ try {
+ CONFIG.load(fis);
+ } catch (IOException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ } finally {
+ FileUtils.closeQuietly(fis);
+ }
+ } catch (FileNotFoundException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ }
+
+ }
+
+ @BeforeClass
+ public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException {
+ CloudBlobContainer container = getContainer();
+ container.createIfNotExists();
+ }
+
+ @AfterClass
+ public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
+ CloudBlobContainer container = getContainer();
+ for (ListBlobItem blob : container.listBlobs()) {
+ if (blob instanceof CloudBlob) {
+ ((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
+ }
+ }
+ }
+
+ public static String getAccountName() {
+ return CONFIG.getProperty("accountName");
+ }
+
+ public static String getAccountKey() {
+ return CONFIG.getProperty("accountKey");
+ }
+
+ protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
+ String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+ CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ return blobClient.getContainerReference(TEST_CONTAINER_NAME);
+ }
+
+ protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
+ String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+ CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ CloudTableClient tableClient = storageAccount.createCloudTableClient();
+ return tableClient.getTableReference(TEST_TABLE_NAME);
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
new file mode 100644
index 0000000000..1e8a8f702b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+
+public class ITFetchAzureBlobStorage extends AbstractAzureIT {
+
+ @Test
+ public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+
+ runner.setValidateExpressionUsage(true);
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+ runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+
+ final Map attributes = new HashMap<>();
+ attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
+ attributes.put("azure.blobname", TEST_BLOB_NAME);
+ attributes.put("azure.blobtype", AzureConstants.BLOCK);
+ runner.enqueue(new byte[0], attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
+ List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10");
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
new file mode 100644
index 0000000000..277538cbce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+public class ITListAzureBlobStorage extends AbstractAzureIT {
+
+ @BeforeClass
+ public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
+ CloudBlobContainer container = getContainer();
+ container.createIfNotExists();
+
+ CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+ byte[] buf = "0123456789".getBytes();
+ InputStream in = new ByteArrayInputStream(buf);
+ blob.upload(in, 10);
+ }
+
+ @AfterClass
+ public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
+ CloudBlobContainer container = getContainer();
+ container.deleteIfExists();
+ }
+
+ @Test
+ public void testListsAzureBlobStorageContent() {
+ final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+
+ // requires multiple runs to deal with List processor checking
+ runner.run(3);
+
+ runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
+
+ for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+ entry.assertAttributeEquals("azure.length", "10");
+ entry.assertAttributeEquals("mime.type", "application/octet-stream");
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
new file mode 100644
index 0000000000..0308add63e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class ITPutAzureStorageBlob extends AbstractAzureIT {
+
+ @Test
+ public void testPuttingBlob() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
+
+ runner.setValidateExpressionUsage(true);
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+ runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
+
+ runner.enqueue("0123456789".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+ List flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10");
+ }
+ }
+}