NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12

This closes #6443

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Malthe Borch 2022-09-24 08:43:10 +02:00 committed by exceptionfactory
parent 491f21bf90
commit a71556f115
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
10 changed files with 195 additions and 35 deletions

View File

@ -69,7 +69,7 @@ public class MockFlowFile implements FlowFileRecord {
public MockFlowFile(final long id, final FlowFile toCopy) {
this.creationTime = System.nanoTime();
this.id = id;
entryDate = System.currentTimeMillis();
entryDate = toCopy.getEntryDate();
final Map<String, String> attributesToCopy = toCopy.getAttributes();
String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());

View File

@ -161,21 +161,26 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
}
/**
 * Builds a fresh attribute map describing the given blob.
 *
 * The map is filled by {@code applyStandardBlobAttributes} followed by {@code applyBlobMetadata}.
 *
 * @param blobClient client pointing at the blob to describe
 * @return a new map holding the collected blob attributes
 */
protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {
// NOTE(review): the next three statements declare `attributes` a second time and compute
// `properties` / `primaryUri` values that nothing below reads; they look like the pre-change
// lines of this diff hunk rendered next to their replacements — confirm against the committed file.
Map<String, String> attributes = new HashMap<>();
BlobProperties properties = blobClient.getProperties();
String primaryUri = String.format("%s/%s", blobClient.getContainerClient().getBlobContainerUrl(), blobClient.getBlobName());
final Map<String, String> attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient);
applyBlobMetadata(attributes, blobClient);
return attributes;
}
/**
 * Populates the container name, blob name and primary URI attributes from the given blob client.
 *
 * @param attributes map that receives the attribute values
 * @param blobClient client pointing at the blob being described
 */
protected void applyStandardBlobAttributes(Map<String, String> attributes, BlobClient blobClient) {
    // Every "%2F" in the blob URL is turned back into "/" — presumably so that slashes inside
    // hierarchical blob names stay readable in the reported URI.
    final String blobUrl = blobClient.getBlobUrl();
    final String readableUri = blobUrl.replace("%2F", "/");

    attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
    attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
    attributes.put(ATTR_NAME_PRIMARY_URI, readableUri);
}
/**
 * Reads the properties of the given blob and copies them into the attribute map:
 * ETag, blob type, content type, content language, last-modified timestamp and blob size.
 *
 * @param attributes map that receives the attribute values
 * @param blobClient client pointing at the blob whose properties are read
 */
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
BlobProperties properties = blobClient.getProperties();
attributes.put(ATTR_NAME_ETAG, properties.getETag());
attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
// String.valueOf is used so a missing timestamp becomes the text "null" rather than failing.
attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
// NOTE(review): a value is returned from a void method here; this line looks like a leftover of the
// pre-change createBlobAttributesMap body shown by the diff rendering — confirm against the committed file.
return attributes;
}
}

View File

@ -16,9 +16,14 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -35,19 +40,23 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LANG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_MIME_TYPE;
@ -56,7 +65,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LANG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_MIME_TYPE;
@ -75,7 +86,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
@WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
@WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
@ -91,10 +104,21 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
"will fail if the container does not exist.")
.build();
/**
 * Required property that selects how a name conflict with an already existing blob is handled.
 *
 * Allowable values come from {@link AzureStorageConflictResolutionStrategy}; the default is
 * {@code FAIL_RESOLUTION}. Expression language is not supported for this property.
 */
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("conflict-resolution-strategy")
.displayName("Conflict Resolution Strategy")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.allowableValues(AzureStorageConflictResolutionStrategy.class)
.defaultValue(AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION.getValue())
.description("Specifies whether an existing blob will have its contents replaced upon conflict.")
.build();
/**
 * Unmodifiable list of the property descriptors, in the order they are listed here.
 */
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
CREATE_CONTAINER,
CONFLICT_RESOLUTION,
BLOB_NAME,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@ -110,9 +134,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
return;
}
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue());
long startNanos = System.nanoTime();
try {
@ -121,18 +146,40 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
BlobClient blobClient = containerClient.getBlobClient(blobName);
final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
Map<String, String> attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient);
final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
long length = flowFile.getSize();
try {
if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
blobRequestConditions.setIfNoneMatch("*");
}
try (InputStream rawIn = session.read(flowFile);
BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
blobClient.upload(bufferedIn, length);
try (InputStream rawIn = session.read(flowFile)) {
final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
applyBlobMetadata(attributes, blobClient);
if (ignore) {
attributes.put(ATTR_NAME_IGNORED, "false");
}
}
} catch (BlobStorageException e) {
final BlobErrorCode errorCode = e.getErrorCode();
flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
attributes.put(ATTR_NAME_IGNORED, "true");
} else {
throw e;
}
}
Map<String, String> attributes = createBlobAttributesMap(blobClient);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

