From bffc34252158103d0de727783c85c1cac727e5f4 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Thu, 21 Mar 2024 08:47:28 +0100 Subject: [PATCH] NIFI-12928 Added Simple Write strategy in PutAzureDataLakeStorage Signed-off-by: Pierre Villard This closes #8540. --- .../storage/PutAzureDataLakeStorage.java | 148 ++++++++++++----- .../azure/storage/utils/WritingStrategy.java | 49 ++++++ .../additionalDetails.html | 44 ++++- .../AbstractAzureDataLakeStorageIT.java | 2 + .../storage/ITPutAzureDataLakeStorage.java | 155 ++++++++++++------ 5 files changed, 298 insertions(+), 100 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 7651ad6a99..d385eeb27d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -40,6 +40,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator; +import org.apache.nifi.processors.azure.storage.utils.WritingStrategy; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.StringUtils; @@ -99,6 +100,15 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess 
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) .build(); + protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder() + .name("writing-strategy") + .displayName("Writing Strategy") + .description("Defines the approach for writing the Azure file.") + .required(true) + .allowableValues(WritingStrategy.class) + .defaultValue(WritingStrategy.WRITE_AND_RENAME) + .build(); + public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder() .name("base-temporary-path") .displayName("Base Temporary Path") @@ -108,6 +118,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess .defaultValue("") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(new DirectoryValidator("Base Temporary Path")) + .dependsOn(WRITING_STRATEGY, WritingStrategy.WRITE_AND_RENAME) .build(); private static final List PROPERTIES = List.of( @@ -115,6 +126,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess FILESYSTEM, DIRECTORY, FILE, + WRITING_STRATEGY, BASE_TEMPORARY_PATH, CONFLICT_RESOLUTION, RESOURCE_TRANSFER_SOURCE, @@ -137,41 +149,44 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final long startNanos = System.nanoTime(); try { final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile); - final String originalDirectory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); - final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile); - final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); + final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); final String fileName = evaluateFileProperty(context, flowFile); final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem); - final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(originalDirectory); + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); - final String tempFilePrefix = UUID.randomUUID().toString(); - final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); + final WritingStrategy writingStrategy = context.getProperty(WRITING_STRATEGY).asAllowableValue(WritingStrategy.class); final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); final Optional fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize()); - final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); - if (transferSize > 0) { - final FlowFile sourceFlowFile = flowFile; - try ( - final InputStream inputStream = new BufferedInputStream( - fileResourceFound.map(FileResource::getInputStream) - .orElseGet(() -> session.read(sourceFlowFile)) - ) - ) { - uploadContent(tempFileClient, inputStream, transferSize); - } catch (final Exception e) { - removeTempFile(tempFileClient); - throw e; + final DataLakeFileClient fileClient; + + if (writingStrategy == WritingStrategy.WRITE_AND_RENAME) { + final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile); + final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); + final String tempFilePrefix = UUID.randomUUID().toString(); + + final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); + final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); + + uploadFile(session, flowFile, fileResourceFound, transferSize, 
tempFileClient); + + createDirectoryIfNotExists(directoryClient); + + fileClient = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); + } else { + fileClient = createFile(directoryClient, fileName, conflictResolution); + + if (fileClient != null) { + uploadFile(session, flowFile, fileResourceFound, transferSize, fileClient); } } - createDirectoryIfNotExists(directoryClient); - final String fileUrl = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); - if (fileUrl != null) { - final Map attributes = createAttributeMap(fileSystem, originalDirectory, fileName, fileUrl, transferSize); + if (fileClient != null) { + final String fileUrl = fileClient.getFileUrl(); + final Map attributes = createAttributeMap(fileSystem, directory, fileName, fileUrl, transferSize); flowFile = session.putAllAttributes(flowFile, attributes); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); @@ -186,12 +201,12 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } } - private DataLakeFileSystemClient getFileSystemClient(ProcessContext context, FlowFile flowFile, String fileSystem) { + private DataLakeFileSystemClient getFileSystemClient(final ProcessContext context, final FlowFile flowFile, final String fileSystem) { final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); return storageClient.getFileSystemClient(fileSystem); } - private Map createAttributeMap(String fileSystem, String originalDirectory, String fileName, String fileUrl, long length) { + private Map createAttributeMap(final String fileSystem, final String originalDirectory, final String fileName, final String fileUrl, final long length) { final Map attributes = new HashMap<>(); attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); attributes.put(ATTR_NAME_DIRECTORY, originalDirectory); @@ -201,14 +216,29 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess return attributes; } - private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) { + private void createDirectoryIfNotExists(final DataLakeDirectoryClient directoryClient) { if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists()) { directoryClient.create(); } } + private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional fileResourceFound, + final long transferSize, final DataLakeFileClient fileClient) throws Exception { + if (transferSize > 0) { + try (final InputStream inputStream = new BufferedInputStream( + fileResourceFound.map(FileResource::getInputStream) + .orElseGet(() -> session.read(flowFile))) + ) { + uploadContent(fileClient, inputStream, transferSize); + } catch (final Exception e) { + removeFile(fileClient); + throw e; + } + } + } + //Visible for testing - static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { + static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) { long chunkStart = 0; long chunkSize; @@ -229,19 +259,41 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } /** - * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it. - * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in - * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important. + * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it. + * Because of that, a work-in-progress file is available for readers before the upload is complete. 
+ * + * @param directoryClient directory client of the uploaded file's parent directory + * @param fileName name of the uploaded file + * @param conflictResolution conflict resolution strategy + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors + */ + DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { + final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName); + + try { + final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION); + return directoryClient.createFile(fileName, overwrite); + } catch (DataLakeStorageException dataLakeStorageException) { + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + /** + * This method serves as a "commit" for the upload process in case of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers, + * a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily, + * but it is a calculated risk because consistency is more important for 'Write and Rename' strategy. *

