NIFI-9657 Create MoveADLS processor

This closes #5752.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Timea Barna 2022-02-08 10:00:40 +01:00 committed by Peter Turcsanyi
parent ce0122bd25
commit a3e1f32cae
6 changed files with 640 additions and 17 deletions

View File

@ -175,19 +175,27 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName()));
}
return fileSystem;
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (directory.startsWith("/")) {
throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName()));
} else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
}
return directory;
}
@ -200,19 +208,30 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
return fileName;
}
private static class DirectoryValidator implements Validator {
@Override
public static class DirectoryValidator implements Validator {
private String displayName;
public DirectoryValidator() {
this.displayName = null;
}
public DirectoryValidator(String displayName) {
this.displayName = displayName;
}
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName;
ValidationResult.Builder builder = new ValidationResult.Builder()
.subject(DIRECTORY.getDisplayName())
.subject(displayName)
.input(input);
if (context.isExpressionLanguagePresent(input)) {
builder.valid(true).explanation("Expression Language Present");
} else if (input.startsWith("/")) {
builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName));
} else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName));
} else {
builder.valid(true);
}

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Moves content within an Azure Data Lake Storage Gen 2." +
" After the move, files will be no longer available on source location.")
@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_SOURCE_FILESYSTEM, description = ATTR_DESCRIPTION_SOURCE_FILESYSTEM),
@WritesAttribute(attribute = ATTR_NAME_SOURCE_DIRECTORY, description = ATTR_DESCRIPTION_SOURCE_DIRECTORY),
@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
@WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
@WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
public static final String FAIL_RESOLUTION = "fail";
public static final String REPLACE_RESOLUTION = "replace";
public static final String IGNORE_RESOLUTION = "ignore";
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("conflict-resolution-strategy")
.displayName("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
.defaultValue(FAIL_RESOLUTION)
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
.build();
public static final PropertyDescriptor SOURCE_FILESYSTEM = new PropertyDescriptor.Builder()
.name("source-filesystem-name")
.displayName("Source Filesystem")
.description("Name of the Azure Storage File System from where the move should happen.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM))
.build();
public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder()
.name("source-directory-name")
.displayName("Source Directory")
.description("Name of the Azure Storage Directory from where the move should happen. The Directory Name cannot contain a leading '/'." +
" The root directory can be designated by the empty string value.")
.addValidator(new DirectoryValidator("Source Directory"))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY))
.build();
public static final PropertyDescriptor DESTINATION_FILESYSTEM = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(FILESYSTEM)
.displayName("Destination Filesystem")
.description("Name of the Azure Storage File System where the files will be moved.")
.build();
public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DIRECTORY)
.displayName("Destination Directory")
.description("Name of the Azure Storage Directory where the files will be moved. The Directory Name cannot contain a leading '/'." +
" The root directory can be designated by the empty string value. Non-existing directories will be created." +
" If the original directory structure should be kept, the full directory path needs to be provided after the destination directory." +
" e.g.: destdir/${azure.directory}")
.addValidator(new DirectoryValidator("Destination Directory"))
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
SOURCE_FILESYSTEM,
SOURCE_DIRECTORY,
DESTINATION_FILESYSTEM,
DESTINATION_DIRECTORY,
FILE,
CONFLICT_RESOLUTION
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
try {
final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM);
final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);
final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM);
final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY);
final String fileName = evaluateFileNameProperty(context, flowFile);
final String destinationPath;
if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) {
destinationPath = destinationDirectory + "/";
} else {
destinationPath = destinationDirectory;
}
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient sourceFileSystemClient = storageClient.getFileSystemClient(sourceFileSystem);
final DataLakeDirectoryClient sourceDirectoryClient = sourceFileSystemClient.getDirectoryClient(sourceDirectory);
final DataLakeFileSystemClient destinationFileSystemClient = storageClient.getFileSystemClient(destinationFileSystem);
final DataLakeDirectoryClient destinationDirectoryClient = destinationFileSystemClient.getDirectoryClient(destinationDirectory);
DataLakeFileClient sourceFileClient = sourceDirectoryClient.getFileClient(fileName);
final DataLakeRequestConditions sourceConditions = new DataLakeRequestConditions();
final DataLakeRequestConditions destinationConditions = new DataLakeRequestConditions();
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
try {
if (!destinationDirectory.isEmpty() && !destinationDirectoryClient.exists()) {
destinationDirectoryClient.create();
}
if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
destinationConditions.setIfNoneMatch("*");
}
final DataLakeFileClient destinationFileClient = sourceFileClient.renameWithResponse(destinationFileSystem,
destinationPath + fileName,
sourceConditions,
destinationConditions,
null,
null)
.getValue();
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_SOURCE_FILESYSTEM, sourceFileSystem);
attributes.put(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
attributes.put(ATTR_NAME_FILESYSTEM, destinationFileSystem);
attributes.put(ATTR_NAME_DIRECTORY, destinationDirectory);
attributes.put(ATTR_NAME_FILENAME, fileName);
attributes.put(ATTR_NAME_PRIMARY_URI, destinationFileClient.getFileUrl());
attributes.put(ATTR_NAME_LENGTH, String.valueOf(destinationFileClient.getProperties().getFileSize()));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, sourceFileClient.getFileUrl(), transferMillis);
} catch (DataLakeStorageException dlsException) {
if (dlsException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
session.transfer(flowFile, REL_SUCCESS);
String warningMessage = String.format("File with the same name already exists. " +
"Remote file not modified. " +
"Transferring {} to success due to %s being set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
getLogger().warn(warningMessage, flowFile);
} else {
throw dlsException;
}
}
} catch (Exception e) {
getLogger().error("Failed to move file on Azure Data Lake Storage", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -42,4 +42,10 @@ public final class ADLSAttributes {
public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
public static final String ATTR_NAME_SOURCE_FILESYSTEM = "azure.source.filesystem";
public static final String ATTR_DESCRIPTION_SOURCE_FILESYSTEM = "The name of the source Azure File System";
public static final String ATTR_NAME_SOURCE_DIRECTORY = "azure.source.directory";
public static final String ATTR_DESCRIPTION_SOURCE_DIRECTORY = "The name of the source Azure Directory";
}

View File

@ -30,3 +30,4 @@ org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage

View File

@ -87,22 +87,28 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
}
protected void uploadFile(String directory, String filename, String fileContent) {
byte[] fileContentBytes = fileContent.getBytes();
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
DataLakeFileClient fileClient = directoryClient.createFile(filename);
PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
uploadFile(directory, filename, fileContent.getBytes());
}
protected void uploadFile(TestFile testFile) {
uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
}
protected void uploadFile(String directory, String filename, byte[] fileData) {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
DataLakeFileClient fileClient = directoryClient.createFile(filename);
PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileData), fileData.length);
}
protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
createDirectoryAndUploadFile(directory, filename, fileContent.getBytes());
}
protected void createDirectoryAndUploadFile(String directory, String filename, byte[] fileData) {
createDirectory(directory);
uploadFile(directory, filename, fileContent);
uploadFile(directory, filename, fileData);
}
protected void createDirectoryAndUploadFile(TestFile testFile) {

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Integration tests for {@link MoveAzureDataLakeStorage}. Requires a live Azure Data Lake
 * Storage Gen 2 account provisioned by {@code AbstractAzureDataLakeStorageIT}.
 */
public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
// Default source/destination locations and payload used by most tests.
private static final String SOURCE_DIRECTORY = "sourceDir1";
private static final String DESTINATION_DIRECTORY = "destDir1";
private static final String FILE_NAME = "file1";
private static final byte[] FILE_DATA = "0123456789".getBytes();
// FlowFile attribute names used when exercising Expression Language support.
private static final String EL_FILESYSTEM = "az.filesystem";
private static final String EL_DIRECTORY = "az.directory";
private static final String EL_FILE_NAME = "az.filename";
@Override
protected Class<? extends Processor> getProcessorClass() {
return MoveAzureDataLakeStorage.class;
}
// Configures the default move: SOURCE_DIRECTORY -> DESTINATION_DIRECTORY within the same filesystem.
@Before
public void setUp() throws InterruptedException {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, fileSystemName);
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
}
// Happy path: destination directory already exists.
@Test
public void testMoveFileToExistingDirectory() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
createDirectory(DESTINATION_DIRECTORY);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
// No destination conflict, so the resolution strategy should not affect the outcome.
@Test
public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
createDirectory(DESTINATION_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testMoveFileToExistingDirectoryWithIgnoreResolution() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
createDirectory(DESTINATION_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
// The processor is expected to create missing destination directories itself.
@Test
public void testMoveFileToNonExistingDirectory() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
// Nested (multi-level) source and destination paths.
@Test
public void testMoveFileToDeepDirectory() throws Exception {
String sourceDirectory = "dir1/dir2";
String destinationDirectory = sourceDirectory + "/dir3/dir4";
createDirectoryAndUploadFile(sourceDirectory, FILE_NAME, FILE_DATA);
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
runProcessor(FILE_DATA);
assertSuccess(sourceDirectory, destinationDirectory, FILE_NAME, FILE_DATA);
}
// Root directory is designated by the empty string per the property contract.
@Test
public void testMoveFileToRootDirectory() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
String rootDirectory = "";
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, rootDirectory);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, rootDirectory, FILE_NAME, FILE_DATA);
}
@Test
public void testMoveEmptyFile() throws Exception {
byte[] fileData = new byte[0];
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
runProcessor(fileData);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
}
// 120 MB random payload exercises the move with content larger than a single upload chunk.
@Test
public void testMoveBigFile() throws Exception {
Random random = new Random();
byte[] fileData = new byte[120_000_000];
random.nextBytes(fileData);
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
runProcessor(fileData);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
}
@Test
public void testMoveFileWithNonExistingFileSystem() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, "dummy");
runProcessor(FILE_DATA);
assertFailure();
}
// A leading '/' in the file name is rejected at runtime and routed to failure.
@Test
public void testMoveFileWithInvalidFileName() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
runProcessor(FILE_DATA);
assertFailure();
}
// Spaces require URL escaping in the primary URI attribute; see assertFlowFile below.
@Test
public void testMoveFileWithSpacesInDirectoryAndFileName() throws Exception {
String sourceDirectory = "dir 1";
String destinationDirectory = "dest dir1";
String fileName = "file 1";
createDirectoryAndUploadFile(sourceDirectory, fileName, FILE_DATA);
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName);
runProcessor(FILE_DATA);
assertSuccess(sourceDirectory, destinationDirectory, fileName, FILE_DATA);
}
// Conflict scenarios: a file with the same name pre-exists at the destination.
@Test
public void testMoveFileToExistingFileWithFailResolution() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.FAIL_RESOLUTION);
runProcessor(FILE_DATA);
assertFailure();
}
@Test
public void testMoveFileToExistingFileWithReplaceResolution() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
// With 'ignore', the pre-existing destination content must be left untouched.
@Test
public void testMoveFileToExistingFileWithIgnoreResolution() throws Exception {
String fileContent = "destination";
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
createDirectoryAndUploadFile(DESTINATION_DIRECTORY, FILE_NAME, fileContent);
runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
runProcessor(FILE_DATA);
assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, fileContent.getBytes());
}
// Expression Language scenarios: properties resolved from FlowFile attributes.
@Test
public void testMoveFileWithEL() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
Map<String, String> attributes = createAttributesMap(FILE_DATA);
setELProperties();
runProcessor(FILE_DATA, attributes);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testMoveFileWithELButFilesystemIsNotSpecified() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
Map<String, String> attributes = createAttributesMap(FILE_DATA);
attributes.remove(EL_FILESYSTEM);
setELProperties();
runProcessor(FILE_DATA, attributes);
assertFailure();
}
@Test
public void testMoveFileWithELButFileNameIsNotSpecified() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
Map<String, String> attributes = createAttributesMap(FILE_DATA);
attributes.remove(EL_FILE_NAME);
setELProperties();
runProcessor(FILE_DATA, attributes);
assertFailure();
}
// Builds the FlowFile attributes referenced by the EL-based property values.
private Map<String, String> createAttributesMap(byte[] fileData) {
Map<String, String> attributes = new HashMap<>();
attributes.put(EL_FILESYSTEM, fileSystemName);
attributes.put(EL_DIRECTORY, SOURCE_DIRECTORY);
attributes.put(EL_FILE_NAME, FILE_NAME);
attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileData.length));
return attributes;
}
private void setELProperties() {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY));
runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
}
private void runProcessor(byte[] fileData) {
runProcessor(fileData, Collections.singletonMap(ATTR_NAME_LENGTH, String.valueOf(fileData.length)));
}
private void runProcessor(byte[] testData, Map<String, String> attributes) {
runner.assertValid();
runner.enqueue(testData, attributes);
runner.run();
}
// Verifies FlowFile attributes/content, the remote file at the destination, and provenance.
private void assertSuccess(String sourceDirectory, String destinationDirectory, String fileName, byte[] fileData) throws Exception {
assertFlowFile(fileData, fileName, sourceDirectory, destinationDirectory);
assertAzureFile(destinationDirectory, fileName, fileData);
assertProvenanceEvents();
}
// With 'ignore' resolution the remote file keeps its original (azureFileData) content.
private void assertSuccessWithIgnoreResolution(String destinationDirectory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
assertFlowFile(fileData);
assertAzureFile(destinationDirectory, fileName, azureFileData);
}
private void assertFlowFile(byte[] fileData, String fileName, String sourceDirectory, String destinationDirectory) throws Exception {
MockFlowFile flowFile = assertFlowFile(fileData);
flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, destinationDirectory);
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
// The primary URI escapes each path segment; a root destination omits the directory segment.
String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(destinationDirectory);
String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
String primaryUri = StringUtils.isNotEmpty(destinationDirectory)
? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
: String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
}
private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(MoveAzureDataLakeStorage.REL_SUCCESS).get(0);
flowFile.assertContentEquals(fileData);
return flowFile;
}
// Checks the moved file's existence and size on the remote filesystem.
private void assertAzureFile(String destinationDirectory, String fileName, byte[] fileData) {
DataLakeFileClient fileClient;
if (StringUtils.isNotEmpty(destinationDirectory)) {
DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(destinationDirectory);
assertTrue(directoryClient.exists());
fileClient = directoryClient.getFileClient(fileName);
} else {
fileClient = fileSystemClient.getFileClient(fileName);
}
assertTrue(fileClient.exists());
assertEquals(fileData.length, fileClient.getProperties().getFileSize());
}
// A successful move must emit exactly one kind of provenance event: SEND.
private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
.collect(Collectors.toSet());
assertEquals(expectedEventTypes, actualEventTypes);
}
private void assertFailure() {
runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_FAILURE, 1);
}
}