NIFI-7336: Add tests for DeleteAzureDataLakeStorage

DeleteAzureDataLakeStorage now throws exception if fileSystem or fileName is empty string

NIFI-7336: Add tests for DeleteAzureDataLakeStorage - typos fixed
NIFI-7336: Add tests for DeleteAzureDataLakeStorage - fixed a test case

This closes #4272.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Peter Gyori 2020-05-07 16:44:25 +02:00 committed by Peter Turcsanyi
parent 852715aadd
commit d6240a1074
3 changed files with 386 additions and 33 deletions

View File

@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -27,11 +32,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@ -43,7 +43,6 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@ -53,11 +52,21 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
}
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
FILE.getDisplayName() + " must be specified as a non-empty string.");
}
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
fileClient.delete();
session.transfer(flowFile, REL_SUCCESS);
@ -69,4 +78,4 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
session.transfer(flowFile, REL_FAILURE);
}
}
}
}

View File

@ -63,8 +63,8 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
}
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
if (fileClient.getProperties().isDirectory()) {

View File

@ -16,44 +16,388 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ITDeleteAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return DeleteAzureDataLakeStorage.class;
}
@Before
public void setUp() {
runner.setProperty(DeleteAzureDataLakeStorage.FILE, TEST_FILE_NAME);
}
@Ignore
@Test
public void testDeleteFile() throws Exception {
public void testDeleteFileFromRoot() {
// GIVEN
String directory= "";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
uploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteFileFromDirectory() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteFileFromDeepDirectory() {
// GIVEN
String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+ "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+ "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteFileWithWhitespaceInFilename() {
// GIVEN
String directory= "TestDirectory";
String filename = "A test file.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteFileWithWhitespaceInDirectoryName() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteEmptyDirectory() {
// GIVEN
String parentDirectory = "ParentDirectory";
String childDirectory = "ChildDirectory";
String inputFlowFileContent = "InputFlowFileContent";
fileSystemClient.createDirectory(parentDirectory + "/" + childDirectory);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteFileCaseSensitiveFilename() {
// GIVEN
String directory = "TestDirectory";
String filename1 = "testFile.txt";
String filename2 = "testfile.txt";
String fileContent1 = "ContentOfFile1";
String fileContent2 = "ContentOfFile2";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename1, fileContent1);
uploadFile(directory, filename2, fileContent2);
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, filename1, inputFlowFileContent, inputFlowFileContent);
assertTrue(fileExists(directory, filename2));
}
@Test
public void testDeleteUsingExpressionLanguage() {
// GIVEN
String expLangFileSystem = "az.filesystem";
String expLangDirectory = "az.directory";
String expLangFilename = "az.filename";
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
Map<String, String> attributes = new HashMap<>();
attributes.put(expLangFileSystem, fileSystemName);
attributes.put(expLangDirectory, directory);
attributes.put(expLangFilename, filename);
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testSuccessfulDeleteWithExpressionLanguage("${" + expLangFileSystem + "}",
"${" + expLangDirectory + "}",
"${" + expLangFilename + "}",
attributes,
inputFlowFileContent,
inputFlowFileContent,
directory,
filename);
}
@Test
public void testDeleteUsingExpressionLanguageFileSystemIsNotSpecified() {
// GIVEN
String expLangFileSystem = "az.filesystem";
String expLangDirectory = "az.directory";
String expLangFilename = "az.filename";
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
Map<String, String> attributes = new HashMap<>();
attributes.put(expLangDirectory, directory);
attributes.put(expLangFilename, filename);
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
"${" + expLangDirectory + "}",
"${" + expLangFilename + "}",
attributes,
inputFlowFileContent,
inputFlowFileContent);
assertTrue(fileExists(directory, filename));
}
@Test
public void testDeleteUsingExpressionLanguageFilenameIsNotSpecified() {
// GIVEN
String expLangFileSystem = "az.filesystem";
String expLangDirectory = "az.directory";
String expLangFilename = "az.filename";
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
Map<String, String> attributes = new HashMap<>();
attributes.put(expLangFileSystem, fileSystemName);
attributes.put(expLangDirectory, directory);
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testFailedDeleteWithProcessException("${" + expLangFileSystem + "}",
"${" + expLangDirectory + "}",
"${" + expLangFilename + "}",
attributes,
inputFlowFileContent,
inputFlowFileContent);
assertTrue(fileExists(directory, filename));
}
@Test
public void testDeleteNonExistentFile() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
fileSystemClient.createDirectory(directory);
// WHEN
// THEN
testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
assertTrue(fileExists("", directory));
}
@Test
public void testDeleteFileFromNonExistentDirectory() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
// WHEN
// THEN
testFailedDelete(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
}
@Test
public void testDeleteFileFromNonExistentFileSystem() {
// GIVEN
String fileSystem = "NonExistentFileSystem";
String directory = "TestDirectory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
// WHEN
// THEN
testFailedDelete(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400);
}
@Test
public void testDeleteNonEmptyDirectory() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
testFailedDelete(fileSystemName, "", directory, inputFlowFileContent, inputFlowFileContent, 409);
assertTrue(fileExists(directory, filename));
}
@Test
public void testDeleteFileWithInvalidFilename() {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String invalidFilename = "test/\\File.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
// WHEN
// THEN
testFailedDelete(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 400);
assertTrue(fileExists(directory, filename));
}
private void testSuccessfulDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
testSuccessfulDeleteWithExpressionLanguage(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent,
directory, filename);
}
private void testSuccessfulDeleteWithExpressionLanguage(String expLangFileSystem, String expLangDirectory, String expLangFilename, Map<String, String> attributes,
String inputFlowFileContent, String expectedFlowFileContent, String directory, String filename) {
// GIVEN
int expectedNumberOfProvenanceEvents = 1;
ProvenanceEventType expectedEventType = ProvenanceEventType.REMOTE_INVOCATION;
setRunnerProperties(expLangFileSystem, expLangDirectory, expLangFilename);
// WHEN
startRunner(inputFlowFileContent, attributes);
// THEN
assertSuccess(directory, filename, expectedFlowFileContent, expectedNumberOfProvenanceEvents, expectedEventType);
}
private void testFailedDelete(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
// GIVEN
setRunnerProperties(fileSystem, directory, filename);
// WHEN
startRunner(inputFlowFileContent, Collections.emptyMap());
// THEN
DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
assertEquals(expectedErrorCode, e.getStatusCode());
assertFailure(expectedFlowFileContent);
}
private void testFailedDeleteWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
String inputFlowFileContent, String expectedFlowFileContent) {
// GIVEN
setRunnerProperties(fileSystem, directory, filename);
// WHEN
startRunner(inputFlowFileContent, attributes);
// THEN
Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable();
assertEquals(ProcessException.class, exception.getClass());
assertFailure(expectedFlowFileContent);
}
private boolean fileExists(String directory, String filename) {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
DataLakeFileClient fileClient = directoryClient.getFileClient(filename);
return fileClient.exists();
}
private void setRunnerProperties(String fileSystem, String directory, String filename) {
runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run(1);
assertResult();
}
private void startRunner(String inputFlowFileContent, Map<String, String> attributes) {
runner.enqueue(inputFlowFileContent, attributes);
runner.run();
}
private void assertResult() throws Exception {
private void assertSuccess(String directory, String filename, String expectedFlowFileContent, int expectedNumberOfProvenanceEvents, ProvenanceEventType expectedEventType) {
runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS).get(0);
flowFile.assertContentEquals(expectedFlowFileContent);
}
int actualNumberOfProvenanceEvents = runner.getProvenanceEvents().size();
assertEquals(expectedNumberOfProvenanceEvents, actualNumberOfProvenanceEvents);
ProvenanceEventType actualEventType = runner.getProvenanceEvents().get(0).getEventType();
assertEquals(expectedEventType, actualEventType);
assertFalse(fileExists(directory, filename));
}
private void assertFailure(String expectedFlowFileContent) {
runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_FAILURE, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_FAILURE).get(0);
flowFile.assertContentEquals(expectedFlowFileContent);
}
}