View File

@ -45,4 +45,10 @@ public final class BlobAttributes {
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
public static final String ATTR_NAME_IGNORED = "azure.ignored";
// Documentation text for the "azure.ignored" FlowFile attribute. It is a FlowFile attribute, not a
// processor property, so the wording says "attribute" to avoid confusion with processor configuration.
public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict Resolution Strategy is 'ignore', " +
        "this attribute will be true/false depending on whether the blob was ignored.";
}

View File

@ -34,6 +34,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -137,10 +139,18 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
return attributes;
}
protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
/**
 * Asserts the attributes that identify a blob on the FlowFile: container name, blob name and primary URI.
 *
 * The expected primary URI is built from the account name, the container name and the URL-encoded
 * blob name, where "+" is rewritten to "%20" and "%2F" back to "/".
 *
 * @param flowFile      FlowFile whose attributes are checked
 * @param containerName expected container name
 * @param blobName      expected (unencoded) blob name
 * @throws UnsupportedEncodingException declared because of the charset-name based URLEncoder.encode call
 */
protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
// NOTE(review): the primary URI is asserted twice — first against the raw blob name, then against the
// encoded one. The first assertion looks like the pre-change line of this diff hunk; confirm.
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
blobName,
StandardCharsets.US_ASCII.name()
).replace("+", "%20").replace("%2F", "/"))
);
}
protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, int blobLength) {
flowFile.assertAttributeExists(BlobAttributes.ATTR_NAME_ETAG);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_MIME_TYPE, "application/octet-stream");

View File

@ -191,7 +191,8 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0);
assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, originalLength);
assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
assertFlowFileResultBlobAttributes(flowFile, originalLength);
flowFile.assertContentEquals(blobData);
}

View File

@ -174,7 +174,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage_v12.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS).get(0);
assertFlowFileBlobAttributes(flowFile, getContainerName(), "blob5", "Test".length());
assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), "blob5");
assertFlowFileResultBlobAttributes(flowFile, "Test".length());
}
private void uploadBlobs() throws Exception {
@ -207,8 +208,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
}
private void assertFlowFile(MockFlowFile flowFile, String blobName) throws Exception {
assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, BLOB_DATA.length);
assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
assertFlowFileResultBlobAttributes(flowFile, BLOB_DATA.length);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), blobName.substring(blobName.lastIndexOf('/') + 1));
flowFile.assertContentEquals(EMPTY_CONTENT);

View File

@ -18,10 +18,12 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobErrorCode;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -32,11 +34,12 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureBlobStorage_v12.class;
@ -101,7 +104,7 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runProcessor(BLOB_DATA);
assertFailure(BLOB_DATA);
assertFailure(BLOB_DATA, BlobErrorCode.CONTAINER_NOT_FOUND);
}
@Test
@ -136,7 +139,29 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runProcessor(BLOB_DATA);
assertFailure(BLOB_DATA);
MockFlowFile flowFile = assertFailure(BLOB_DATA, BlobErrorCode.BLOB_ALREADY_EXISTS);
assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
}
/**
 * Uploading to an already existing blob with the "ignore" strategy must route the FlowFile to success
 * and flag it as ignored.
 */
@Test
public void testPutBlobToExistingBlobConflictStrategyIgnore() throws Exception {
    // Create the conflicting blob before the processor runs.
    uploadBlob(BLOB_NAME, BLOB_DATA);
    runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());

    runProcessor(BLOB_DATA);

    MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
    // JUnit expects (expected, actual); the correct order keeps failure messages truthful.
    assertEquals("true", flowFile.getAttribute(ATTR_NAME_IGNORED));
}
/**
 * Uploading to an already existing blob with the "replace" strategy must succeed.
 */
