diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java index 86b23aa859..314785a6a6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -16,6 +16,11 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -27,11 +32,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; -import com.azure.storage.file.datalake.DataLakeDirectoryClient; -import com.azure.storage.file.datalake.DataLakeFileClient; -import com.azure.storage.file.datalake.DataLakeFileSystemClient; -import com.azure.storage.file.datalake.DataLakeServiceClient; - import java.util.concurrent.TimeUnit; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @@ -43,7 +43,6 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); - if (flowFile == null) { return; } @@ -53,11 +52,21 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); - final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); - final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); + if (StringUtils.isBlank(fileSystem)) { + throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " + + FILESYSTEM.getDisplayName() + " must be specified as a non-empty string."); + } + if (StringUtils.isBlank(fileName)) { + throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " + + FILE.getDisplayName() + " must be specified as a non-empty string."); + } + + final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); + final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); + fileClient.delete(); session.transfer(flowFile, REL_SUCCESS); @@ -69,4 +78,4 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc session.transfer(flowFile, REL_FAILURE); } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 2d80ea34ff..5af7988685 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -63,8 +63,8 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce } final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); - final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); - final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); + final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); if (fileClient.getProperties().isDirectory()) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java index 29915eb9ef..a07108b7a9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java @@ -16,44 +16,388 @@ */ package org.apache.nifi.processors.azure.storage; +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -public class ITDeleteAzureDataLakeStorage extends AbstractAzureBlobStorageIT { +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Override protected Class getProcessorClass() { return DeleteAzureDataLakeStorage.class; } - @Before - public void setUp() { - runner.setProperty(DeleteAzureDataLakeStorage.FILE, TEST_FILE_NAME); - } - - @Ignore @Test - public void testDeleteFile() throws Exception { + public void testDeleteFileFromRoot() { + // GIVEN + String directory= ""; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + uploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteFileFromDirectory() { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteFileFromDeepDirectory() { + // GIVEN + String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/" + + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/" + + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteFileWithWhitespaceInFilename() { + // GIVEN + String directory= "TestDirectory"; + String filename = "A test file.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteFileWithWhitespaceInDirectoryName() { + // GIVEN + String directory= "A Test Directory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteEmptyDirectory() { + // GIVEN + String parentDirectory = "ParentDirectory"; + String childDirectory = "ChildDirectory"; + String inputFlowFileContent = "InputFlowFileContent"; + + fileSystemClient.createDirectory(parentDirectory + "/" + childDirectory); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent); + } + + @Test + public void testDeleteFileCaseSensitiveFilename() { + // GIVEN + String directory = "TestDirectory"; + String filename1 = "testFile.txt"; + String filename2 = "testfile.txt"; + String fileContent1 = "ContentOfFile1"; + String fileContent2 = "ContentOfFile2"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename1, fileContent1); + uploadFile(directory, filename2, fileContent2); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, filename1, inputFlowFileContent, inputFlowFileContent); + assertTrue(fileExists(directory, filename2)); + } + + @Test + public void testDeleteUsingExpressionLanguage() { + // GIVEN + String expLangFileSystem = "az.filesystem"; + String expLangDirectory = "az.directory"; + String expLangFilename = "az.filename"; + + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + + String inputFlowFileContent = "InputFlowFileContent"; + + Map attributes = new HashMap<>(); + attributes.put(expLangFileSystem, fileSystemName); + attributes.put(expLangDirectory, directory); + attributes.put(expLangFilename, filename); + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testSuccessfulDeleteWithExpressionLanguage("${" + expLangFileSystem + "}", + "${" + expLangDirectory + "}", + "${" + expLangFilename + "}", + attributes, + inputFlowFileContent, + inputFlowFileContent, + directory, + filename); + } + + @Test + public void testDeleteUsingExpressionLanguageFileSystemIsNotSpecified() { + // GIVEN + String expLangFileSystem = "az.filesystem"; + String expLangDirectory = "az.directory"; + String expLangFilename = "az.filename"; + + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + + String inputFlowFileContent = "InputFlowFileContent"; + + Map attributes = new HashMap<>(); + attributes.put(expLangDirectory, directory); + attributes.put(expLangFilename, filename); + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testFailedDeleteWithProcessException("${" + expLangFileSystem + "}", + "${" + expLangDirectory + "}", + "${" + expLangFilename + "}", + attributes, + inputFlowFileContent, + inputFlowFileContent); + assertTrue(fileExists(directory, filename)); + } + + @Test + public void testDeleteUsingExpressionLanguageFilenameIsNotSpecified() { + // GIVEN + String expLangFileSystem = "az.filesystem"; + String expLangDirectory = "az.directory"; + String expLangFilename = "az.filename"; + + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + + String inputFlowFileContent = "InputFlowFileContent"; + Map attributes = new HashMap<>(); + attributes.put(expLangFileSystem, fileSystemName); + attributes.put(expLangDirectory, directory); + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testFailedDeleteWithProcessException("${" + expLangFileSystem + "}", + "${" + expLangDirectory + "}", + "${" + expLangFilename + "}", + attributes, + inputFlowFileContent, + inputFlowFileContent); + assertTrue(fileExists(directory, filename)); + } + + @Test + public void testDeleteNonExistentFile() { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String inputFlowFileContent = "InputFlowFileContent"; + + fileSystemClient.createDirectory(directory); + + // WHEN + // THEN + testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404); + assertTrue(fileExists("", directory)); + } + + @Test + public void testDeleteFileFromNonExistentDirectory() { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String inputFlowFileContent = "InputFlowFileContent"; + + // WHEN + // THEN + testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404); + } + + @Test + public void testDeleteFileFromNonExistentFileSystem() { + // GIVEN + String fileSystem = "NonExistentFileSystem"; + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String inputFlowFileContent = "InputFlowFileContent"; + + // WHEN + // THEN + testFailedDelete(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400); + } + + @Test + public void testDeleteNonEmptyDirectory() { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + testFailedDelete(fileSystemName, "", directory, inputFlowFileContent, inputFlowFileContent, 409); + assertTrue(fileExists(directory, filename)); + } + + @Test + public void testDeleteFileWithInvalidFilename() { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String invalidFilename = "test/\\File.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + // WHEN + // THEN + testFailedDelete(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 400); + assertTrue(fileExists(directory, filename)); + } + + private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) { + testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent, + directory, filename); + } + + private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map attributes, + String inputFlowFileContent, String expectedFlowFileContent, String directory, String filename) { + // GIVEN + int expectedNumberOfProvenanceEvents = 1; + ProvenanceEventType expectedEventType = ProvenanceEventType.REMOTE_INVOCATION; + + setRunnerProperties(expLangFileSystem, expLangDirectory, expLangFilename); + + // WHEN + startRunner(inputFlowFileContent, attributes); + + // THEN + assertSuccess(directory, filename, expectedFlowFileContent, expectedNumberOfProvenanceEvents, expectedEventType); + } + + private void testFailedDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) { + // GIVEN + setRunnerProperties(fileSystem, directory, filename); + + // WHEN + startRunner(inputFlowFileContent, Collections.emptyMap()); + + // THEN + DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable(); + assertEquals(expectedErrorCode, e.getStatusCode()); + + assertFailure(expectedFlowFileContent); + } + + private void testFailedDeleteWithProcessException(String fileSystem, String directory, String filename, Map attributes, + String inputFlowFileContent, String expectedFlowFileContent) { + // GIVEN + setRunnerProperties(fileSystem, directory, filename); + + // WHEN + startRunner(inputFlowFileContent, attributes); + + // THEN + Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable(); + assertEquals(ProcessException.class, exception.getClass()); + + assertFailure(expectedFlowFileContent); + } + + private boolean fileExists(String directory, String filename) { + DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); + DataLakeFileClient fileClient = directoryClient.getFileClient(filename); + + return fileClient.exists(); + } + + private void setRunnerProperties(String fileSystem, String directory, String filename) { + runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem); + runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory); + runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename); runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(1); - - assertResult(); } + private void startRunner(String inputFlowFileContent, Map attributes) { + runner.enqueue(inputFlowFileContent, attributes); + runner.run(); + } - private void assertResult() throws Exception { + private void assertSuccess(String directory, String filename, String expectedFlowFileContent, int expectedNumberOfProvenanceEvents, ProvenanceEventType expectedEventType) { runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_SUCCESS, 1); - List flowFilesForRelationship = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals("0123456789".getBytes()); - flowFile.assertAttributeEquals("azure.length", "10"); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS).get(0); + flowFile.assertContentEquals(expectedFlowFileContent); - } + int actualNumberOfProvenanceEvents = runner.getProvenanceEvents().size(); + assertEquals(expectedNumberOfProvenanceEvents, actualNumberOfProvenanceEvents); + + ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType(); + assertEquals(expectedEventType, actualEventType); + + assertFalse(fileExists(directory, filename)); } + + private void assertFailure(String expectedFlowFileContent) { + runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_FAILURE, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_FAILURE).get(0); + flowFile.assertContentEquals(expectedFlowFileContent); + } + } \ No newline at end of file