diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 9fcd5a1d77..284bb031e2 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -212,25 +212,38 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess fileClient.flush(length, true); } - //Visible for testing + /** + * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it. + * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in + * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important. + * + * Visible for testing + * + * @param sourceFileClient client of the temporary file + * @param destinationDirectory final location of the uploaded file + * @param destinationFileName final name of the uploaded file + * @param conflictResolution conflict resolution strategy + * @return URL of the uploaded file + */ String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { + final String destinationPath = createPath(destinationDirectory, destinationFileName); + try { final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions(); if (!conflictResolution.equals(REPLACE_RESOLUTION)) { destinationCondition.setIfNoneMatch("*"); } - final String destinationPath = createPath(destinationDirectory, destinationFileName); return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); } catch (DataLakeStorageException dataLakeStorageException) { removeTempFile(sourceFileClient); if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { - getLogger().info("File with the same name [{}] already exists. Remote file not modified due to {} being set to '{}'.", - sourceFileClient.getFileName(), CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); + getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", + destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); return null; } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { - throw new ProcessException(String.format("File with the same name [%s] already exists.", sourceFileClient.getFileName()), dataLakeStorageException); + throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); } else { - throw new ProcessException(String.format("Renaming File [%s] failed", sourceFileClient.getFileName()), dataLakeStorageException); + throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException); } } }