From a3e1f32cae964df9dee47600016db9d2baba5d34 Mon Sep 17 00:00:00 2001
From: Timea Barna
Date: Tue, 8 Feb 2022 10:00:40 +0100
Subject: [PATCH] NIFI-9657 Create MoveADLS processor

This closes #5752.

Signed-off-by: Peter Turcsanyi
---
 ...AbstractAzureDataLakeStorageProcessor.java |  39 +-
 .../storage/MoveAzureDataLakeStorage.java     | 221 +++++++++++
 .../azure/storage/utils/ADLSAttributes.java   |   6 +
 .../org.apache.nifi.processor.Processor       |   1 +
 .../AbstractAzureDataLakeStorageIT.java       |  20 +-
 .../storage/ITMoveAzureDataLakeStorage.java   | 370 ++++++++++++++++++
 6 files changed, 640 insertions(+), 17 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
 create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index fbbcf2564b..d371e12af5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -175,19 +175,27 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
     }
 
     public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
-        String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+        return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
+    }
+
+    public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+        String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
         if (StringUtils.isBlank(fileSystem)) {
-            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName()));
         }
         return fileSystem;
     }
 
     public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
-        String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
+    }
+
+    public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
+        String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
         if (directory.startsWith("/")) {
-            throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName()));
         } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
-            throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
+            throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
         }
         return directory;
     }
@@ -200,19 +208,30 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
         return fileName;
     }
 
-    private static class DirectoryValidator implements Validator {
-        @Override
+    public static class DirectoryValidator implements Validator {
+        private String displayName;
+
+        public DirectoryValidator() {
+            this.displayName = null;
+        }
+
+        public DirectoryValidator(String displayName) {
+            this.displayName = displayName;
+        }
+
+        @Override
         public ValidationResult validate(String subject, String input, ValidationContext context) {
+            displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName;
             ValidationResult.Builder builder = new ValidationResult.Builder()
-                    .subject(DIRECTORY.getDisplayName())
+                    .subject(displayName)
                     .input(input);
 
             if (context.isExpressionLanguagePresent(input)) {
                 builder.valid(true).explanation("Expression Language Present");
             } else if (input.startsWith("/")) {
-                builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
+                builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName));
             } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
-                builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
+                builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName));
             } else {
                 builder.valid(true);
             }
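
Note on the refactoring above: evaluateFileSystemProperty/evaluateDirectoryProperty gain parameterized overloads and DirectoryValidator becomes public with a configurable display name, so processor-specific directory-style properties can reuse the same validation. A minimal sketch of the intended reuse, mirroring the Source Directory property added later in this patch (sketch only, not part of the commit):

    // A subclass property validated by the now-public DirectoryValidator,
    // reported under its own display name instead of the shared "Directory".
    public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder()
            .name("source-directory-name")
            .displayName("Source Directory")
            .addValidator(new DirectoryValidator("Source Directory"))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    // At trigger time, the parameterized helper resolves and re-checks the value:
    // String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);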
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
new file mode 100644
index 0000000000..cb3d8b6669
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_SOURCE_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
+@CapabilityDescription("Moves content within Azure Data Lake Storage Gen 2."
+        + " After the move, files will no longer be available at the source location.")
+ + " After the move, files will be no longer available on source location.") +@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_SOURCE_FILESYSTEM, description = ATTR_DESCRIPTION_SOURCE_FILESYSTEM), + @WritesAttribute(attribute = ATTR_NAME_SOURCE_DIRECTORY, description = ATTR_DESCRIPTION_SOURCE_DIRECTORY), + @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM), + @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY), + @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME), + @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI), + @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { + + public static final String FAIL_RESOLUTION = "fail"; + public static final String REPLACE_RESOLUTION = "replace"; + public static final String IGNORE_RESOLUTION = "ignore"; + + + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("conflict-resolution-strategy") + .displayName("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the output directory") + .required(true) + .defaultValue(FAIL_RESOLUTION) + .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) + .build(); + + public static final PropertyDescriptor SOURCE_FILESYSTEM = new PropertyDescriptor.Builder() + .name("source-filesystem-name") + .displayName("Source Filesystem") + .description("Name of the Azure Storage File System from where the move should happen.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM)) + .build(); + + public static final PropertyDescriptor SOURCE_DIRECTORY = new PropertyDescriptor.Builder() + .name("source-directory-name") + .displayName("Source Directory") + .description("Name of the Azure Storage Directory from where the move should happen. The Directory Name cannot contain a leading '/'." + + " The root directory can be designated by the empty string value.") + .addValidator(new DirectoryValidator("Source Directory")) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY)) + .build(); + + public static final PropertyDescriptor DESTINATION_FILESYSTEM = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(FILESYSTEM) + .displayName("Destination Filesystem") + .description("Name of the Azure Storage File System where the files will be moved.") + .build(); + + public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(DIRECTORY) + .displayName("Destination Directory") + .description("Name of the Azure Storage Directory where the files will be moved. The Directory Name cannot contain a leading '/'." + + " The root directory can be designated by the empty string value. Non-existing directories will be created." + + " If the original directory structure should be kept, the full directory path needs to be provided after the destination directory." 
+ + " e.g.: destdir/${azure.directory}") + .addValidator(new DirectoryValidator("Destination Directory")) + .build(); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + ADLS_CREDENTIALS_SERVICE, + SOURCE_FILESYSTEM, + SOURCE_DIRECTORY, + DESTINATION_FILESYSTEM, + DESTINATION_DIRECTORY, + FILE, + CONFLICT_RESOLUTION + )); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + try { + final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM); + final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY); + final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM); + final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY); + final String fileName = evaluateFileNameProperty(context, flowFile); + + final String destinationPath; + if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) { + destinationPath = destinationDirectory + "/"; + } else { + destinationPath = destinationDirectory; + } + + final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); + final DataLakeFileSystemClient sourceFileSystemClient = storageClient.getFileSystemClient(sourceFileSystem); + final DataLakeDirectoryClient sourceDirectoryClient = sourceFileSystemClient.getDirectoryClient(sourceDirectory); + final DataLakeFileSystemClient destinationFileSystemClient = storageClient.getFileSystemClient(destinationFileSystem); + final DataLakeDirectoryClient destinationDirectoryClient = destinationFileSystemClient.getDirectoryClient(destinationDirectory); + DataLakeFileClient sourceFileClient = sourceDirectoryClient.getFileClient(fileName); + final DataLakeRequestConditions sourceConditions = new DataLakeRequestConditions(); + final DataLakeRequestConditions destinationConditions = new DataLakeRequestConditions(); + final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + + try { + if (!destinationDirectory.isEmpty() && !destinationDirectoryClient.exists()) { + destinationDirectoryClient.create(); + } + + if (!conflictResolution.equals(REPLACE_RESOLUTION)) { + destinationConditions.setIfNoneMatch("*"); + } + + final DataLakeFileClient destinationFileClient = sourceFileClient.renameWithResponse(destinationFileSystem, + destinationPath + fileName, + sourceConditions, + destinationConditions, + null, + null) + .getValue(); + + final Map attributes = new HashMap<>(); + attributes.put(ATTR_NAME_SOURCE_FILESYSTEM, sourceFileSystem); + attributes.put(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory); + attributes.put(ATTR_NAME_FILESYSTEM, destinationFileSystem); + attributes.put(ATTR_NAME_DIRECTORY, destinationDirectory); + attributes.put(ATTR_NAME_FILENAME, fileName); + attributes.put(ATTR_NAME_PRIMARY_URI, destinationFileClient.getFileUrl()); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(destinationFileClient.getProperties().getFileSize())); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
index 087cbaa18b..99f7dfc813 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java
@@ -42,4 +42,10 @@ public final class ADLSAttributes {
 
     public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
     public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
+
+    public static final String ATTR_NAME_SOURCE_FILESYSTEM = "azure.source.filesystem";
+    public static final String ATTR_DESCRIPTION_SOURCE_FILESYSTEM = "The name of the source Azure File System";
+
+    public static final String ATTR_NAME_SOURCE_DIRECTORY = "azure.source.directory";
+    public static final String ATTR_DESCRIPTION_SOURCE_DIRECTORY = "The name of the source Azure Directory";
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 24cadf6fee..ce7e074a0e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -30,3 +30,4 @@ org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
 org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
+org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage
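
The services-file entry matters because NiFi discovers processors through the standard java.util.ServiceLoader mechanism; without the added line the new class would compile into the NAR but never appear in the UI. A quick sanity-check sketch, assuming the processor jar is on the classpath:

    // Lists every registered Processor implementation; MoveAzureDataLakeStorage
    // should be among them once the services file includes it.
    ServiceLoader<Processor> processors = ServiceLoader.load(Processor.class);
    for (Processor processor : processors) {
        System.out.println(processor.getClass().getName());
    }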
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index 54f0932154..383ab1686f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -87,22 +87,28 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
     }
 
     protected void uploadFile(String directory, String filename, String fileContent) {
-        byte[] fileContentBytes = fileContent.getBytes();
-
-        DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
-        DataLakeFileClient fileClient = directoryClient.createFile(filename);
-
-        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
+        uploadFile(directory, filename, fileContent.getBytes());
     }
 
     protected void uploadFile(TestFile testFile) {
         uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
     }
 
+    protected void uploadFile(String directory, String filename, byte[] fileData) {
+        DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+        DataLakeFileClient fileClient = directoryClient.createFile(filename);
+
+        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileData), fileData.length);
+    }
+
     protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+        createDirectoryAndUploadFile(directory, filename, fileContent.getBytes());
+    }
+
+    protected void createDirectoryAndUploadFile(String directory, String filename, byte[] fileData) {
         createDirectory(directory);
-        uploadFile(directory, filename, fileContent);
+        uploadFile(directory, filename, fileData);
     }
 
     protected void createDirectoryAndUploadFile(TestFile testFile) {
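
The harness changes are behavior-preserving: the String-based uploadFile/createDirectoryAndUploadFile entry points now delegate to new byte[] overloads, which the move tests below need for binary payloads (an empty file and 120 MB of random bytes). A usage sketch with the methods added above (directory and file names are illustrative):

    // Binary payload, as exercised by the big-file move test below.
    byte[] fileData = new byte[120_000_000];
    new Random().nextBytes(fileData);
    createDirectoryAndUploadFile("sourceDir1", "file1", fileData);

    // The String variant keeps working and simply delegates via getBytes().
    createDirectoryAndUploadFile("sourceDir2", "file2", "plain text content");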
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
new file mode 100644
index 0000000000..d049d9db5c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.google.common.collect.Sets;
+import com.google.common.net.UrlEscapers;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
+import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
+
+    private static final String SOURCE_DIRECTORY = "sourceDir1";
+    private static final String DESTINATION_DIRECTORY = "destDir1";
+    private static final String FILE_NAME = "file1";
+    private static final byte[] FILE_DATA = "0123456789".getBytes();
+
+    private static final String EL_FILESYSTEM = "az.filesystem";
+    private static final String EL_DIRECTORY = "az.directory";
+    private static final String EL_FILE_NAME = "az.filename";
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return MoveAzureDataLakeStorage.class;
+    }
+
+    @Before
+    public void setUp() throws InterruptedException {
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, fileSystemName);
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY);
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingDirectoryWithIgnoreResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectory(DESTINATION_DIRECTORY);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToNonExistingDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToDeepDirectory() throws Exception {
+        String sourceDirectory = "dir1/dir2";
+        String destinationDirectory = sourceDirectory + "/dir3/dir4";
+        createDirectoryAndUploadFile(sourceDirectory, FILE_NAME, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(sourceDirectory, destinationDirectory, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToRootDirectory() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        String rootDirectory = "";
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, rootDirectory);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, rootDirectory, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveEmptyFile() throws Exception {
+        byte[] fileData = new byte[0];
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+        runProcessor(fileData);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+    }
+
+    @Test
+    public void testMoveBigFile() throws Exception {
+        Random random = new Random();
+        byte[] fileData = new byte[120_000_000];
+        random.nextBytes(fileData);
+
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, fileData);
+
+        runProcessor(fileData);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, fileData);
+    }
+
+    @Test
+    public void testMoveFileWithNonExistingFileSystem() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, "dummy");
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithInvalidFileName() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithSpacesInDirectoryAndFileName() throws Exception {
+        String sourceDirectory = "dir 1";
+        String destinationDirectory = "dest dir1";
+        String fileName = "file 1";
+        createDirectoryAndUploadFile(sourceDirectory, fileName, FILE_DATA);
+
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
+        runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(sourceDirectory, destinationDirectory, fileName, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithFailResolution() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.FAIL_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithReplaceResolution() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        fileSystemClient.createFile(String.format("%s/%s", DESTINATION_DIRECTORY, FILE_NAME));
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileToExistingFileWithIgnoreResolution() throws Exception {
+        String fileContent = "destination";
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        createDirectoryAndUploadFile(DESTINATION_DIRECTORY, FILE_NAME, fileContent);
+
+        runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccessWithIgnoreResolution(DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA, fileContent.getBytes());
+    }
+
+    @Test
+    public void testMoveFileWithEL() throws Exception {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testMoveFileWithELButFilesystemIsNotSpecified() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        attributes.remove(EL_FILESYSTEM);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertFailure();
+    }
+
+    @Test
+    public void testMoveFileWithELButFileNameIsNotSpecified() {
+        createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+        Map<String, String> attributes = createAttributesMap(FILE_DATA);
+        attributes.remove(EL_FILE_NAME);
+        setELProperties();
+
+        runProcessor(FILE_DATA, attributes);
+
+        assertFailure();
+    }
+
+    private Map<String, String> createAttributesMap(byte[] fileData) {
+        Map<String, String> attributes = new HashMap<>();
+
+        attributes.put(EL_FILESYSTEM, fileSystemName);
+        attributes.put(EL_DIRECTORY, SOURCE_DIRECTORY);
+        attributes.put(EL_FILE_NAME, FILE_NAME);
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileData.length));
+
+        return attributes;
+    }
+
+    private void setELProperties() {
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
+        runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY));
+        runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
+    }
+
+    private void runProcessor(byte[] fileData) {
+        runProcessor(fileData, Collections.singletonMap(ATTR_NAME_LENGTH, String.valueOf(fileData.length)));
+    }
+
+    private void runProcessor(byte[] testData, Map<String, String> attributes) {
+        runner.assertValid();
+        runner.enqueue(testData, attributes);
+        runner.run();
+    }
+
+    private void assertSuccess(String sourceDirectory, String destinationDirectory, String fileName, byte[] fileData) throws Exception {
+        assertFlowFile(fileData, fileName, sourceDirectory, destinationDirectory);
+        assertAzureFile(destinationDirectory, fileName, fileData);
+        assertProvenanceEvents();
+    }
+
+    private void assertSuccessWithIgnoreResolution(String destinationDirectory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
+        assertFlowFile(fileData);
+        assertAzureFile(destinationDirectory, fileName, azureFileData);
+    }
+
+    private void assertFlowFile(byte[] fileData, String fileName, String sourceDirectory, String destinationDirectory) throws Exception {
+        MockFlowFile flowFile = assertFlowFile(fileData);
+
+        flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_SOURCE_DIRECTORY, sourceDirectory);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
+        flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, destinationDirectory);
+        flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
+
+        String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(destinationDirectory);
+        String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
+        String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
+        String primaryUri = StringUtils.isNotEmpty(destinationDirectory)
+                ? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
+                : String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
+        flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
+
+        flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
+    }
+
+    private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
+        runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(MoveAzureDataLakeStorage.REL_SUCCESS).get(0);
+
+        flowFile.assertContentEquals(fileData);
+
+        return flowFile;
+    }
+
+    private void assertAzureFile(String destinationDirectory, String fileName, byte[] fileData) {
+        DataLakeFileClient fileClient;
+
+        if (StringUtils.isNotEmpty(destinationDirectory)) {
+            DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(destinationDirectory);
+            assertTrue(directoryClient.exists());
+
+            fileClient = directoryClient.getFileClient(fileName);
+        } else {
+            fileClient = fileSystemClient.getFileClient(fileName);
+        }
+
+        assertTrue(fileClient.exists());
+        assertEquals(fileData.length, fileClient.getProperties().getFileSize());
+    }
+
+    private void assertProvenanceEvents() {
+        Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
+
+        Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+                .map(ProvenanceEventRecord::getEventType)
+                .collect(Collectors.toSet());
+        assertEquals(expectedEventTypes, actualEventTypes);
+    }
+
+    private void assertFailure() {
+        runner.assertAllFlowFilesTransferred(MoveAzureDataLakeStorage.REL_FAILURE, 1);
+    }
+}
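
Taken together with the ${azure.filesystem}/${azure.directory} defaults on the source properties, the processor is designed to sit directly behind ListAzureDataLakeStorage/FetchAzureDataLakeStorage. A minimal standalone configuration sketch using the TestRunner API (credentials service setup elided; attribute values are illustrative and emulate an upstream listing):

    TestRunner runner = TestRunners.newTestRunner(MoveAzureDataLakeStorage.class);
    runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, "myfilesystem");
    // Keep the original structure under a new root, per the property description:
    runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, "destdir/${azure.directory}");
    runner.setProperty(MoveAzureDataLakeStorage.FILE, "file1");
    runner.setProperty(MoveAzureDataLakeStorage.CONFLICT_RESOLUTION, MoveAzureDataLakeStorage.REPLACE_RESOLUTION);

    // Source Filesystem and Source Directory default to ${azure.filesystem} and
    // ${azure.directory}, so flow file attributes select the source location.
    Map<String, String> attributes = new HashMap<>();
    attributes.put("azure.filesystem", "myfilesystem");
    attributes.put("azure.directory", "sourcedir");
    runner.enqueue(new byte[0], attributes);
    runner.run();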