NIFI-8248 Modified PutAzureDataLakeStorage processor to use temp file instead of inline replacement

This closes #6159

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Timea Barna 2022-06-27 07:50:29 +02:00 committed by Joey Frazee
parent 1dd2bbb866
commit 22e8901fce
6 changed files with 365 additions and 43 deletions

View File

@ -101,6 +101,8 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
REL_FAILURE
)));
public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;

View File

@ -63,6 +63,7 @@ import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_T
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
@ -129,6 +130,15 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor INCLUDE_TEMPORARY_FILES = new PropertyDescriptor.Builder()
.name("include-temporary-files")
.displayName("Include Temporary Files")
.description("Whether to include temporary files when listing the contents of configured directory paths.")
.required(true)
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.defaultValue(Boolean.FALSE.toString())
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
@ -136,6 +146,7 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
RECURSE_SUBDIRECTORIES,
FILE_FILTER,
PATH_FILTER,
INCLUDE_TEMPORARY_FILES,
RECORD_WRITER,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
@ -261,10 +272,12 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
options.setRecursive(recurseSubdirectories);
final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
final boolean includeTempFiles = context.getProperty(INCLUDE_TEMPORARY_FILES).asBoolean();
final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
.filter(pathItem -> !pathItem.isDirectory())
.filter(pathItem -> includeTempFiles || !pathItem.getName().contains(TEMP_FILE_DIRECTORY))
.filter(pathItem -> isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, pathItem.getLastModified().toInstant().toEpochMilli(), pathItem.getContentLength()))
.map(pathItem -> new ADLSFileInfo.Builder()
.fileSystem(fileSystem)

View File

@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -30,20 +31,24 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.StringUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
@ -83,11 +88,23 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
.build();
// Base path under which the temporary upload directory (TEMP_FILE_DIRECTORY) is created.
// Must not start with '/'; the empty string designates the file system root.
// Non-existing directories are created on demand.
public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder()
        .name("base-temporary-path")
        .displayName("Base Temporary Path")
        // Note the leading space on each continuation segment: without it the concatenated
        // description read "...will be created.The Temporary File Directory name is ..."
        .description("The Path where the temporary directory will be created. The Path name cannot contain a leading '/'." +
                " The root directory can be designated by the empty string value. Non-existing directories will be created." +
                " The Temporary File Directory name is " + TEMP_FILE_DIRECTORY)
        .defaultValue("")
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .addValidator(new DirectoryValidator("Base Temporary Path"))
        .build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
FILE,
BASE_TEMPORARY_PATH,
CONFLICT_RESOLUTION,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@ -107,41 +124,39 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String originalDirectory = evaluateDirectoryProperty(context, flowFile);
final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH);
final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY);
final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient;
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory);
final DataLakeFileClient tempFileClient;
final DataLakeFileClient renamedFileClient;
final String tempFilePrefix = UUID.randomUUID().toString();
final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory);
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
try {
fileClient = directoryClient.createFile(fileName, overwrite);
final long length = flowFile.getSize();
if (length > 0) {
try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
uploadContent(fileClient, bufferedIn, length);
} catch (Exception e) {
removeTempFile(fileClient);
throw e;
}
}
tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
appendContent(flowFile, tempFileClient, session);
createDirectoryIfNotExists(directoryClient);
renamedFileClient = renameFile(fileName, directoryClient.getDirectoryPath(), tempFileClient, overwrite);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
attributes.put(ATTR_NAME_DIRECTORY, directory);
attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
attributes.put(ATTR_NAME_FILENAME, fileName);
attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
attributes.put(ATTR_NAME_PRIMARY_URI, renamedFileClient.getFileUrl());
attributes.put(ATTR_NAME_LENGTH, String.valueOf(flowFile.getSize()));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
session.getProvenanceReporter().send(flowFile, renamedFileClient.getFileUrl(), transferMillis);
} catch (DataLakeStorageException dlsException) {
if (dlsException.getStatusCode() == 409) {
if (conflictResolution.equals(IGNORE_RESOLUTION)) {
@ -164,14 +179,26 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
}
}
private void removeTempFile(DataLakeFileClient fileClient) {
try {
fileClient.delete();
} catch (Exception e) {
getLogger().error("Error while removing temp file on Azure Data Lake Storage", e);
// Creates the destination directory when it does not exist yet.
// The empty path denotes the file system root, which always exists, so nothing is done for it.
private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) {
    final String directoryPath = directoryClient.getDirectoryPath();
    if (directoryPath.isEmpty()) {
        return;
    }
    if (!directoryClient.exists()) {
        directoryClient.create();
    }
}
//Visible for testing
// Streams the FlowFile content into the given (temporary) file client.
// On any upload failure the partially written temp file is deleted best-effort
// before the exception is propagated to the caller.
void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, ProcessSession session) throws IOException {
    final long contentLength = flowFile.getSize();
    if (contentLength == 0) {
        // nothing to append for an empty FlowFile; the created file stays empty
        return;
    }
    try (final InputStream rawIn = session.read(flowFile);
         final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
        uploadContent(fileClient, bufferedIn, contentLength);
    } catch (Exception e) {
        removeTempFile(fileClient);
        throw e;
    }
}
//Visible for testing
static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
long chunkStart = 0;
long chunkSize;
@ -190,4 +217,34 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
fileClient.flush(length);
}
//Visible for testing
// Renames the uploaded temp file to its final path inside directoryPath.
// When overwrite is false, an "If-None-Match: *" precondition makes Azure reject the
// rename if the destination already exists (surfacing as a 409 to the caller).
// On failure the temp file is removed best-effort and the storage exception is rethrown.
DataLakeFileClient renameFile(final String fileName, final String directoryPath, final DataLakeFileClient fileClient, final boolean overwrite) {
    try {
        final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
        if (!overwrite) {
            destinationCondition.setIfNoneMatch("*");
        }
        return fileClient
                .renameWithResponse(null, createPath(directoryPath, fileName), null, destinationCondition, null, null)
                .getValue();
    } catch (DataLakeStorageException e) {
        getLogger().error("Renaming File [{}] failed", fileClient.getFileName(), e);
        removeTempFile(fileClient);
        throw e;
    }
}
// Joins a base directory and a relative path with '/'.
// A blank (null/empty/whitespace) base directory yields the path unchanged.
private String createPath(final String baseDirectory, final String path) {
    if (StringUtils.isNotBlank(baseDirectory)) {
        return baseDirectory + "/" + path;
    }
    return path;
}
// Best-effort deletion of the temporary file. Failures are logged but never propagated,
// so that cleanup problems do not mask the original exception raised by the caller.
private void removeTempFile(final DataLakeFileClient fileClient) {
    try {
        fileClient.delete();
    } catch (Exception e) {
        // was: "Renaming File [{}] failed" - a copy-paste of the renameFile() message;
        // this method deletes, it does not rename
        getLogger().error("Removing Temporary File [{}] failed", fileClient.getFileName(), e);
    }
}
}

