From eb8d4ee06f92f5ac9671040277177204e4f72be0 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi <turcsanyi@apache.org> Date: Sat, 26 Oct 2024 21:51:37 +0200 Subject: [PATCH] NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #9451. --- .../storage/PutAzureDataLakeStorage.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 5552fa3af1..8475e4bcec 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.core.util.Context; import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.DataLakeRequestConditions; import com.azure.storage.file.datalake.models.DataLakeStorageException; +import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions; import org.apache.commons.io.input.BoundedInputStream; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -225,21 +227,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound, final long transferSize, final DataLakeFileClient fileClient) throws Exception { - if (transferSize > 0) { - try (final InputStream inputStream = new BufferedInputStream( - fileResourceFound.map(FileResource::getInputStream) - .orElseGet(() -> session.read(flowFile))) - ) { - uploadContent(fileClient, inputStream, transferSize); - } catch (final Exception e) { - removeFile(fileClient); - throw e; - } + try (final InputStream inputStream = new BufferedInputStream( + fileResourceFound.map(FileResource::getInputStream) + .orElseGet(() -> session.read(flowFile))) + ) { + uploadContent(fileClient, inputStream, transferSize); + } catch (final Exception e) { + removeFile(fileClient); + throw e; } } - //Visible for testing - static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException { + private static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException { long chunkStart = 0; long chunkSize; @@ -258,8 +257,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess chunkStart += chunkSize; } - // use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248 - fileClient.flush(length, true); + fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE); } /** @@ -272,7 +270,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors */ - DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { + private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName); try {