NIFI-10884 log target filename

This closes #6792.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-12-16 20:00:24 +01:00 committed by Peter Turcsanyi
parent 1d5a1bff08
commit caee606706
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
1 changed files with 19 additions and 6 deletions

View File

@ -212,25 +212,38 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
fileClient.flush(length, true); fileClient.flush(length, true);
} }
//Visible for testing /**
* This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it.
* Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in
* case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important.
*
* Visible for testing
*
* @param sourceFileClient client of the temporary file
* @param destinationDirectory final location of the uploaded file
* @param destinationFileName final name of the uploaded file
* @param conflictResolution conflict resolution strategy
* @return URL of the uploaded file
*/
String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
final String destinationPath = createPath(destinationDirectory, destinationFileName);
try { try {
final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions(); final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
if (!conflictResolution.equals(REPLACE_RESOLUTION)) { if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
destinationCondition.setIfNoneMatch("*"); destinationCondition.setIfNoneMatch("*");
} }
final String destinationPath = createPath(destinationDirectory, destinationFileName);
return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl();
} catch (DataLakeStorageException dataLakeStorageException) { } catch (DataLakeStorageException dataLakeStorageException) {
removeTempFile(sourceFileClient); removeTempFile(sourceFileClient);
if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
getLogger().info("File with the same name [{}] already exists. Remote file not modified due to {} being set to '{}'.", getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.",
sourceFileClient.getFileName(), CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
return null; return null;
} else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) {
throw new ProcessException(String.format("File with the same name [%s] already exists.", sourceFileClient.getFileName()), dataLakeStorageException); throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException);
} else { } else {
throw new ProcessException(String.format("Renaming File [%s] failed", sourceFileClient.getFileName()), dataLakeStorageException); throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException);
} }
} }
} }