@Test
public void testPutBlobToExistingBlobConflictStrategyReplace() throws Exception {
    // Create the blob up front so the processor run hits a name conflict.
    uploadBlob(BLOB_NAME, BLOB_DATA);

    final String replaceStrategy = AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION.getValue();
    runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, replaceStrategy);

    runProcessor(BLOB_DATA);

    final String container = getContainerName();
    assertSuccess(container, BLOB_NAME, BLOB_DATA);
}
@Test
@ -158,20 +183,30 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runner.run();
}
private void assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
assertFlowFile(containerName, blobName, blobData);
/**
 * Checks a successful upload: the resulting FlowFile, the blob stored in Azure and the provenance events.
 *
 * @return the FlowFile that was verified, for further assertions by the caller
 */
private MockFlowFile assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
    final MockFlowFile verified = assertFlowFile(containerName, blobName, blobData);

    assertAzureBlob(containerName, blobName, blobData);
    assertProvenanceEvents();

    return verified;
}
private void assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
/**
 * Checks an ignored upload: the FlowFile is verified without content expectations (null blob data is
 * passed on), then the provenance events are checked.
 *
 * @return the FlowFile that was verified, for further assertions by the caller
 */
private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception {
    final byte[] noExpectedContent = null;
    final MockFlowFile verified = assertFlowFile(containerName, blobName, noExpectedContent);

    assertProvenanceEvents();

    return verified;
}
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
/**
 * Asserts that exactly one FlowFile was routed to success and verifies its blob attributes.
 *
 * When {@code blobData} is non-null, the result attributes (using the data length) and the FlowFile
 * content are verified as well.
 *
 * @return the FlowFile taken from the success relationship
 */
private MockFlowFile assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_SUCCESS, 1);
// NOTE(review): `flowFile` is used here before it is declared on the next line; this call looks like a
// pre-change line of the diff hunk — confirm against the committed file.
assertFlowFileBlobAttributes(flowFile, containerName, blobName, blobData.length);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureBlobStorage_v12.REL_SUCCESS).get(0);
// NOTE(review): unconditional content assertion, repeated inside the null guard below; presumably also a
// pre-change line — confirm.
flowFile.assertContentEquals(blobData);
assertFlowFileCommonBlobAttributes(flowFile, containerName, blobName);
if (blobData != null) {
assertFlowFileResultBlobAttributes(flowFile, blobData.length);
flowFile.assertContentEquals(blobData);
}
return flowFile;
}
private void assertAzureBlob(String containerName, String blobName, byte[] blobData) {
@ -191,10 +226,12 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertEquals(expectedEventTypes, actualEventTypes);
}
private void assertFailure(byte[] blobData) throws Exception {
/**
 * Asserts that exactly one FlowFile was routed to failure, that its content is untouched and that it
 * carries the expected Azure error code attribute.
 *
 * @param blobData  content the failed FlowFile is expected to still hold
 * @param errorCode error code expected in the error code attribute
 * @return the FlowFile taken from the failure relationship
 */
private MockFlowFile assertFailure(byte[] blobData, BlobErrorCode errorCode) throws Exception {
    runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_FAILURE, 1);

    // Look the FlowFile up via the processor under test, matching the assertion above.
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureBlobStorage_v12.REL_FAILURE).get(0);
    flowFile.assertContentEquals(blobData);
    flowFile.assertAttributeEquals(ATTR_NAME_ERROR_CODE, errorCode.toString());
    return flowFile;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.components.DescribedValue;
/**
 * Strategies for handling an upload whose target blob already exists.
 *
 * {@link #getValue()} yields the enum constant name, {@link #getDisplayName()} the short label and
 * {@link #getDescription()} the explanatory text.
 */
public enum AzureStorageConflictResolutionStrategy implements DescribedValue {
    FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
    // The description names the attribute and value actually written: 'azure.error.code' holding the
    // string form of the Azure error code.
    IGNORE_RESOLUTION("ignore",
            "Ignore if the blob already exists; the 'azure.error.code' attribute will be set to the value 'BlobAlreadyExists'"),
    REPLACE_RESOLUTION("replace", "Replace blob contents if the blob already exists");

    private final String label;
    private final String description;

    /**
     * @param label       short display name of the strategy
     * @param description explanatory text for the strategy
     */
    AzureStorageConflictResolutionStrategy(String label, String description) {
        this.label = label;
        this.description = description;
    }

    /**
     * @return the enum constant name, so the value can be mapped back with {@code valueOf}
     */
    @Override
    public String getValue() {
        return this.name();
    }

    /**
     * @return the short label of the strategy
     */
    @Override
    public String getDisplayName() {
        return label;
    }

    /**
     * @return the explanatory text of the strategy
     */
    @Override
    public String getDescription() {
        return description;
    }
}