View File

@ -28,27 +28,29 @@
<h3>File uploading and cleanup process</h3>
<h4>New file</h4>
<h4>New file upload</h4>
<ol>
<li>An empty file is created.</li>
<li>Content is appended to file.</li>
<li>In case append failure the file is deleted.</li>
<li>In case file deletion failure the empty file remains on the server.</li>
<li>A temporary file with a random name prefix is created in the '_nifitempdirectory' directory under the given base path.</li>
<li>The content is appended to the temporary file.</li>
<li>The temporary file is renamed to its final name.</li>
<li>If appending or renaming fails, the temporary file is deleted; no destination file is created.</li>
<li>If deleting the temporary file also fails, the temporary file remains on the server.</li>
</ol>
<h4>Existing file</h4>
<h4>Existing file upload</h4>
<ul>
<li>Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.</li>
<li>Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.</li>
<li>Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.</li>
<li>Processors with "replace" conflict resolution strategy:</li>
<ol>
<li>An empty file overwrites the existing file, the original file is lost.</li>
<li>Content is appended to file.</li>
<li>In case append failure the file is deleted.</li>
<li>In case file deletion failure the empty file remains on the server.</li>
<li>A temporary file with a random name prefix is created in the '_nifitempdirectory' directory under the given base path.</li>
<li>The content is appended to the temporary file.</li>
<li>The temporary file is renamed to its final name and the existing file is overwritten.</li>
<li>If appending or renaming fails, the temporary file is deleted and the original file remains intact.</li>
<li>If deleting the temporary file also fails, both the temporary file and the original file remain on the server.</li>
</ol>
</ul>

View File

