mirror of https://github.com/apache/nifi.git
NIFI-10721 Avoid querying properties after Azure Blob upload
This closes #6730 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
96a6594680
commit
41649660be
|
@ -46,6 +46,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
|
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
|
||||||
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
|
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
|
||||||
|
@ -175,12 +176,25 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
|
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
|
||||||
BlobProperties properties = blobClient.getProperties();
|
Supplier<BlobProperties> props = new Supplier() {
|
||||||
attributes.put(ATTR_NAME_ETAG, properties.getETag());
|
BlobProperties properties;
|
||||||
attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
|
public BlobProperties get() {
|
||||||
attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
|
if (properties == null) {
|
||||||
attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
|
properties = blobClient.getProperties();
|
||||||
attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
|
}
|
||||||
attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
|
return properties;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
attributes.computeIfAbsent(ATTR_NAME_ETAG, key -> props.get().getETag());
|
||||||
|
attributes.computeIfAbsent(ATTR_NAME_BLOBTYPE, key -> props.get().getBlobType().toString());
|
||||||
|
attributes.computeIfAbsent(ATTR_NAME_MIME_TYPE, key -> props.get().getContentType());
|
||||||
|
attributes.computeIfAbsent(ATTR_NAME_TIMESTAMP, key -> String.valueOf(props.get().getLastModified()));
|
||||||
|
attributes.computeIfAbsent(ATTR_NAME_LENGTH, key -> String.valueOf(props.get().getBlobSize()));
|
||||||
|
|
||||||
|
// The LANG attribute is a special case because we allow it to be null.
|
||||||
|
if (!attributes.containsKey(ATTR_NAME_LANG)) {
|
||||||
|
attributes.put(ATTR_NAME_LANG, props.get().getContentLanguage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.azure.storage;
|
package org.apache.nifi.processors.azure.storage;
|
||||||
|
|
||||||
|
import com.azure.core.http.rest.Response;
|
||||||
import com.azure.core.util.Context;
|
import com.azure.core.util.Context;
|
||||||
import com.azure.storage.blob.BlobClient;
|
import com.azure.storage.blob.BlobClient;
|
||||||
import com.azure.storage.blob.BlobContainerClient;
|
import com.azure.storage.blob.BlobContainerClient;
|
||||||
|
@ -23,6 +24,8 @@ import com.azure.storage.blob.BlobServiceClient;
|
||||||
import com.azure.storage.blob.models.BlobErrorCode;
|
import com.azure.storage.blob.models.BlobErrorCode;
|
||||||
import com.azure.storage.blob.models.BlobRequestConditions;
|
import com.azure.storage.blob.models.BlobRequestConditions;
|
||||||
import com.azure.storage.blob.models.BlobStorageException;
|
import com.azure.storage.blob.models.BlobStorageException;
|
||||||
|
import com.azure.storage.blob.models.BlobType;
|
||||||
|
import com.azure.storage.blob.models.BlockBlobItem;
|
||||||
import com.azure.storage.blob.options.BlobParallelUploadOptions;
|
import com.azure.storage.blob.options.BlobParallelUploadOptions;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
@ -50,6 +53,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
|
||||||
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
|
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
|
||||||
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
|
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
|
||||||
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
|
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
|
||||||
|
@ -161,7 +165,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
|
||||||
try (InputStream rawIn = session.read(flowFile)) {
|
try (InputStream rawIn = session.read(flowFile)) {
|
||||||
final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
|
final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
|
||||||
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
|
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
|
||||||
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
|
Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
|
||||||
|
BlockBlobItem blob = response.getValue();
|
||||||
|
long length = flowFile.getSize();
|
||||||
|
applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
|
||||||
applyBlobMetadata(attributes, blobClient);
|
applyBlobMetadata(attributes, blobClient);
|
||||||
if (ignore) {
|
if (ignore) {
|
||||||
attributes.put(ATTR_NAME_IGNORED, "false");
|
attributes.put(ATTR_NAME_IGNORED, "false");
|
||||||
|
@ -191,4 +198,13 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void applyUploadResultAttributes(final Map<String, String> attributes, final BlockBlobItem blob, final BlobType blobType, final long length) {
|
||||||
|
attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString());
|
||||||
|
attributes.put(ATTR_NAME_ETAG, blob.getETag());
|
||||||
|
attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
|
||||||
|
attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
|
||||||
|
attributes.put(ATTR_NAME_LANG, null);
|
||||||
|
attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,9 +40,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
|
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
|
||||||
|
public static class ITProcessor extends PutAzureBlobStorage_v12 {
|
||||||
|
public boolean blobMetadataApplied = false;
|
||||||
|
@Override
|
||||||
|
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
|
||||||
|
super.applyBlobMetadata(attributes, blobClient);
|
||||||
|
blobMetadataApplied = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Class<? extends Processor> getProcessorClass() {
|
protected Class<? extends Processor> getProcessorClass() {
|
||||||
return PutAzureBlobStorage_v12.class;
|
return ITProcessor.class;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
@ -57,6 +66,14 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
|
||||||
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
|
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutBlobApplyBlobMetadata() throws Exception {
|
||||||
|
runProcessor(BLOB_DATA);
|
||||||
|
|
||||||
|
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
|
||||||
|
assertTrue(((ITProcessor) runner.getProcessor()).blobMetadataApplied);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
|
public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
|
||||||
configureProxyService();
|
configureProxyService();
|
||||||
|
|
Loading…
Reference in New Issue