mirror of https://github.com/apache/nifi.git
NIFI-12271 Fix PutAzureBlobStorage_v12 rollback on failure with FileResourceService
This closes #7930 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
c4ff8de412
commit
a3e4f89fe3
|
@ -166,10 +166,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
|
||||||
final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue());
|
final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue());
|
||||||
final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());
|
final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());
|
||||||
final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
|
|
||||||
|
|
||||||
long startNanos = System.nanoTime();
|
long startNanos = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
|
final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
|
||||||
BlobServiceClient storageClient = getStorageClient(context, flowFile);
|
BlobServiceClient storageClient = getStorageClient(context, flowFile);
|
||||||
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
|
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
|
||||||
if (createContainer && !containerClient.exists()) {
|
if (createContainer && !containerClient.exists()) {
|
||||||
|
|
|
@ -284,6 +284,31 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
|
||||||
assertProvenanceEvents();
|
assertProvenanceEvents();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutBlobFromNonExistentLocalFile() throws Exception {
|
||||||
|
String attributeName = "file.path";
|
||||||
|
|
||||||
|
String serviceId = FileResourceService.class.getSimpleName();
|
||||||
|
FileResourceService service = new StandardFileResourceService();
|
||||||
|
runner.addControllerService(serviceId, service);
|
||||||
|
runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.setProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
|
||||||
|
runner.setProperty(ResourceTransferProperties.FILE_RESOURCE_SERVICE, serviceId);
|
||||||
|
|
||||||
|
String filePath = "nonexistent.txt";
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(attributeName, filePath);
|
||||||
|
|
||||||
|
runProcessor(EMPTY_CONTENT, attributes);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_FAILURE, 1);
|
||||||
|
|
||||||
|
assertProvenanceEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void runProcessor(byte[] data) {
|
private void runProcessor(byte[] data) {
|
||||||
runProcessor(data, Collections.emptyMap());
|
runProcessor(data, Collections.emptyMap());
|
||||||
|
|
Loading…
Reference in New Issue