* Visible for testing * - * @param sourceFileClient client of the temporary file + * @param sourceFileClient file client of the temporary file * @param destinationDirectory final location of the uploaded file * @param destinationFileName final name of the uploaded file * @param conflictResolution conflict resolution strategy - * @return URL of the uploaded file + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors */ - String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { + DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { final String destinationPath = createPath(destinationDirectory, destinationFileName); try { @@ -249,18 +301,24 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess if (!conflictResolution.equals(REPLACE_RESOLUTION)) { destinationCondition.setIfNoneMatch("*"); } - return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); + return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue(); } catch (DataLakeStorageException dataLakeStorageException) { - removeTempFile(sourceFileClient); - if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { - getLogger().info("File [{}] already exists. 
Remote file not modified due to {} being set to '{}'.", - destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); - return null; - } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { - throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); - } else { - throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException); - } + removeFile(sourceFileClient); + + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) { + final boolean fileAlreadyExists = dataLakeStorageException.getStatusCode() == 409; + if (fileAlreadyExists && conflictResolution.equals(IGNORE_RESOLUTION)) { + getLogger().info("File [{}] already exists. 
Remote file not modified due to {} being set to '{}'.", + destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); + return null; + } else if (fileAlreadyExists && conflictResolution.equals(FAIL_RESOLUTION)) { + throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); + } else { + throw new ProcessException(String.format("File operation failed [%s]", destinationPath), dataLakeStorageException); } } @@ -270,7 +328,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess : path; } - private void removeTempFile(final DataLakeFileClient fileClient) { + private void removeFile(final DataLakeFileClient fileClient) { try { fileClient.delete(); } catch (Exception e) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java new file mode 100644 index 0000000000..6fda36623d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import org.apache.nifi.components.DescribedValue; + +public enum WritingStrategy implements DescribedValue { + + WRITE_AND_RENAME("Write and Rename", "The processor writes the Azure file into a temporary directory and then renames/moves it to the final destination." + + " This prevents other processes from reading partially written files."), + SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly to the destination. This might result in the reading of partially written files."); + + private final String displayName; + private final String description; + + WritingStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html index 2469ceafaa..4758d4c19e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html @@ -26,31 +26,57 @@ This processor is responsible for uploading files to Azure Data Lake Storage Gen2.

-

File uploading and cleanup process

+

File uploading and cleanup process in case of "Write and Rename" strategy

New file upload

  1. A temporary file is created with random prefix under the given path in '_nifitempdirectory'.
  2. Content is appended to temp file.
  3. -
  4. Temp file is renamed to its original name, the original file is overwritten.
  5. -
  6. In case of appending or renaming failure the temp file is deleted, the original file remains intact.
  7. -
  8. In case of temporary file deletion failure both temp file and original file remain on the server.
  9. +
  10. Temp file is moved to the final destination directory and renamed to its original name.
  11. +
  12. In case of appending or renaming failure, the temp file is deleted.
  13. +
  14. In case of temporary file deletion failure, the temp file remains on the server.

Existing file upload

    -
  • Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.
  • -
  • Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.
  • +
  • Processors with the "fail" conflict resolution strategy will direct the FlowFile to the "Failure" relationship.
  • +
  • Processors with the "ignore" conflict resolution strategy will direct the FlowFile to the "Success" relationship.
  • Processors with "replace" conflict resolution strategy:
    1. A temporary file is created with random prefix under the given path in '_nifitempdirectory'.
    2. Content is appended to temp file.
    3. -
    4. Temp file is renamed to its original name, the original file is overwritten.
    5. -
    6. In case of appending or renaming failure the temp file is deleted, the original file remains intact.
    7. -
    8. In case of temporary file deletion failure both temp file and original file remain on the server.
    9. +
    10. Temp file is moved to the final destination directory and renamed to its original name, the original file is overwritten.
    11. +
    12. In case of appending or renaming failure, the temp file is deleted and the original file remains intact.
    13. +
    14. In case of temporary file deletion failure, both temp file and original file remain on the server.
    15. +
    +
+ +

File uploading and cleanup process in case of "Simple Write" strategy

+ +

New file upload

+ +
    +
  1. An empty file is created at its final destination.
  2. +
  3. Content is appended to the file.
  4. +
  5. In case of appending failure, the file is deleted.
  6. +
  7. In case of file deletion failure, the file remains on the server.
  8. +
+ +

Existing file upload

+ +
    +
  • Processors with the "fail" conflict resolution strategy will direct the FlowFile to the "Failure" relationship.
  • +
  • Processors with the "ignore" conflict resolution strategy will direct the FlowFile to the "Success" relationship.
  • +
  • Processors with "replace" conflict resolution strategy:
  • + +
      +
    1. An empty file is created at its final destination, the original file is overwritten.
    2. +
    3. Content is appended to the file.
    4. +
    5. In case of appending failure, the file is deleted and the original file is not restored.
    6. +
    7. In case of file deletion failure, the file remains on the server.
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java index 8e94485a1f..fc5f2e6232 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -53,6 +54,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag protected void setUpCredentials() throws Exception { ADLSCredentialsService service = new ADLSCredentialsControllerService(); runner.addControllerService("ADLSCredentials", service); + runner.setProperty(service, AzureStorageUtils.CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCOUNT_KEY); runner.setProperty(service, ADLSCredentialsControllerService.ACCOUNT_NAME, getAccountName()); runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); runner.enableControllerService(service); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 4a59a9b3a4..b4708375b9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -23,13 +23,15 @@ import org.apache.nifi.fileresource.service.StandardFileResourceService; import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.WritingStrategy; import org.apache.nifi.processors.transfer.ResourceTransferProperties; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -69,8 +71,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.setProperty(AzureStorageUtils.FILE, FILE_NAME); } - @Test - public void testPutFileToExistingDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createDirectory(DIRECTORY); runProcessor(FILE_DATA); @@ -78,18 +83,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, 
FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception { - fileSystemClient.createDirectory(DIRECTORY); - configureProxyService(); + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectoryWithReplaceResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); - runProcessor(FILE_DATA); - - assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); - } - - @Test - public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception { fileSystemClient.createDirectory(DIRECTORY); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); @@ -99,8 +97,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingDirectoryWithIgnoreResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectoryWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createDirectory(DIRECTORY); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION); @@ -110,15 +111,21 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToNonExistingDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToNonExistingDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + runProcessor(FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToDeepDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + 
public void testPutFileToDeepDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String baseDirectory = "dir1/dir2"; String fullDirectory = baseDirectory + "/dir3/dir4"; fileSystemClient.createDirectory(baseDirectory); @@ -129,8 +136,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(fullDirectory, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToRootDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToRootDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String rootDirectory = ""; runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory); @@ -139,8 +149,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(rootDirectory, FILE_NAME, FILE_DATA); } - @Test - public void testPutEmptyFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutEmptyFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + byte[] fileData = new byte[0]; runProcessor(fileData); @@ -148,8 +161,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, fileData); } - @Test - public void testPutBigFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutBigFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + Random random = new Random(); byte[] fileData = new byte[120_000_000]; random.nextBytes(fileData); @@ -159,8 +175,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, fileData); } - @Test - public void testPutFileWithNonExistingFileSystem() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public 
void testPutFileWithNonExistingFileSystem(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy"); runProcessor(FILE_DATA); @@ -168,8 +187,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithInvalidFileName() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithInvalidFileName(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + runner.setProperty(AzureStorageUtils.FILE, "/file1"); runProcessor(FILE_DATA); @@ -177,8 +199,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithSpacesInDirectoryAndFileName(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String directory = "dir 1"; String fileName = "file 1"; runner.setProperty(AzureStorageUtils.DIRECTORY, directory); @@ -189,8 +214,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(directory, fileName, FILE_DATA); } - @Test - public void testPutFileToExistingFileWithFailResolution() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingFileWithFailResolution(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); runProcessor(FILE_DATA); @@ -198,8 +226,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileToExistingFileWithReplaceResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void 
testPutFileToExistingFileWithReplaceResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); @@ -209,8 +240,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingFileWithIgnoreResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingFileWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String azureFileContent = "AzureFileContent"; createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent); @@ -221,8 +255,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8)); } - @Test - public void testPutFileWithEL() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithEL(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + Map attributes = createAttributesMap(); setELProperties(); @@ -231,8 +268,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileWithELButFilesystemIsNotSpecified() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithELButFilesystemIsNotSpecified(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + Map attributes = createAttributesMap(); attributes.remove(EL_FILESYSTEM); setELProperties(); @@ -242,8 +282,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithELButFileNameIsNotSpecified() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithELButFileNameIsNotSpecified(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + Map attributes = createAttributesMap(); attributes.remove(EL_FILE_NAME); setELProperties(); @@ -253,8 +296,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileFromLocalFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileFromLocalFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String attributeName = "file.path"; String serviceId = FileResourceService.class.getSimpleName(); @@ -280,6 +326,19 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertProvenanceEvents(); } + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileUsingProxy(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + + fileSystemClient.createDirectory(DIRECTORY); + configureProxyService(); + + runProcessor(FILE_DATA); + + assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); + } + private Map createAttributesMap() { Map attributes = new HashMap<>(); @@ -290,6 +349,10 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { return attributes; } + private void setWritingStrategy(WritingStrategy writingStrategy) { + runner.setProperty(PutAzureDataLakeStorage.WRITING_STRATEGY, writingStrategy); + } + private void setELProperties() { runner.setProperty(AzureStorageUtils.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}", EL_DIRECTORY));