NIFI-12928 Added Simple Write strategy in PutAzureDataLakeStorage

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8540.
This commit is contained in:
Peter Turcsanyi 2024-03-21 08:47:28 +01:00 committed by Pierre Villard
parent 8eb013a813
commit bffc342521
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 298 additions and 100 deletions

View File

@ -40,6 +40,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator;
import org.apache.nifi.processors.azure.storage.utils.WritingStrategy;
import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.StringUtils;
@ -99,6 +100,15 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
.build(); .build();
protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
.name("writing-strategy")
.displayName("Writing Strategy")
.description("Defines the approach for writing the Azure file.")
.required(true)
.allowableValues(WritingStrategy.class)
.defaultValue(WritingStrategy.WRITE_AND_RENAME)
.build();
public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder()
.name("base-temporary-path") .name("base-temporary-path")
.displayName("Base Temporary Path") .displayName("Base Temporary Path")
@ -108,6 +118,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
.defaultValue("") .defaultValue("")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new DirectoryValidator("Base Temporary Path")) .addValidator(new DirectoryValidator("Base Temporary Path"))
.dependsOn(WRITING_STRATEGY, WritingStrategy.WRITE_AND_RENAME)
.build(); .build();
private static final List<PropertyDescriptor> PROPERTIES = List.of( private static final List<PropertyDescriptor> PROPERTIES = List.of(
@ -115,6 +126,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
FILESYSTEM, FILESYSTEM,
DIRECTORY, DIRECTORY,
FILE, FILE,
WRITING_STRATEGY,
BASE_TEMPORARY_PATH, BASE_TEMPORARY_PATH,
CONFLICT_RESOLUTION, CONFLICT_RESOLUTION,
RESOURCE_TRANSFER_SOURCE, RESOURCE_TRANSFER_SOURCE,
@ -137,41 +149,44 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
try { try {
final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile); final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile);
final String originalDirectory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile);
final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile);
final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY);
final String fileName = evaluateFileProperty(context, flowFile); final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem); final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory); final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final String tempFilePrefix = UUID.randomUUID().toString(); final WritingStrategy writingStrategy = context.getProperty(WRITING_STRATEGY).asAllowableValue(WritingStrategy.class);
final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory);
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());
final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize()); final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize());
final DataLakeFileClient fileClient;
if (writingStrategy == WritingStrategy.WRITE_AND_RENAME) {
final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile);
final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY);
final String tempFilePrefix = UUID.randomUUID().toString();
final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory);
final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
if (transferSize > 0) {
final FlowFile sourceFlowFile = flowFile; uploadFile(session, flowFile, fileResourceFound, transferSize, tempFileClient);
try (
final InputStream inputStream = new BufferedInputStream(
fileResourceFound.map(FileResource::getInputStream)
.orElseGet(() -> session.read(sourceFlowFile))
)
) {
uploadContent(tempFileClient, inputStream, transferSize);
} catch (final Exception e) {
removeTempFile(tempFileClient);
throw e;
}
}
createDirectoryIfNotExists(directoryClient); createDirectoryIfNotExists(directoryClient);
final String fileUrl = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); fileClient = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution);
if (fileUrl != null) { } else {
final Map<String, String> attributes = createAttributeMap(fileSystem, originalDirectory, fileName, fileUrl, transferSize); fileClient = createFile(directoryClient, fileName, conflictResolution);
if (fileClient != null) {
uploadFile(session, flowFile, fileResourceFound, transferSize, fileClient);
}
}
if (fileClient != null) {
final String fileUrl = fileClient.getFileUrl();
final Map<String, String> attributes = createAttributeMap(fileSystem, directory, fileName, fileUrl, transferSize);
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
@ -186,12 +201,12 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
} }
} }
private DataLakeFileSystemClient getFileSystemClient(ProcessContext context, FlowFile flowFile, String fileSystem) { private DataLakeFileSystemClient getFileSystemClient(final ProcessContext context, final FlowFile flowFile, final String fileSystem) {
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
return storageClient.getFileSystemClient(fileSystem); return storageClient.getFileSystemClient(fileSystem);
} }
private Map<String, String> createAttributeMap(String fileSystem, String originalDirectory, String fileName, String fileUrl, long length) { private Map<String, String> createAttributeMap(final String fileSystem, final String originalDirectory, final String fileName, final String fileUrl, final long length) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
attributes.put(ATTR_NAME_DIRECTORY, originalDirectory); attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
@ -201,14 +216,29 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
return attributes; return attributes;
} }
private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) { private void createDirectoryIfNotExists(final DataLakeDirectoryClient directoryClient) {
if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists()) { if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists()) {
directoryClient.create(); directoryClient.create();
} }
} }
private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound,
final long transferSize, final DataLakeFileClient fileClient) throws Exception {
if (transferSize > 0) {
try (final InputStream inputStream = new BufferedInputStream(
fileResourceFound.map(FileResource::getInputStream)
.orElseGet(() -> session.read(flowFile)))
) {
uploadContent(fileClient, inputStream, transferSize);
} catch (final Exception e) {
removeFile(fileClient);
throw e;
}
}
}
//Visible for testing //Visible for testing
static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) {
long chunkStart = 0; long chunkStart = 0;
long chunkSize; long chunkSize;
@ -229,19 +259,41 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
} }
/** /**
* This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it. * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it.
* Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in * Because of that, a work-in-progress file is available for readers before the upload is complete.
* case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important. *
* @param directoryClient directory client of the uploaded file's parent directory
* @param fileName name of the uploaded file
* @param conflictResolution conflict resolution strategy
* @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
* @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
*/
DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
try {
final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
return directoryClient.createFile(fileName, overwrite);
} catch (DataLakeStorageException dataLakeStorageException) {
return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
}
}
/**
* This method serves as a "commit" for the upload process in case of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers,
* a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily,
* but it is a calculated risk because consistency is more important for 'Write and Rename' strategy.
* <p> * <p>
* Visible for testing * Visible for testing
* *
* @param sourceFileClient client of the temporary file * @param sourceFileClient file client of the temporary file
* @param destinationDirectory final location of the uploaded file * @param destinationDirectory final location of the uploaded file
* @param destinationFileName final name of the uploaded file * @param destinationFileName final name of the uploaded file
* @param conflictResolution conflict resolution strategy * @param conflictResolution conflict resolution strategy
* @return URL of the uploaded file * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
* @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
*/ */
String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
final String destinationPath = createPath(destinationDirectory, destinationFileName); final String destinationPath = createPath(destinationDirectory, destinationFileName);
try { try {
@ -249,18 +301,24 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
if (!conflictResolution.equals(REPLACE_RESOLUTION)) { if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
destinationCondition.setIfNoneMatch("*"); destinationCondition.setIfNoneMatch("*");
} }
return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue();
} catch (DataLakeStorageException dataLakeStorageException) { } catch (DataLakeStorageException dataLakeStorageException) {
removeTempFile(sourceFileClient); removeFile(sourceFileClient);
if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
}
}
private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) {
final boolean fileAlreadyExists = dataLakeStorageException.getStatusCode() == 409;
if (fileAlreadyExists && conflictResolution.equals(IGNORE_RESOLUTION)) {
getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.",
destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
return null; return null;
} else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { } else if (fileAlreadyExists && conflictResolution.equals(FAIL_RESOLUTION)) {
throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException);
} else { } else {
throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException); throw new ProcessException(String.format("File operation failed [%s]", destinationPath), dataLakeStorageException);
}
} }
} }
@ -270,7 +328,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
: path; : path;
} }
private void removeTempFile(final DataLakeFileClient fileClient) { private void removeFile(final DataLakeFileClient fileClient) {
try { try {
fileClient.delete(); fileClient.delete();
} catch (Exception e) { } catch (Exception e) {

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.DescribedValue;
public enum WritingStrategy implements DescribedValue {
WRITE_AND_RENAME("Write and Rename", "The processor writes the Azure file into a temporary directory and then renames/moves it to the final destination." +
" This prevents other processes from reading partially written files."),
SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly to the destination. This might result in the reading of partially written files.");
private final String displayName;
private final String description;
WritingStrategy(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -26,31 +26,57 @@
This processor is responsible for uploading files to Azure Data Lake Storage Gen2. This processor is responsible for uploading files to Azure Data Lake Storage Gen2.
</p> </p>
<h3>File uploading and cleanup process</h3> <h3>File uploading and cleanup process in case of "Write and Rename" strategy</h3>
<h4>New file upload</h4> <h4>New file upload</h4>
<ol> <ol>
<li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li> <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li>
<li>Content is appended to temp file.</li> <li>Content is appended to temp file.</li>
<li>Temp file is renamed to its original name, the original file is overwritten.</li> <li>Temp file is moved to the final destination directory and renamed to its original name.</li>
<li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li> <li>In case of appending or renaming failure, the temp file is deleted.</li>
<li>In case of temporary file deletion failure both temp file and original file remain on the server.</li> <li>In case of temporary file deletion failure, the temp file remains on the server.</li>
</ol> </ol>
<h4>Existing file upload</h4> <h4>Existing file upload</h4>
<ul> <ul>
<li>Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.</li> <li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li>
<li>Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.</li> <li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li>
<li>Processors with "replace" conflict resolution strategy:</li> <li>Processors with "replace" conflict resolution strategy:</li>
<ol> <ol>
<li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li> <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li>
<li>Content is appended to temp file.</li> <li>Content is appended to temp file.</li>
<li>Temp file is renamed to its original name, the original file is overwritten.</li> <li>Temp file is moved to the final destination directory and renamed to its original name, the original file is overwritten.</li>
<li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li> <li>In case of appending or renaming failure, the temp file is deleted and the original file remains intact.</li>
<li>In case of temporary file deletion failure both temp file and original file remain on the server.</li> <li>In case of temporary file deletion failure, both temp file and original file remain on the server.</li>
</ol>
</ul>
<h3>File uploading and cleanup process in case of "Simple Write" strategy</h3>
<h4>New file upload</h4>
<ol>
<li>An empty file is created at its final destination.</li>
<li>Content is appended to the file.</li>
<li>In case of appending failure, the file is deleted.</li>
<li>In case of file deletion failure, the file remains on the server.</li>
</ol>
<h4>Existing file upload</h4>
<ul>
<li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li>
<li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li>
<li>Processors with "replace" conflict resolution strategy:</li>
<ol>
<li>An empty file is created at its final destination, the original file is overwritten.</li>
<li>Content is appended to the file.</li>
<li>In case of appending failure, the file is deleted and the original file is not restored.</li>
<li>In case of file deletion failure, the file remains on the server.</li>
</ol> </ol>
</ul> </ul>

View File

@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService; import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService; import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -53,6 +54,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
protected void setUpCredentials() throws Exception { protected void setUpCredentials() throws Exception {
ADLSCredentialsService service = new ADLSCredentialsControllerService(); ADLSCredentialsService service = new ADLSCredentialsControllerService();
runner.addControllerService("ADLSCredentials", service); runner.addControllerService("ADLSCredentials", service);
runner.setProperty(service, AzureStorageUtils.CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCOUNT_KEY);
runner.setProperty(service, ADLSCredentialsControllerService.ACCOUNT_NAME, getAccountName()); runner.setProperty(service, ADLSCredentialsControllerService.ACCOUNT_NAME, getAccountName());
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.enableControllerService(service); runner.enableControllerService(service);

View File

@ -23,13 +23,15 @@ import org.apache.nifi.fileresource.service.StandardFileResourceService;
import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.WritingStrategy;
import org.apache.nifi.processors.transfer.ResourceTransferProperties; import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
@ -69,8 +71,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
runner.setProperty(AzureStorageUtils.FILE, FILE_NAME); runner.setProperty(AzureStorageUtils.FILE, FILE_NAME);
} }
@Test @ParameterizedTest
public void testPutFileToExistingDirectory() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToExistingDirectory(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
fileSystemClient.createDirectory(DIRECTORY); fileSystemClient.createDirectory(DIRECTORY);
runProcessor(FILE_DATA); runProcessor(FILE_DATA);
@ -78,18 +83,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception { @EnumSource(WritingStrategy.class)
fileSystemClient.createDirectory(DIRECTORY); public void testPutFileToExistingDirectoryWithReplaceResolution(WritingStrategy writingStrategy) throws Exception {
configureProxyService(); setWritingStrategy(writingStrategy);
runProcessor(FILE_DATA);
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception {
fileSystemClient.createDirectory(DIRECTORY); fileSystemClient.createDirectory(DIRECTORY);
runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
@ -99,8 +97,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToExistingDirectoryWithIgnoreResolution() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToExistingDirectoryWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
fileSystemClient.createDirectory(DIRECTORY); fileSystemClient.createDirectory(DIRECTORY);
runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION);
@ -110,15 +111,21 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToNonExistingDirectory() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToNonExistingDirectory(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
runProcessor(FILE_DATA); runProcessor(FILE_DATA);
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToDeepDirectory() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToDeepDirectory(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
String baseDirectory = "dir1/dir2"; String baseDirectory = "dir1/dir2";
String fullDirectory = baseDirectory + "/dir3/dir4"; String fullDirectory = baseDirectory + "/dir3/dir4";
fileSystemClient.createDirectory(baseDirectory); fileSystemClient.createDirectory(baseDirectory);
@ -129,8 +136,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(fullDirectory, FILE_NAME, FILE_DATA); assertSuccess(fullDirectory, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToRootDirectory() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToRootDirectory(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
String rootDirectory = ""; String rootDirectory = "";
runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory); runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory);
@ -139,8 +149,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(rootDirectory, FILE_NAME, FILE_DATA); assertSuccess(rootDirectory, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutEmptyFile() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutEmptyFile(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
byte[] fileData = new byte[0]; byte[] fileData = new byte[0];
runProcessor(fileData); runProcessor(fileData);
@ -148,8 +161,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, fileData); assertSuccess(DIRECTORY, FILE_NAME, fileData);
} }
@Test @ParameterizedTest
public void testPutBigFile() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutBigFile(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
Random random = new Random(); Random random = new Random();
byte[] fileData = new byte[120_000_000]; byte[] fileData = new byte[120_000_000];
random.nextBytes(fileData); random.nextBytes(fileData);
@ -159,8 +175,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, fileData); assertSuccess(DIRECTORY, FILE_NAME, fileData);
} }
@Test @ParameterizedTest
public void testPutFileWithNonExistingFileSystem() { @EnumSource(WritingStrategy.class)
public void testPutFileWithNonExistingFileSystem(WritingStrategy writingStrategy) {
setWritingStrategy(writingStrategy);
runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy"); runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
runProcessor(FILE_DATA); runProcessor(FILE_DATA);
@ -168,8 +187,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure(); assertFailure();
} }
@Test @ParameterizedTest
public void testPutFileWithInvalidFileName() { @EnumSource(WritingStrategy.class)
public void testPutFileWithInvalidFileName(WritingStrategy writingStrategy) {
setWritingStrategy(writingStrategy);
runner.setProperty(AzureStorageUtils.FILE, "/file1"); runner.setProperty(AzureStorageUtils.FILE, "/file1");
runProcessor(FILE_DATA); runProcessor(FILE_DATA);
@ -177,8 +199,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure(); assertFailure();
} }
@Test @ParameterizedTest
public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileWithSpacesInDirectoryAndFileName(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
String directory = "dir 1"; String directory = "dir 1";
String fileName = "file 1"; String fileName = "file 1";
runner.setProperty(AzureStorageUtils.DIRECTORY, directory); runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
@ -189,8 +214,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(directory, fileName, FILE_DATA); assertSuccess(directory, fileName, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToExistingFileWithFailResolution() { @EnumSource(WritingStrategy.class)
public void testPutFileToExistingFileWithFailResolution(WritingStrategy writingStrategy) {
setWritingStrategy(writingStrategy);
fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
runProcessor(FILE_DATA); runProcessor(FILE_DATA);
@ -198,8 +226,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure(); assertFailure();
} }
@Test @ParameterizedTest
public void testPutFileToExistingFileWithReplaceResolution() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToExistingFileWithReplaceResolution(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
@ -209,8 +240,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileToExistingFileWithIgnoreResolution() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileToExistingFileWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
String azureFileContent = "AzureFileContent"; String azureFileContent = "AzureFileContent";
createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent); createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent);
@ -221,8 +255,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8)); assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8));
} }
@Test @ParameterizedTest
public void testPutFileWithEL() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileWithEL(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
Map<String, String> attributes = createAttributesMap(); Map<String, String> attributes = createAttributesMap();
setELProperties(); setELProperties();
@ -231,8 +268,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
} }
@Test @ParameterizedTest
public void testPutFileWithELButFilesystemIsNotSpecified() { @EnumSource(WritingStrategy.class)
public void testPutFileWithELButFilesystemIsNotSpecified(WritingStrategy writingStrategy) {
setWritingStrategy(writingStrategy);
Map<String, String> attributes = createAttributesMap(); Map<String, String> attributes = createAttributesMap();
attributes.remove(EL_FILESYSTEM); attributes.remove(EL_FILESYSTEM);
setELProperties(); setELProperties();
@ -242,8 +282,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure(); assertFailure();
} }
@Test @ParameterizedTest
public void testPutFileWithELButFileNameIsNotSpecified() { @EnumSource(WritingStrategy.class)
public void testPutFileWithELButFileNameIsNotSpecified(WritingStrategy writingStrategy) {
setWritingStrategy(writingStrategy);
Map<String, String> attributes = createAttributesMap(); Map<String, String> attributes = createAttributesMap();
attributes.remove(EL_FILE_NAME); attributes.remove(EL_FILE_NAME);
setELProperties(); setELProperties();
@ -253,8 +296,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure(); assertFailure();
} }
@Test @ParameterizedTest
public void testPutFileFromLocalFile() throws Exception { @EnumSource(WritingStrategy.class)
public void testPutFileFromLocalFile(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
String attributeName = "file.path"; String attributeName = "file.path";
String serviceId = FileResourceService.class.getSimpleName(); String serviceId = FileResourceService.class.getSimpleName();
@ -280,6 +326,19 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertProvenanceEvents(); assertProvenanceEvents();
} }
@ParameterizedTest
@EnumSource(WritingStrategy.class)
public void testPutFileUsingProxy(WritingStrategy writingStrategy) throws Exception {
setWritingStrategy(writingStrategy);
fileSystemClient.createDirectory(DIRECTORY);
configureProxyService();
runProcessor(FILE_DATA);
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
}
private Map<String, String> createAttributesMap() { private Map<String, String> createAttributesMap() {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
@ -290,6 +349,10 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
return attributes; return attributes;
} }
private void setWritingStrategy(WritingStrategy writingStrategy) {
runner.setProperty(PutAzureDataLakeStorage.WRITING_STRATEGY, writingStrategy);
}
private void setELProperties() { private void setELProperties() {
runner.setProperty(AzureStorageUtils.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); runner.setProperty(AzureStorageUtils.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}", EL_DIRECTORY)); runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}", EL_DIRECTORY));