From a3e4f89fe36643e573988f6853b7590c82ae19ae Mon Sep 17 00:00:00 2001 From: lehelb Date: Wed, 25 Oct 2023 12:43:13 +0200 Subject: [PATCH] NIFI-12271 Fix PutAzureBlobStorage_v12 rollback on failure with FileResourceService This closes #7930 Signed-off-by: David Handermann --- .../storage/PutAzureBlobStorage_v12.java | 2 +- .../storage/ITPutAzureBlobStorage_v12.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index f005800873..d303b0c175 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -166,10 +166,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue()); final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); - final Optional fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); long startNanos = System.nanoTime(); try { + final Optional fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); BlobServiceClient storageClient = getStorageClient(context, flowFile); BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); if (createContainer && !containerClient.exists()) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java index 713d2f4c8e..0f101c71c1 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java @@ -284,6 +284,31 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { assertProvenanceEvents(); } + @Test + public void testPutBlobFromNonExistentLocalFile() throws Exception { + String attributeName = "file.path"; + + String serviceId = FileResourceService.class.getSimpleName(); + FileResourceService service = new StandardFileResourceService(); + runner.addControllerService(serviceId, service); + runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName)); + runner.enableControllerService(service); + + runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue()); + runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId); + + String filePath = "nonexistent.txt"; + + Map attributes = new HashMap<>(); + attributes.put(attributeName, filePath); + + runProcessor(EMPTY_CONTENT, attributes); + + runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_FAILURE, 1); + + assertProvenanceEvents(); + } + private void runProcessor(byte[] data) { runProcessor(data, Collections.emptyMap());