From 3488a169caa06edaf527edc1273acb36682a1d77 Mon Sep 17 00:00:00 2001 From: Simon Elliston Ball Date: Mon, 2 May 2016 00:35:34 +0100 Subject: [PATCH] NIFI-1833 - Azure Storage processors Signed-off-by: Bryan Rosander --- .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 6 + .../nifi-azure-processors/pom.xml | 47 ++++- .../azure/AbstractAzureBlobProcessor.java | 39 ++++ .../azure/AbstractAzureProcessor.java | 85 ++++++++ .../nifi/processors/azure/AzureConstants.java | 38 ++++ .../azure/storage/FetchAzureBlobStorage.java | 114 +++++++++++ .../azure/storage/ListAzureBlobStorage.java | 193 ++++++++++++++++++ .../azure/storage/PutAzureBlobStorage.java | 116 +++++++++++ .../azure/storage/utils/BlobInfo.java | 188 +++++++++++++++++ .../org.apache.nifi.processor.Processor | 5 +- .../azure/storage/AbstractAzureIT.java | 106 ++++++++++ .../storage/ITFetchAzureBlobStorage.java | 61 ++++++ .../azure/storage/ITListAzureBlobStorage.java | 75 +++++++ .../azure/storage/ITPutAzureStorageBlob.java | 51 +++++ 14 files changed, 1114 insertions(+), 10 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml index f823e6af94..f75bb7f9b0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -35,6 +35,12 @@ nifi-azure-processors 1.2.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + nar + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 9049b3f106..8330bcc7b3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -1,14 +1,14 @@ + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> 4.0.0 @@ -31,11 +31,35 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-utils + + org.apache.nifi + nifi-ssl-context-service-api + provided + + + org.apache.avro + avro + com.microsoft.azure azure-eventhubs 0.9.0 + + com.fasterxml.jackson.core + jackson-core + 2.8.6 + + + + com.microsoft.azure + azure-storage + 5.0.0 + org.apache.nifi nifi-mock @@ -57,5 +81,10 @@ language governing permissions and limitations under the License. --> ${powermock.version} test + + org.apache.nifi + nifi-standard-processors + ${project.version} + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java new file mode 100644 index 0000000000..82eae123a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor { + + public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); + + public static final List properties = Collections + .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java new file mode 100644 index 0000000000..5ab1f8bfbf --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; + +import com.microsoft.azure.storage.CloudStorageAccount; + +public abstract class AbstractAzureProcessor extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build(); + protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build(); + public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + protected CloudStorageAccount createStorageConnection(ProcessContext context) { + final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + return createStorageConnectionFromNameAndKey(accountName, accountKey); + } + + protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) { + final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue(); + return createStorageConnectionFromNameAndKey(accountName, accountKey); + } + + private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) { + final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey); + try { + return createStorageAccountFromConnectionString(storageConnectionString); + } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format. + * + * @param storageConnectionString + * Connection string for the storage service or the emulator + * @return The newly created CloudStorageAccount object + * + */ + protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { + CloudStorageAccount storageAccount; + try { + storageAccount = CloudStorageAccount.parse(storageConnectionString); + } catch (IllegalArgumentException | URISyntaxException e) { + throw e; + } catch (InvalidKeyException e) { + throw e; + } + return storageAccount; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java new file mode 100644 index 0000000000..eaa234caa2 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.util.StandardValidators; + +public final class AzureConstants { + public static final String BLOCK = "Block"; + public static final String PAGE = "Page"; + + public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build(); + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + + private AzureConstants() { + // do not instantiate + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java new file mode 100644 index 0000000000..2229cfd097 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; +import org.apache.nifi.processors.azure.AzureConstants; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; + +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) +@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile") +@InputRequirement(Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched") +}) +public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { + public static final List PROPERTIES = Collections + .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobContainer container = blobClient.getContainerReference(containerName); + + final Map attributes = new HashMap<>(); + final CloudBlob blob = container.getBlockBlobReference(blobPath); + + // TODO - we may be able do fancier things with ranges and + // distribution of download over threads, investigate + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream os) throws IOException { + try { + blob.download(os); + } catch (StorageException e) { + throw new IOException(e); + } + } + }); + + long length = blob.getProperties().getLength(); + attributes.put("azure.length", String.valueOf(length)); + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); + + } catch (IllegalArgumentException | URISyntaxException | StorageException e1) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java new file mode 100644 index 0000000000..f4a793b83d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.processors.azure.storage.utils.BlobInfo; +import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; +import org.apache.nifi.processors.standard.AbstractListProcessor; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.StorageUri; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.BlobRequestOptions; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.ListBlobItem; + +@TriggerSerially +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) +@SeeAlso({ FetchAzureBlobStorage.class }) +@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), + @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), + @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), + @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"), + @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) +@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " + + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.") +public class ListAzureBlobStorage extends AbstractListProcessor { + + private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true).required(false).build(); + + public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map createAttributes(BlobInfo entity, ProcessContext context) { + final Map attributes = new HashMap<>(); + attributes.put("azure.etag", entity.getEtag()); + attributes.put("azure.primaryUri", entity.getPrimaryUri()); + attributes.put("azure.secondaryUri", entity.getSecondaryUri()); + attributes.put("azure.blobname", entity.getName()); + attributes.put("azure.blobtype", entity.getBlobType()); + attributes.put("azure.length", String.valueOf(entity.getLength())); + attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp())); + attributes.put("mime.type", entity.getContentType()); + attributes.put("lang", entity.getContentLanguage()); + + return attributes; + } + + @Override + protected String getPath(final ProcessContext context) { + return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); + } + + @Override + protected boolean isListingResetNecessary(final PropertyDescriptor property) { + // TODO - implement + return false; + } + + @Override + protected Scope getStateScope(final ProcessContext context) { + return Scope.CLUSTER; + } + + @Override + protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); + String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); + if (prefix == null) { + prefix = ""; + } + final List listing = new ArrayList<>(); + try { + CloudStorageAccount storageAccount = createStorageConnection(context); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobContainer container = blobClient.getContainerReference(containerName); + + BlobRequestOptions blobRequestOptions = null; + OperationContext operationContext = null; + + for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) { + if (blob instanceof CloudBlob) { + CloudBlob cloudBlob = (CloudBlob) blob; + BlobProperties properties = cloudBlob.getProperties(); + StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); + + Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType()) + .contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength()); + + if (blob instanceof CloudBlockBlob) { + builder.blobType(AzureConstants.BLOCK); + } else { + builder.blobType(AzureConstants.PAGE); + } + listing.add(builder.build()); + } + } + } catch (IllegalArgumentException | URISyntaxException | StorageException e) { + throw (new IOException(e)); + } + return listing; + } + + protected static CloudStorageAccount createStorageConnection(ProcessContext context) { + final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey); + try { + return createStorageAccountFromConnectionString(storageConnectionString); + } catch (InvalidKeyException | URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format. + * + * @param storageConnectionString + * Connection string for the storage service or the emulator + * @return The newly created CloudStorageAccount object + * + */ + private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { + + CloudStorageAccount storageAccount; + try { + storageAccount = CloudStorageAccount.parse(storageConnectionString); + } catch (IllegalArgumentException | URISyntaxException e) { + System.out.println("\nConnection string specifies an invalid URI."); + System.out.println("Please confirm the connection string is in the Azure connection string format."); + throw e; + } catch (InvalidKeyException e) { + System.out.println("\nConnection string specifies an invalid key."); + System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid."); + throw e; + } + return storageAccount; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java new file mode 100644 index 0000000000..1327a0b2b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; +import org.apache.nifi.processors.azure.AzureConstants; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; + +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) +@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class }) +@CapabilityDescription("Puts content into an Azure Storage Blob") +@InputRequirement(Requirement.INPUT_REQUIRED) +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), + @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), + @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), + @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), + @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), + @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) +public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { + + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + + String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobContainer container = blobClient.getContainerReference(containerName); + + CloudBlob blob = container.getBlockBlobReference(blobPath); + + final Map attributes = new HashMap<>(); + long length = flowFile.getSize(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + final InputStream in = new BufferedInputStream(rawIn); + try { + blob.upload(in, length); + BlobProperties properties = blob.getProperties(); + attributes.put("azure.container", containerName); + attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); + attributes.put("azure.etag", properties.getEtag()); + attributes.put("azure.length", String.valueOf(length)); + attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } + }); + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + session.transfer(flowFile, REL_SUCCESS); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); + + } catch (IllegalArgumentException | URISyntaxException | StorageException e) { + getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java new file mode 100644 index 0000000000..d429878b6d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import java.io.Serializable; + +import org.apache.nifi.processors.standard.util.ListableEntity; + +public class BlobInfo implements Comparable, Serializable, ListableEntity { + private static final long serialVersionUID = 1L; + + private final String primaryUri; + private final String secondaryUri; + private final String contentType; + private final String contentLanguage; + private final String etag; + private final long lastModifiedTime; + private final long length; + private final String blobType; + + public static long getSerialversionuid() { + return serialVersionUID; + } + + public String getPrimaryUri() { + return primaryUri; + } + + public String getSecondaryUri() { + return secondaryUri; + } + + public String getContentType() { + return contentType; + } + + public String getContentLanguage() { + return contentLanguage; + } + + public String getEtag() { + return etag; + } + + public long getLastModifiedTime() { + return lastModifiedTime; + } + + public long getLength() { + return length; + } + + public String getBlobType() { + return blobType; + } + + public static final class Builder { + private String primaryUri; + private String secondaryUri; + private String contentType; + private String contentLanguage; + private String etag; + private long lastModifiedTime; + private long length; + private String blobType; + + public Builder primaryUri(String primaryUri) { + this.primaryUri = primaryUri; + return this; + } + + public Builder secondaryUri(String secondaryUri) { + this.secondaryUri = secondaryUri; + return this; + } + + public Builder contentType(String contentType) { + this.contentType = contentType; + return this; + } + + public Builder contentLanguage(String contentLanguage) { + this.contentLanguage = contentLanguage; + return this; + } + + public Builder etag(String etag) { + this.etag = etag; + return this; + } + + public Builder lastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + return this; + } + + public Builder length(long length) { + this.length = length; + return this; + } + + public Builder blobType(String blobType) { + this.blobType = blobType; + return this; + } + + public BlobInfo build() { + return new BlobInfo(this); + } + + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((etag == null) ? 0 : etag.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BlobInfo other = (BlobInfo) obj; + if (etag == null) { + if (other.etag != null) { + return false; + } + } else if (!etag.equals(other.etag)) { + return false; + } + return true; + } + + @Override + public int compareTo(BlobInfo o) { + return etag.compareTo(o.etag); + } + + protected BlobInfo(final Builder builder) { + this.primaryUri = builder.primaryUri; + this.secondaryUri = builder.secondaryUri; + this.contentType = builder.contentType; + this.contentLanguage = builder.contentLanguage; + this.etag = builder.etag; + this.lastModifiedTime = builder.lastModifiedTime; + this.length = builder.length; + this.blobType = builder.blobType; + } + + @Override + public String getName() { + String primaryUri = getPrimaryUri(); + return primaryUri.substring(primaryUri.lastIndexOf('/') + 1); + } + + @Override + public String getIdentifier() { + return getPrimaryUri(); + } + + @Override + public long getTimestamp() { + return getLastModifiedTime(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 178e52c984..84b3300f4c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.azure.eventhub.PutAzureEventHub -org.apache.nifi.processors.azure.eventhub.GetAzureEventHub \ No newline at end of file +org.apache.nifi.processors.azure.eventhub.GetAzureEventHub +org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage +org.apache.nifi.processors.azure.storage.ListAzureBlobStorage +org.apache.nifi.processors.azure.storage.PutAzureBlobStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java new file mode 100644 index 0000000000..34702eb96f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import static org.junit.Assert.fail; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Properties; + +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; +import com.microsoft.azure.storage.blob.ListBlobItem; +import com.microsoft.azure.storage.table.CloudTable; +import com.microsoft.azure.storage.table.CloudTableClient; + +public abstract class AbstractAzureIT { + protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; + public static final String TEST_CONTAINER_NAME = "nifitest"; + + private static final Properties CONFIG; + protected static final String TEST_BLOB_NAME = "testing"; + protected static final String TEST_TABLE_NAME = "testing"; + + static { + final FileInputStream fis; + CONFIG = new Properties(); + try { + fis = new FileInputStream(CREDENTIALS_FILE); + try { + CONFIG.load(fis); + } catch (IOException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } finally { + FileUtils.closeQuietly(fis); + } + } catch (FileNotFoundException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } + + } + + @BeforeClass + public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException { + CloudBlobContainer container = getContainer(); + container.createIfNotExists(); + } + + @AfterClass + public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException { + CloudBlobContainer container = getContainer(); + for (ListBlobItem blob : container.listBlobs()) { + if (blob instanceof CloudBlob) { + ((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null); + } + } + } + + public static String getAccountName() { + return CONFIG.getProperty("accountName"); + } + + public static String getAccountKey() { + return CONFIG.getProperty("accountKey"); + } + + protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException { + String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); + CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + return blobClient.getContainerReference(TEST_CONTAINER_NAME); + } + + protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException { + String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); + CloudTableClient tableClient = storageAccount.createCloudTableClient(); + return tableClient.getTableReference(TEST_TABLE_NAME); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java new file mode 100644 index 0000000000..1e8a8f702b --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import com.microsoft.azure.storage.StorageException; + +public class ITFetchAzureBlobStorage extends AbstractAzureIT { + + @Test + public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException { + final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage()); + + runner.setValidateExpressionUsage(true); + + runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); + runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); + + final Map attributes = new HashMap<>(); + attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME); + attributes.put("azure.blobname", TEST_BLOB_NAME); + attributes.put("azure.blobtype", AzureConstants.BLOCK); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1); + List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java new file mode 100644 index 0000000000..277538cbce --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; + +public class ITListAzureBlobStorage extends AbstractAzureIT { + + @BeforeClass + public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException { + CloudBlobContainer container = getContainer(); + container.createIfNotExists(); + + CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME); + byte[] buf = "0123456789".getBytes(); + InputStream in = new ByteArrayInputStream(buf); + blob.upload(in, 10); + } + + @AfterClass + public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException { + CloudBlobContainer container = getContainer(); + container.deleteIfExists(); + } + + @Test + public void testListsAzureBlobStorageContent() { + final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage()); + + runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); + + // requires multiple runs to deal with List processor checking + runner.run(3); + + runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1); + + for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) { + entry.assertAttributeEquals("azure.length", "10"); + entry.assertAttributeEquals("mime.type", "application/octet-stream"); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java new file mode 100644 index 0000000000..0308add63e --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import java.io.IOException; +import java.util.List; + +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class ITPutAzureStorageBlob extends AbstractAzureIT { + + @Test + public void testPuttingBlob() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage()); + + runner.setValidateExpressionUsage(true); + + runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName()); + runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey()); + runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME); + runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); + + runner.enqueue("0123456789".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); + List flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + } +}