From 41649660be11431ffb02e4e1618c6559b1eeac28 Mon Sep 17 00:00:00 2001 From: Malthe Borch Date: Tue, 29 Nov 2022 19:04:13 +0100 Subject: [PATCH] NIFI-10721 Avoid querying properties after Azure Blob upload This closes #6730 Signed-off-by: David Handermann --- .../azure/AbstractAzureBlobProcessor_v12.java | 28 ++++++++++++++----- .../storage/PutAzureBlobStorage_v12.java | 18 +++++++++++- .../storage/ITPutAzureBlobStorage_v12.java | 19 ++++++++++++- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java index be2197a01a..c1dd3dfdad 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; @@ -175,12 +176,25 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor { } protected void applyBlobMetadata(Map attributes, BlobClient blobClient) { - BlobProperties properties = blobClient.getProperties(); - attributes.put(ATTR_NAME_ETAG, properties.getETag()); - attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString()); - attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType()); - attributes.put(ATTR_NAME_LANG, properties.getContentLanguage()); - attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified())); - attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize())); + Supplier props = new Supplier() { + BlobProperties properties; + public BlobProperties get() { + if (properties == null) { + properties = blobClient.getProperties(); + } + return properties; + } + }; + + attributes.computeIfAbsent(ATTR_NAME_ETAG, key -> props.get().getETag()); + attributes.computeIfAbsent(ATTR_NAME_BLOBTYPE, key -> props.get().getBlobType().toString()); + attributes.computeIfAbsent(ATTR_NAME_MIME_TYPE, key -> props.get().getContentType()); + attributes.computeIfAbsent(ATTR_NAME_TIMESTAMP, key -> String.valueOf(props.get().getLastModified())); + attributes.computeIfAbsent(ATTR_NAME_LENGTH, key -> String.valueOf(props.get().getBlobSize())); + + // The LANG attribute is a special case because we allow it to be null. + if (!attributes.containsKey(ATTR_NAME_LANG)) { + attributes.put(ATTR_NAME_LANG, props.get().getContentLanguage()); + } } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index 8390438f84..3b7896e5c3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.core.http.rest.Response; import com.azure.core.util.Context; import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobContainerClient; @@ -23,6 +24,8 @@ import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlobType; +import com.azure.storage.blob.models.BlockBlobItem; import com.azure.storage.blob.options.BlobParallelUploadOptions; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -50,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM; import static com.azure.core.util.FluxUtil.toFluxByteBuffer; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; @@ -161,7 +165,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { try (InputStream rawIn = session.read(flowFile)) { final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn)); blobParallelUploadOptions.setRequestConditions(blobRequestConditions); - blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE); + Response response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE); + BlockBlobItem blob = response.getValue(); + long length = flowFile.getSize(); + applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length); applyBlobMetadata(attributes, blobClient); if (ignore) { attributes.put(ATTR_NAME_IGNORED, "false"); @@ -191,4 +198,13 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { session.transfer(flowFile, REL_FAILURE); } } + + private static void applyUploadResultAttributes(final Map attributes, final BlockBlobItem blob, final BlobType blobType, final long length) { + attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString()); + attributes.put(ATTR_NAME_ETAG, blob.getETag()); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(length)); + attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified())); + attributes.put(ATTR_NAME_LANG, null); + attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java index a648296e02..e3481d194c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java @@ -40,9 +40,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { + public static class ITProcessor extends PutAzureBlobStorage_v12 { + public boolean blobMetadataApplied = false; + @Override + protected void applyBlobMetadata(Map attributes, BlobClient blobClient) { + super.applyBlobMetadata(attributes, blobClient); + blobMetadataApplied = true; + } + } + @Override protected Class getProcessorClass() { - return PutAzureBlobStorage_v12.class; + return ITProcessor.class; } @BeforeEach @@ -57,6 +66,14 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA); } + @Test + public void testPutBlobApplyBlobMetadata() throws Exception { + runProcessor(BLOB_DATA); + + assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA); + assertTrue(((ITProcessor) runner.getProcessor()).blobMetadataApplied); + } + @Test public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception { configureProxyService();