@ -37,6 +37,7 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -57,6 +58,10 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
uploadFile(testFile1);
testFiles.put(testFile1.getFilePath(), testFile1);
TestFile testTempFile1 = new TestFile(AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY, "1234file1");
uploadFile(testTempFile1);
testFiles.put(testTempFile1.getFilePath(), testTempFile1);
TestFile testFile2 = new TestFile("", "file2");
uploadFile(testFile2);
testFiles.put(testFile2.getFilePath(), testFile2);
@ -65,6 +70,10 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
createDirectoryAndUploadFile(testFile11);
testFiles.put(testFile11.getFilePath(), testFile11);
TestFile testTempFile11 = new TestFile(String.format("dir1/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "5678file11");
uploadFile(testTempFile11);
testFiles.put(testTempFile11.getFilePath(), testTempFile11);
TestFile testFile12 = new TestFile("dir1", "file12");
uploadFile(testFile12);
testFiles.put(testFile12.getFilePath(), testFile12);
@ -73,10 +82,18 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
createDirectoryAndUploadFile(testFile111);
testFiles.put(testFile111.getFilePath(), testFile111);
TestFile testTempFile111 = new TestFile(String.format("dir1/dir11/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "9010file111");
uploadFile(testTempFile111);
testFiles.put(testTempFile111.getFilePath(), testTempFile111);
TestFile testFile21 = new TestFile("dir 2", "file 21", "Test");
createDirectoryAndUploadFile(testFile21);
testFiles.put(testFile21.getFilePath(), testFile21);
TestFile testTempFile21 = new TestFile(String.format("dir2/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "1112file21", "Test");
uploadFile(testTempFile21);
testFiles.put(testTempFile21.getFilePath(), testTempFile21);
createDirectory("dir3");
}
@ -89,6 +106,20 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
// Recursive listing from the root with Include Temporary Files enabled:
// expects all regular files plus every file under a _nifitempdirectory.
// NOTE(review): the last expected path uses "dir2" while the regular file lives in
// "dir 2" (with a space) - matches the "dir2" used when uploading testTempFile21; verify intended.
public void testListRootRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -109,6 +140,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2");
}
@Test
public void testListRootNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2");
}
@Test
public void testListSubdirectoryRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
@ -118,6 +160,18 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListSubdirectoryNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
@ -128,20 +182,45 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12");
}
@Test
public void testListSubdirectoryNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12");
}
@Test
public void testListWithFileFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithFileFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithFileFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setVariable("suffix", "1.*");
runProcessor();
@ -149,6 +228,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithFileFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setVariable("suffix", "1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -159,6 +253,19 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithPathFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -171,6 +278,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setVariable("prefix", "^dir");
runner.setVariable("suffix", "1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListSubdirectoryWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
@ -181,6 +303,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/dir11/file111");
}
@Test
public void testListSubdirectoryWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/dir11/file111", String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithFileAndPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -192,6 +325,19 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir1/file11", "dir1/dir11/file111");
}
@Test
public void testListRootWithFileAndPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/dir11/file111", String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListEmptyDirectory() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
@ -236,6 +382,24 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
flowFile.assertAttributeEquals("record.count", "3");
}
@Test
// Record-oriented listing of dir1 with temp files included: one output FlowFile whose
// record.count covers dir1's two regular files, dir1/dir11's file, and the two temp files
// under dir1 (5 records total, vs. 3 when temp files are excluded).
public void testListWithRecordsWithTempFiles() throws InitializationException {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runner.run();
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("record.count", "5");
}
@Test
public void testListWithMinAge() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -246,6 +410,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
@Test
public void testListWithMinAgeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
@Test
public void testListWithMaxAge() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -256,6 +431,21 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListWithMaxAgeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithMinSize() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -266,6 +456,20 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithMinSizeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithMaxSize() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -276,6 +480,17 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("dir 2/file 21");
}
@Test
public void testListWithMaxSizeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir 2/file 21", String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
private void runProcessor() {
runner.assertValid();
runner.run();
@ -288,6 +503,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
assertEquals(expectedFiles.size(), flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
String filePath = flowFile.getAttribute("azure.filePath");

View File

@ -16,12 +16,19 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.core.http.rest.Response;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -44,9 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@ -255,15 +265,37 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileButFailedToAppend() {
DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
InputStream stream = mock(InputStream.class);
doThrow(NullPointerException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong());
final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage();
final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
final ProcessSession session = mock(ProcessSession.class);
final FlowFile flowFile = mock(FlowFile.class);
assertThrows(NullPointerException.class, () -> {
PutAzureDataLakeStorage.uploadContent(fileClient, stream, FILE_DATA.length);
when(flowFile.getSize()).thenReturn(1L);
doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong());
verify(fileClient).delete();
});
assertThrows(IllegalArgumentException.class, () -> processor.appendContent(flowFile, fileClient, session));
verify(fileClient).delete();
}
@Test
// Verifies that when the Azure rename of the temp file fails with a storage error,
// the exception is propagated AND the temp file is cleaned up (fileClient.delete()).
public void testPutFileButFailedToRename() {
final PutAzureDataLakeStorage processor = new PutAzureDataLakeStorage();
final ProcessorInitializationContext initContext = mock(ProcessorInitializationContext.class);
final String componentId = "componentId";
final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
final Response<DataLakeFileClient> response = mock(Response.class);
//Mock logger - the processor logs the rename failure, so it must be initialized with one
when(initContext.getIdentifier()).thenReturn(componentId);
MockComponentLog componentLog = new MockComponentLog(componentId, processor);
when(initContext.getLogger()).thenReturn(componentLog);
processor.initialize(initContext);
//Mock renameWithResponse Azure method so that reading the response value throws
when(fileClient.renameWithResponse(isNull(), anyString(), isNull(), any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response);
when(response.getValue()).thenThrow(DataLakeStorageException.class);
when(fileClient.getFileName()).thenReturn(FILE_NAME);
assertThrows(DataLakeStorageException.class, () -> processor.renameFile(FILE_NAME, "", fileClient, false));
// cleanup of the temp file must happen on rename failure
verify(fileClient).delete();
}
private Map<String, String> createAttributesMap() {