From 82e36f3c71220821ded66e6795d9aaf3b0ccd41d Mon Sep 17 00:00:00 2001 From: zenfenaan Date: Fri, 12 Jan 2018 22:45:13 +0530 Subject: [PATCH] NIFI-4819: Added DeleteAzureBlobStorage that handles deletion of blob from an Azure Storage container Signed-off-by: Pierre Villard This closes #2436. --- .../nifi-azure-processors/pom.xml | 13 ++++ .../azure/storage/DeleteAzureBlobStorage.java | 72 +++++++++++++++++++ .../azure/storage/FetchAzureBlobStorage.java | 2 +- .../azure/storage/ListAzureBlobStorage.java | 2 +- .../azure/storage/PutAzureBlobStorage.java | 2 +- .../org.apache.nifi.processor.Processor | 3 +- .../azure/AbstractAzureBlobStorageIT.java | 53 ++++++++++++++ .../azure/storage/AzureTestUtil.java | 8 +-- .../storage/ITDeleteAzureBlobStorage.java | 62 ++++++++++++++++ .../src/test/resources/hello.txt | 1 + 10 files changed, 210 insertions(+), 8 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 3149f7e236..980ec2c5a5 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -80,4 +80,17 @@ test + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/hello.txt + + + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java new file mode 100644 index 0000000000..2819f99ab2 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; + + +@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) +@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, PutAzureBlobStorage.class}) +@CapabilityDescription("Deletes the provided blob from Azure Storage") +@InputRequirement(Requirement.INPUT_REQUIRED) +public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + if(flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); + String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); + CloudBlobContainer container = blobClient.getContainerReference(containerName); + CloudBlob blob = container.getBlockBlobReference(blobPath); + blob.deleteIfExists(); + session.transfer(flowFile, REL_SUCCESS); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); + }catch ( StorageException | URISyntaxException e) { + getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index 89d22f3f2a..b9bbf44bf4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -45,7 +45,7 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile") -@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class }) +@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class }) @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched") diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index cdbb59a7b0..1cba6dd47b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -57,7 +57,7 @@ import java.util.Map; @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class }) +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class }) @CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage. " + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + "previous node left off without duplicating all of the data.") diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index c41733bd64..abcb2ee6cb 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -47,7 +47,7 @@ import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class }) +@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class }) @CapabilityDescription("Puts content into an Azure Storage Blob") @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 073d6745ed..fa34294243 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -17,4 +17,5 @@ org.apache.nifi.processors.azure.eventhub.GetAzureEventHub org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage org.apache.nifi.processors.azure.storage.ListAzureBlobStorage -org.apache.nifi.processors.azure.storage.PutAzureBlobStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.PutAzureBlobStorage +org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java new file mode 100644 index 0000000000..aebace62ba --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/AbstractAzureBlobStorageIT.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.nifi.processors.azure.storage.AzureTestUtil; +import org.junit.Assert; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AbstractAzureBlobStorageIT { + + protected final static String SAMPLE_FILE_NAME = "/hello.txt"; + protected final static String SAMPLE_BLOB_NAME = "testing"; + + protected void uploadBlob(String containerName, String filePath) throws URISyntaxException, StorageException, InvalidKeyException, IOException { + CloudBlobContainer container = AzureTestUtil.getContainer(containerName); + CloudBlob blob = container.getBlockBlobReference(SAMPLE_BLOB_NAME); + blob.uploadFromFile(filePath); + } + + protected String getFileFromResource(String fileName) { + URI uri = null; + try { + uri = this.getClass().getResource(fileName).toURI(); + } catch (URISyntaxException e) { + Assert.fail("Cannot proceed without File : " + fileName); + } + + return uri.toString(); + } + +} + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java index 10bf59dfea..4396c709a1 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java @@ -33,7 +33,7 @@ import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; -class AzureTestUtil { +public class AzureTestUtil { private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; static final String TEST_CONTAINER_NAME_PREFIX = "nifitest"; @@ -58,15 +58,15 @@ class AzureTestUtil { } - static String getAccountName() { + public static String getAccountName() { return CONFIG.getProperty("accountName"); } - static String getAccountKey() { + public static String getAccountKey() { return CONFIG.getProperty("accountKey"); } - static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { + public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java new file mode 100644 index 0000000000..4af6030e10 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.nifi.processors.azure.AbstractAzureBlobStorageIT; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.UUID; + +public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT{ + + @Test + public void testDeleteBlob() throws StorageException, URISyntaxException, InvalidKeyException, IOException { + String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); + CloudBlobContainer container = AzureTestUtil.getContainer(containerName); + container.createIfNotExists(); + + uploadBlob(containerName, getFileFromResource(SAMPLE_FILE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(DeleteAzureBlobStorage.class); + + try { + runner.setValidateExpressionUsage(true); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(AzureStorageUtils.CONTAINER, containerName); + runner.setProperty(DeleteAzureBlobStorage.BLOB, AzureTestUtil.TEST_BLOB_NAME); + + runner.enqueue(new byte[0]); + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS); + + } finally { + container.deleteIfExists(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt new file mode 100644 index 0000000000..ee13cb732d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/resources/hello.txt @@ -0,0 +1 @@ +Hello, World!! \ No newline at end of file