From 9aae58f1178cacb6ee4814f26288bf8ca5150d71 Mon Sep 17 00:00:00 2001 From: Peter Gyori Date: Thu, 14 May 2020 14:35:34 +0200 Subject: [PATCH] NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory A newer version (12.1.1) of azure-storage-file-datalake is imported. This closes #4273. Signed-off-by: Peter Turcsanyi --- .../nifi-azure-bundle/nifi-azure-processors/pom.xml | 2 +- .../azure/storage/FetchAzureDataLakeStorage.java | 4 ++++ .../azure/storage/ITFetchAzureDataLakeStorage.java | 10 ++++++---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index b0e80df5f6..0a3602db60 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -73,7 +73,7 @@ com.azure azure-storage-file-datalake - 12.0.1 + 12.1.1 diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index a616439642..2d80ea34ff 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -67,6 +67,10 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); + if (fileClient.getProperties().isDirectory()) { + throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath()); + } + flowFile = session.write(flowFile, os -> fileClient.read(os)); session.getProvenanceReporter().modifyContent(flowFile); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java index f08b75e7f3..de35508ab6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java @@ -112,7 +112,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT String inputFlowFileContent = "InputFlowFileContent"; createDirectoryAndUploadFile(directory, filename1, fileContent1); - createDirectoryAndUploadFile(directory, filename2, fileContent2); + uploadFile(directory, filename2, fileContent2); // WHEN // THEN @@ -161,7 +161,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent); } - @Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.") @Test public void testFetchDirectory() { // GIVEN @@ -170,13 +169,12 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT String filename = "testFile.txt"; String fileContent = "AzureFileContent"; String inputFlowFileContent = "InputFlowFileContent"; - String expectedFlowFileContent = ""; createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent); // WHEN // THEN - testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent); + testFailedFetchWithProcessException(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent); } @Test @@ -391,6 +389,10 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT assertFailure(expectedFlowFileContent); } + private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) { + testFailedFetchWithProcessException(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent); + } + private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map attributes, String inputFlowFileContent, String expectedFlowFileContent) { // GIVEN