NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory

A newer version (12.1.1) of azure-storage-file-datalake is imported.

This closes #4273.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Peter Gyori 2020-05-14 14:35:34 +02:00 committed by Peter Turcsanyi
parent 179675f0b4
commit 9aae58f117
3 changed files with 11 additions and 5 deletions

View File

@ -73,7 +73,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.0.1</version>
<version>12.1.1</version>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>

View File

@ -67,6 +67,10 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
if (fileClient.getProperties().isDirectory()) {
throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
}
flowFile = session.write(flowFile, os -> fileClient.read(os));
session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);

View File

@ -112,7 +112,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename1, fileContent1);
createDirectoryAndUploadFile(directory, filename2, fileContent2);
uploadFile(directory, filename2, fileContent2);
// WHEN
// THEN
@ -161,7 +161,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
}
@Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.")
@Test
public void testFetchDirectory() {
// GIVEN
@ -170,13 +169,12 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
String expectedFlowFileContent = "";
createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent);
testFailedFetchWithProcessException(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
}
@Test
@ -391,6 +389,10 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
assertFailure(expectedFlowFileContent);
}
private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
testFailedFetchWithProcessException(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
}
private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
String inputFlowFileContent, String expectedFlowFileContent) {
// GIVEN