From 22e8901fced6f56d9913cec240458e3268449873 Mon Sep 17 00:00:00 2001 From: Timea Barna Date: Mon, 27 Jun 2022 07:50:29 +0200 Subject: [PATCH] NIFI-8248 Modified PutAzureDataLakeStorage processor to use temp file instead of inline replacement This closes #6159 Signed-off-by: Joey Frazee --- ...AbstractAzureDataLakeStorageProcessor.java | 2 + .../storage/ListAzureDataLakeStorage.java | 13 ++ .../storage/PutAzureDataLakeStorage.java | 103 ++++++-- .../additionalDetails.html | 24 +- .../storage/ITListAzureDataLakeStorage.java | 220 +++++++++++++++++- .../storage/ITPutAzureDataLakeStorage.java | 46 +++- 6 files changed, 365 insertions(+), 43 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 3047de5f6e..1a66dda500 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -101,6 +101,8 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc REL_FAILURE ))); + public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory"; + @Override public Set getRelationships() { return RELATIONSHIPS; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java index cc43ffd567..24f51b5ae2 100644 --- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -63,6 +63,7 @@ import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_T import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient; @@ -129,6 +130,15 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor PROPERTIES = Collections.unmodifiableList(Arrays.asList( ADLS_CREDENTIALS_SERVICE, FILESYSTEM, @@ -136,6 +146,7 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor listing = fileSystemClient.listPaths(options, null).stream() .filter(pathItem -> !pathItem.isDirectory()) + .filter(pathItem -> includeTempFiles || !pathItem.getName().contains(TEMP_FILE_DIRECTORY)) .filter(pathItem -> isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, pathItem.getLastModified().toInstant().toEpochMilli(), pathItem.getContentLength())) .map(pathItem -> new ADLSFileInfo.Builder() .fileSystem(fileSystem) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 3c10d068b0..dbd6138e15 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.models.DataLakeRequestConditions; import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.commons.io.input.BoundedInputStream; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -30,20 +31,24 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.StringUtils; import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; 
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY; @@ -83,11 +88,23 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) .build(); + public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder() + .name("base-temporary-path") + .displayName("Base Temporary Path") + .description("The Path where the temporary directory will be created. The Path name cannot contain a leading '/'." + + " The root directory can be designated by the empty string value. Non-existing directories will be created." + + " The Temporary File Directory name is " + TEMP_FILE_DIRECTORY) + .defaultValue("") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(new DirectoryValidator("Base Temporary Path")) + .build(); + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( ADLS_CREDENTIALS_SERVICE, FILESYSTEM, DIRECTORY, FILE, + BASE_TEMPORARY_PATH, CONFLICT_RESOLUTION, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @@ -107,41 +124,39 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final long startNanos = System.nanoTime(); try { final String fileSystem = evaluateFileSystemProperty(context, flowFile); - final String directory = evaluateDirectoryProperty(context, flowFile); + final String originalDirectory = evaluateDirectoryProperty(context, flowFile); + final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH); + final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); final String fileName = evaluateFileNameProperty(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); - final DataLakeDirectoryClient directoryClient =
fileSystemClient.getDirectoryClient(directory); - final DataLakeFileClient fileClient; + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory); + final DataLakeFileClient tempFileClient; + final DataLakeFileClient renamedFileClient; + final String tempFilePrefix = UUID.randomUUID().toString(); + final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION); try { - fileClient = directoryClient.createFile(fileName, overwrite); - - final long length = flowFile.getSize(); - if (length > 0) { - try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) { - uploadContent(fileClient, bufferedIn, length); - } catch (Exception e) { - removeTempFile(fileClient); - throw e; - } - } + tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); + appendContent(flowFile, tempFileClient, session); + createDirectoryIfNotExists(directoryClient); + renamedFileClient = renameFile(fileName, directoryClient.getDirectoryPath(), tempFileClient, overwrite); final Map attributes = new HashMap<>(); attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); - attributes.put(ATTR_NAME_DIRECTORY, directory); + attributes.put(ATTR_NAME_DIRECTORY, originalDirectory); attributes.put(ATTR_NAME_FILENAME, fileName); - attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl()); - attributes.put(ATTR_NAME_LENGTH, String.valueOf(length)); + attributes.put(ATTR_NAME_PRIMARY_URI, renamedFileClient.getFileUrl()); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(flowFile.getSize())); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - 
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); + session.getProvenanceReporter().send(flowFile, renamedFileClient.getFileUrl(), transferMillis); } catch (DataLakeStorageException dlsException) { if (dlsException.getStatusCode() == 409) { if (conflictResolution.equals(IGNORE_RESOLUTION)) { @@ -164,14 +179,26 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } } - private void removeTempFile(DataLakeFileClient fileClient) { - try { - fileClient.delete(); - } catch (Exception e) { - getLogger().error("Error while removing temp file on Azure Data Lake Storage", e); + private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) { + if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists()) { + directoryClient.create(); } } + //Visible for testing + void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, ProcessSession session) throws IOException { + final long length = flowFile.getSize(); + if (length > 0) { + try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) { + uploadContent(fileClient, bufferedIn, length); + } catch (Exception e) { + removeTempFile(fileClient); + throw e; + } + } + } + + //Visible for testing static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { long chunkStart = 0; long chunkSize; @@ -190,4 +217,34 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess fileClient.flush(length); } + + //Visible for testing + DataLakeFileClient renameFile(final String fileName, final String directoryPath, final DataLakeFileClient fileClient, final boolean overwrite) { + try { + final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions(); + if (!overwrite) { + destinationCondition.setIfNoneMatch("*"); + } + final String destinationPath = createPath(directoryPath, fileName); + 
return fileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue(); + } catch (DataLakeStorageException dataLakeStorageException) { + getLogger().error("Renaming File [{}] failed", fileClient.getFileName(), dataLakeStorageException); + removeTempFile(fileClient); + throw dataLakeStorageException; + } + } + + private String createPath(final String baseDirectory, final String path) { + return StringUtils.isNotBlank(baseDirectory) + ? baseDirectory + "/" + path + : path; + } + + private void removeTempFile(final DataLakeFileClient fileClient) { + try { + fileClient.delete(); + } catch (Exception e) { + getLogger().error("Removing Temp File [{}] failed", fileClient.getFileName(), e); + } + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html index 40e78d157c..2469ceafaa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html @@ -28,27 +28,29 @@

File uploading and cleanup process

-

New file

+

New file upload

    -
  1. An empty file is created.
  2. -
  3. Content is appended to file.
  4. -
  5. In case append failure the file is deleted.
  6. -
  7. In case file deletion failure the empty file remains on the server.
  8. +
  9. A temporary file is created with a random prefix under the given path in '_nifitempdirectory'.
  10. +
  11. Content is appended to temp file.
  12. +
  13. Temp file is renamed to its original name, the original file is overwritten.
  14. +
  15. In case of an append or rename failure, the temp file is deleted and the original file remains intact.
  16. +
  17. In case of a temporary file deletion failure, both the temp file and the original file remain on the server.
-

Existing file

+

Existing file upload

  • Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.
  • -
  • Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.
  • +
  • Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.
  • Processors with "replace" conflict resolution strategy:
    1. -
    2. An empty file overwrites the existing file, the original file is lost.
    3. -
    4. Content is appended to file.
    5. -
    6. In case append failure the file is deleted.
    7. -
    8. In case file deletion failure the empty file remains on the server.
    9. +
    10. A temporary file is created with a random prefix under the given path in '_nifitempdirectory'.
    11. +
    12. Content is appended to temp file.
    13. +
    14. Temp file is renamed to its original name, the original file is overwritten.
    15. +
    16. In case of an append or rename failure, the temp file is deleted and the original file remains intact.
    17. +
    18. In case of a temporary file deletion failure, both the temp file and the original file remain on the server.
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java index 6c45663e33..978fe9433f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -37,6 +37,7 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -57,6 +58,10 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { uploadFile(testFile1); testFiles.put(testFile1.getFilePath(), testFile1); + TestFile testTempFile1 = new TestFile(AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY, "1234file1"); + uploadFile(testTempFile1); + testFiles.put(testTempFile1.getFilePath(), testTempFile1); + TestFile testFile2 = new TestFile("", "file2"); uploadFile(testFile2); testFiles.put(testFile2.getFilePath(), testFile2); @@ -65,6 +70,10 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { createDirectoryAndUploadFile(testFile11); testFiles.put(testFile11.getFilePath(), testFile11); + TestFile testTempFile11 = new TestFile(String.format("dir1/%s", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "5678file11"); + uploadFile(testTempFile11); + testFiles.put(testTempFile11.getFilePath(), testTempFile11); + TestFile testFile12 = new TestFile("dir1", "file12"); uploadFile(testFile12); testFiles.put(testFile12.getFilePath(), testFile12); @@ -73,10 +82,18 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { createDirectoryAndUploadFile(testFile111); testFiles.put(testFile111.getFilePath(), testFile111); + TestFile testTempFile111 = new TestFile(String.format("dir1/dir11/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "9010file111"); + uploadFile(testTempFile111); + testFiles.put(testTempFile111.getFilePath(), testTempFile111); + TestFile testFile21 = new TestFile("dir 2", "file 21", "Test"); createDirectoryAndUploadFile(testFile21); testFiles.put(testFile21.getFilePath(), testFile21); + TestFile testTempFile21 = new TestFile(String.format("dir2/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "1112file21", "Test"); + uploadFile(testTempFile21); + testFiles.put(testTempFile21.getFilePath(), testTempFile21); + createDirectory("dir3"); } @@ -89,6 +106,20 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); } + @Test + public void testListRootRecursiveWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21", + String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", 
AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListRootRecursiveUsingProxyConfigurationService() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -109,6 +140,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "file2"); } + @Test + public void testListRootNonRecursiveWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "file2"); + } + @Test public void testListSubdirectoryRecursive() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); @@ -118,6 +160,18 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListSubdirectoryRecursiveWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListSubdirectoryNonRecursive() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); @@ -128,20 +182,45 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/file11", 
"dir1/file12"); } + @Test + public void testListSubdirectoryNonRecursiveWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12"); + } + @Test public void testListWithFileFilter() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); - runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$"); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$"); runProcessor(); assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListWithFileFilterWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListWithFileFilterWithEL() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); - runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$"); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$"); runner.setVariable("suffix", "1.*"); runProcessor(); @@ -149,6 +228,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "dir1/file11", 
"dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListWithFileFilterWithELWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$"); + runner.setVariable("suffix", "1.*"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListRootWithPathFilter() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -159,6 +253,19 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListRootWithPathFilterWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListRootWithPathFilterWithEL() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -171,6 +278,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/file11", 
"dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListRootWithPathFilterWithELWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}"); + runner.setVariable("prefix", "^dir"); + runner.setVariable("suffix", "1.*$"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListSubdirectoryWithPathFilter() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); @@ -181,6 +303,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/dir11/file111"); } + @Test + public void testListSubdirectoryWithPathFilterWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/dir11/file111", String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListRootWithFileAndPathFilter() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -192,6 +325,19 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir1/file11", "dir1/dir11/file111"); } + @Test + public void testListRootWithFileAndPathFilterWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + 
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11"); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/dir11/file111", String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListEmptyDirectory() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3"); @@ -236,6 +382,24 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { flowFile.assertAttributeEquals("record.count", "3"); } + @Test + public void testListWithRecordsWithTempFiles() throws InitializationException { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + + MockRecordWriter recordWriter = new MockRecordWriter(null, false); + runner.addControllerService("record-writer", recordWriter); + runner.enableControllerService(recordWriter); + runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer"); + + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals("record.count", "5"); + } + @Test public void testListWithMinAge() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -246,6 +410,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0); } + @Test + public void testListWithMinAgeWithTempFiles() throws Exception { + 
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0); + } + @Test public void testListWithMaxAge() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -256,6 +431,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); } + @Test + public void testListWithMaxAgeWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21", + String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListWithMinSize() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -266,6 +456,20 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111"); } + @Test + public void testListWithMinSizeWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + 
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", + String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), + String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + @Test public void testListWithMaxSize() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -276,6 +480,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("dir 2/file 21"); } + @Test + public void testListWithMaxSizeWithTempFiles() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B"); + runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); + + runProcessor(); + + assertSuccess("dir 2/file 21", String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY)); + } + private void runProcessor() { runner.assertValid(); runner.run(); @@ -288,6 +503,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths)); List flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS); + assertEquals(expectedFiles.size(), flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { String filePath = flowFile.getAttribute("azure.filePath"); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 20efbb4372..f249b38a9e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -16,12 +16,19 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.core.http.rest.Response; import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.models.DataLakeRequestConditions; +import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,9 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @@ -255,15 
+265,37 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testPutFileButFailedToAppend() { - DataLakeFileClient fileClient = mock(DataLakeFileClient.class); - InputStream stream = mock(InputStream.class); - doThrow(NullPointerException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong()); + final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage(); + final DataLakeFileClient fileClient = mock(DataLakeFileClient.class); + final ProcessSession session = mock(ProcessSession.class); + final FlowFile flowFile = mock(FlowFile.class); - assertThrows(NullPointerException.class, () -> { - PutAzureDataLakeStorage.uploadContent(fileClient, stream, FILE_DATA.length); + when(flowFile.getSize()).thenReturn(1L); + doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong()); - verify(fileClient).delete(); - }); + assertThrows(IllegalArgumentException.class, () -> processor.appendContent(flowFile, fileClient, session)); + verify(fileClient).delete(); + } + + @Test + public void testPutFileButFailedToRename() { + final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage(); + final ProcessorInitializationContext initContext = mock(ProcessorInitializationContext.class); + final String componentId = "componentId"; + final DataLakeFileClient fileClient = mock(DataLakeFileClient.class); + final Response response = mock(Response.class); + //Mock logger + when(initContext.getIdentifier()).thenReturn(componentId); + MockComponentLog componentLog = new MockComponentLog(componentId, processor); + when(initContext.getLogger()).thenReturn(componentLog); + processor.initialize(initContext); + //Mock renameWithResponse Azure method + when(fileClient.renameWithResponse(isNull(), anyString(), isNull(), any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response); + 
when(response.getValue()).thenThrow(DataLakeStorageException.class); + when(fileClient.getFileName()).thenReturn(FILE_NAME); + + assertThrows(DataLakeStorageException.class, () -> processor.renameFile(FILE_NAME, "", fileClient, false)); + verify(fileClient).delete(); } private Map createAttributesMap() {