mirror of https://github.com/apache/nifi.git
NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #9451.
parent b6952f1246
commit eb8d4ee06f
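In short, the change replaces the plain flush call with flushWithResponse and a DataLakeFileFlushOptions carrying close=true, so that ADLS Gen2 sees the upload finish as a flush-with-close and can raise the corresponding storage event. Below is a minimal sketch of the before/after call, using only the SDK types visible in the diff; the wrapper class and method names are illustrative, not NiFi code.

    // Illustrative sketch of the API change; not the processor's full upload path.
    import com.azure.core.util.Context;
    import com.azure.storage.file.datalake.DataLakeFileClient;
    import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;

    class FlushWithCloseSketch {

        // Before: commits the appended bytes, but the write is not marked as closed,
        // so no FlushWithClose event is emitted for the file.
        static void flushOnly(DataLakeFileClient fileClient, long length) {
            fileClient.flush(length, true);
        }

        // After: commits the appended bytes and sets the close flag, which is what
        // allows Azure to emit the FlushWithClose event when the upload completes.
        static void flushAndClose(DataLakeFileClient fileClient, long length) {
            fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
        }
    }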
@@ -16,12 +16,14 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.util.Context;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -225,21 +227,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor
     private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound,
                             final long transferSize, final DataLakeFileClient fileClient) throws Exception {
         if (transferSize > 0) {
             try (final InputStream inputStream = new BufferedInputStream(
                     fileResourceFound.map(FileResource::getInputStream)
                             .orElseGet(() -> session.read(flowFile)))
             ) {
                 uploadContent(fileClient, inputStream, transferSize);
             } catch (final Exception e) {
                 removeFile(fileClient);
                 throw e;
             }
         }
     }
 
-    //Visible for testing
-    static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
+    private static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
         long chunkStart = 0;
         long chunkSize;
@@ -258,8 +257,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor
             chunkStart += chunkSize;
         }
 
-        // use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248
-        fileClient.flush(length, true);
+        fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
     }
 
     /**
@@ -272,7 +270,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor
      * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
      * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
      */
-    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
+    private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
         final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
 
         try {
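For orientation, the flush shown above is the tail end of uploadContent's chunked upload: the stream is appended to the file in bounded chunks at increasing offsets, and a single flush at the total length commits the whole file, now with the close flag set. The following is a rough sketch of that pattern, assuming an illustrative chunk limit and class/method names; neither is taken from the NiFi source.

    // Rough sketch of a chunked append-then-flush upload against a DataLakeFileClient.
    // MAX_CHUNK_SIZE and the class/method names are illustrative assumptions.
    import com.azure.core.util.Context;
    import com.azure.storage.file.datalake.DataLakeFileClient;
    import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
    import org.apache.commons.io.input.BoundedInputStream;

    import java.io.IOException;
    import java.io.InputStream;

    final class ChunkedUploadSketch {

        private static final long MAX_CHUNK_SIZE = 100L * 1024 * 1024; // assumed 100 MB per append

        static void upload(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
            long chunkStart = 0;
            while (chunkStart < length) {
                final long chunkSize = Math.min(length - chunkStart, MAX_CHUNK_SIZE);
                // Bound the source stream so only this chunk's bytes are consumed by append().
                final InputStream chunk = new BoundedInputStream(in, chunkSize);
                fileClient.append(chunk, chunkStart, chunkSize);
                chunkStart += chunkSize;
            }
            // One flush at the total length commits all appended data; setClose(true) marks the
            // write as closed so the FlushWithClose event can be emitted.
            fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
        }
    }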