diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index c38e3c603d..859f2be080 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import com.microsoft.azure.storage.OperationContext; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -141,16 +142,16 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { } try { - blob.upload(in, -1, null, null, operationContext); + uploadBlob(blob, operationContext, in); BlobProperties properties = blob.getProperties(); attributes.put("azure.container", containerName); attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); attributes.put("azure.etag", properties.getEtag()); attributes.put("azure.length", String.valueOf(length)); attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); - } catch (StorageException | URISyntaxException e) { + } catch (StorageException | URISyntaxException | IOException e) { storedException.set(e); - throw new IOException(e); + throw e instanceof IOException ? (IOException) e : new IOException(e); } }); @@ -175,6 +176,11 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { } + @VisibleForTesting + void uploadBlob(CloudBlob blob, OperationContext operationContext, InputStream in) throws StorageException, IOException { + blob.upload(in, -1, null, null, operationContext); + } + // Used to help force Azure Blob SDK to write in blocks private static class UnmarkableInputStream extends FilterInputStream { public UnmarkableInputStream(InputStream in) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java index e006c2ccca..142b592bec 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage; import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.util.MockFlowFile; import org.junit.Before; import org.junit.Test; @@ -58,6 +59,17 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { assertResult(); } + @Test + public void testInvalidCredentialsRoutesToFailure() { + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ="); + runner.assertValid(); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1); + } + private void assertResult() throws Exception { runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); List flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java new file mode 100644 index 0000000000..b01264a876 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; + +public class TestPutAzureBlobStorage { + + @Test + public void testIOExceptionDuringUploadTransfersToFailure() throws Exception { + PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage()); + doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any()); + + TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutAzureBlobStorage.BLOB, "test"); + runner.setProperty(AzureStorageUtils.CONTAINER, "test"); + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test"); + + runner.enqueue("test data"); + runner.run(); + + runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1); + } +}