diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index 11962bc526..5ada3ddb67 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -414,6 +414,8 @@ public class StandardValidators { public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false); + public static final Validator REGULAR_EXPRESSION_WITH_EL_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, true); + public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index fd0be044a5..bb22c576a3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -33,17 +33,23 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; import reactor.core.publisher.Mono; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; + public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() @@ -57,15 +63,16 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() .name("filesystem-name").displayName("Filesystem Name") .description("Name of the Azure Storage File System. It is assumed to be already existing.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() .name("directory-name").displayName("Directory Name") - .description("Name of the Azure Storage Directory. In case of the PutAzureDatalakeStorage processor, it will be created if not already existing.") - .addValidator(Validator.VALID) + .description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " + + "In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.") + .addValidator(new DirectoryValidator()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); @@ -73,10 +80,10 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() .name("file-name").displayName("File Name") .description("The filename") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) - .defaultValue("${azure.filename}") + .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME)) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( @@ -103,6 +110,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc return PROPERTIES; } + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { final Map attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap(); @@ -146,8 +158,50 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc return storageClient; } - @Override - public Set getRelationships() { - return RELATIONSHIPS; + public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) { + String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(fileSystem)) { + throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName())); + } + return fileSystem; + } + + public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) { + String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + if (directory.startsWith("/")) { + throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName())); + } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) { + throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName())); + } + return directory; + } + + public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) { + String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(fileName)) { + throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName())); + } + return fileName; + } + + private static class DirectoryValidator implements Validator { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult.Builder builder = new ValidationResult.Builder() + .subject(DIRECTORY.getDisplayName()) + .input(input); + + if (context.isExpressionLanguagePresent(input)) { + builder.valid(true).explanation("Expression Language Present"); + } else if (input.startsWith("/")) { + builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName())); + } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) { + builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName())); + } else { + builder.valid(true); + } + + return builder.build(); + } } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java index 314785a6a6..0e5128e226 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import java.util.concurrent.TimeUnit; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class}) +@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @CapabilityDescription("Deletes the provided file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { @@ -49,18 +48,9 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc final long startNanos = System.nanoTime(); try { - final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); - final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - - if (StringUtils.isBlank(fileSystem)) { - throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " + - FILESYSTEM.getDisplayName() + " must be specified as a non-empty string."); - } - if (StringUtils.isBlank(fileName)) { - throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " + - FILE.getDisplayName() + " must be specified as a non-empty string."); - } + final String fileSystem = evaluateFileSystemProperty(context, flowFile); + final String directory = evaluateDirectoryProperty(context, flowFile); + final String fileName = evaluateFileNameProperty(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 5af7988685..a4febcb1ba 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import java.util.concurrent.TimeUnit; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class}) +@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @CapabilityDescription("Fetch the provided file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { @@ -49,18 +48,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce final long startNanos = System.nanoTime(); try { - final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); - final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - - if (StringUtils.isBlank(fileSystem)) { - throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " + - FILESYSTEM.getDisplayName() + " must be specified as a non-empty string."); - } - if (StringUtils.isBlank(fileName)) { - throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " + - FILE.getDisplayName() + " must be specified as a non-empty string."); - } + final String fileSystem = evaluateFileSystemProperty(context, flowFile); + final String directory = evaluateDirectoryProperty(context, flowFile); + final String fileName = evaluateFileNameProperty(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java new file mode 100644 index 0000000000..c726e81e31 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.models.ListPathsOptions; +import org.apache.commons.lang3.RegExUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET; +import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE; +import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILE_PATH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LAST_MODIFIED; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) +@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class}) +@CapabilityDescription("Lists directory in an Azure Data Lake Storage Gen 2 filesystem") +@WritesAttributes({ + @WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM), + @WritesAttribute(attribute = ATTR_NAME_FILE_PATH, description = ATTR_DESCRIPTION_FILE_PATH), + @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY), + @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME), + @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH), + @WritesAttribute(attribute = ATTR_NAME_LAST_MODIFIED, description = ATTR_DESCRIPTION_LAST_MODIFIED), + @WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG) +}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " + + "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " + + "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + + "where the previous node left off, without duplicating the data.") +public class ListAzureDataLakeStorage extends AbstractListProcessor { + + public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder() + .name("recurse-subdirectories") + .displayName("Recurse Subdirectories") + .description("Indicates whether to list files from subdirectories of the directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .name("file-filter") + .displayName("File Filter") + .description("Only files whose names match the given regular expression will be listed") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder() + .name("path-filter") + .displayName("Path Filter") + .description(String.format("When '%s' is true, then only subdirectories whose paths match the given regular expression will be scanned", RECURSE_SUBDIRECTORIES.getDisplayName())) + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + ADLS_CREDENTIALS_SERVICE, + FILESYSTEM, + DIRECTORY, + RECURSE_SUBDIRECTORIES, + FILE_FILTER, + PATH_FILTER, + RECORD_WRITER, + LISTING_STRATEGY, + TRACKING_STATE_CACHE, + TRACKING_TIME_WINDOW, + INITIAL_LISTING_TARGET)); + + private static final Set LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + ADLS_CREDENTIALS_SERVICE, + FILESYSTEM, + DIRECTORY, + RECURSE_SUBDIRECTORIES, + FILE_FILTER, + PATH_FILTER, + LISTING_STRATEGY))); + + private volatile Pattern filePattern; + private volatile Pattern pathPattern; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue(); + filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null; + + String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue(); + pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null; + } + + @OnStopped + public void onStopped() { + filePattern = null; + pathPattern = null; + } + + @Override + protected void customValidate(ValidationContext context, Collection results) { + if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) { + results.add(new ValidationResult.Builder() + .subject(PATH_FILTER.getDisplayName()) + .valid(false) + .explanation(String.format("'%s' cannot be set when '%s' is false", PATH_FILTER.getDisplayName(), RECURSE_SUBDIRECTORIES.getDisplayName())) + .build()); + } + } + + @Override + protected RecordSchema getRecordSchema() { + return ADLSFileInfo.getRecordSchema(); + } + + @Override + protected Scope getStateScope(PropertyContext context) { + return Scope.CLUSTER; + } + + @Override + protected String getDefaultTimePrecision() { + return PRECISION_MILLIS.getValue(); + } + + @Override + protected boolean isListingResetNecessary(PropertyDescriptor property) { + return LISTING_RESET_PROPERTIES.contains(property); + } + + @Override + protected String getPath(ProcessContext context) { + String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); + return directory != null ? directory : "."; + } + + @Override + protected List performListing(ProcessContext context, Long minTimestamp) throws IOException { + try { + String fileSystem = evaluateFileSystemProperty(context, null); + String baseDirectory = evaluateDirectoryProperty(context, null); + boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean(); + + DataLakeServiceClient storageClient = getStorageClient(context, null); + DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); + + ListPathsOptions options = new ListPathsOptions(); + options.setPath(baseDirectory); + options.setRecursive(recurseSubdirectories); + + Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?"); + + List listing = fileSystemClient.listPaths(options, null).stream() + .filter(pathItem -> !pathItem.isDirectory()) + .map(pathItem -> new ADLSFileInfo.Builder() + .fileSystem(fileSystem) + .filePath(pathItem.getName()) + .length(pathItem.getContentLength()) + .lastModified(pathItem.getLastModified().toInstant().toEpochMilli()) + .etag(pathItem.getETag()) + .build()) + .filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches()) + .filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches()) + .collect(Collectors.toList()); + + return listing; + } catch (Exception e) { + getLogger().error("Failed to list directory on Azure Data Lake Storage", e); + throw new IOException(ExceptionUtils.getRootCause(e)); + } + } + + @Override + protected Map createAttributes(ADLSFileInfo fileInfo, ProcessContext context) { + Map attributes = new HashMap<>(); + + attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem()); + attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath()); + attributes.put(ATTR_NAME_DIRECTORY, fileInfo.getDirectory()); + attributes.put(ATTR_NAME_FILENAME, fileInfo.getFilename()); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileInfo.getLength())); + attributes.put(ATTR_NAME_LAST_MODIFIED, String.valueOf(fileInfo.getLastModified())); + attributes.put(ATTR_NAME_ETAG, fileInfo.getEtag()); + + return attributes; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 081372f4bc..ae67e3e768 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -21,7 +21,6 @@ import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.DataLakeStorageException; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -46,14 +45,25 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI; + @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class}) +@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2") -@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"), - @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"), - @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"), - @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"), - @WritesAttribute(attribute = "azure.length", description = "Length of the file")}) +@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM), + @WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY), + @WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME), + @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI), + @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) @InputRequirement(Requirement.INPUT_REQUIRED) public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { @@ -93,18 +103,9 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final long startNanos = System.nanoTime(); try { - final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); - final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - - if (StringUtils.isBlank(fileSystem)) { - throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " + - FILESYSTEM.getDisplayName() + " must be specified as a non-empty string."); - } - if (StringUtils.isBlank(fileName)) { - throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " + - FILE.getDisplayName() + " must be specified as a non-empty string."); - } + final String fileSystem = evaluateFileSystemProperty(context, flowFile); + final String directory = evaluateDirectoryProperty(context, flowFile); + final String fileName = evaluateFileNameProperty(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); @@ -126,11 +127,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess fileClient.flush(length); final Map attributes = new HashMap<>(); - attributes.put("azure.filesystem", fileSystem); - attributes.put("azure.directory", directory); - attributes.put("azure.filename", fileName); - attributes.put("azure.primaryUri", fileClient.getFileUrl()); - attributes.put("azure.length", String.valueOf(length)); + attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); + attributes.put(ATTR_NAME_DIRECTORY, directory); + attributes.put(ATTR_NAME_FILENAME, fileName); + attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl()); + attributes.put(ATTR_NAME_LENGTH, String.valueOf(length)); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java new file mode 100644 index 0000000000..087cbaa18b --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSAttributes.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +public final class ADLSAttributes { + + public static final String ATTR_NAME_FILESYSTEM = "azure.filesystem"; + public static final String ATTR_DESCRIPTION_FILESYSTEM = "The name of the Azure File System"; + + public static final String ATTR_NAME_DIRECTORY = "azure.directory"; + public static final String ATTR_DESCRIPTION_DIRECTORY = "The name of the Azure Directory"; + + public static final String ATTR_NAME_FILENAME = "azure.filename"; + public static final String ATTR_DESCRIPTION_FILENAME = "The name of the Azure File"; + + public static final String ATTR_NAME_LENGTH = "azure.length"; + public static final String ATTR_DESCRIPTION_LENGTH = "The length of the Azure File"; + + public static final String ATTR_NAME_LAST_MODIFIED = "azure.lastModified"; + public static final String ATTR_DESCRIPTION_LAST_MODIFIED = "The last modification time of the Azure File"; + + public static final String ATTR_NAME_ETAG = "azure.etag"; + public static final String ATTR_DESCRIPTION_ETAG = "The ETag of the Azure File"; + + public static final String ATTR_NAME_FILE_PATH = "azure.filePath"; + public static final String ATTR_DESCRIPTION_FILE_PATH = "The full path of the Azure File"; + + public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri"; + public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content"; + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java new file mode 100644 index 0000000000..77123814e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ADLSFileInfo.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.util.list.ListableEntity; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class ADLSFileInfo implements Comparable, Serializable, ListableEntity { + + private static final RecordSchema SCHEMA; + private static final String FILESYSTEM = "filesystem"; + private static final String FILE_PATH = "filePath"; + private static final String DIRECTORY = "directory"; + private static final String FILENAME = "filename"; + private static final String LENGTH = "length"; + private static final String LAST_MODIFIED = "lastModified"; + private static final String ETAG = "etag"; + + static { + List recordFields = new ArrayList<>(); + recordFields.add(new RecordField(FILESYSTEM, RecordFieldType.STRING.getDataType(), false)); + recordFields.add(new RecordField(FILE_PATH, RecordFieldType.STRING.getDataType(), false)); + recordFields.add(new RecordField(DIRECTORY, RecordFieldType.STRING.getDataType(), false)); + recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false)); + recordFields.add(new RecordField(LENGTH, RecordFieldType.LONG.getDataType(), false)); + recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false)); + recordFields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType())); + SCHEMA = new SimpleRecordSchema(recordFields); + } + + private static final Comparator COMPARATOR = Comparator.comparing(ADLSFileInfo::getFileSystem).thenComparing(ADLSFileInfo::getFilePath); + + private final String fileSystem; + private final String filePath; + private final long length; + private final long lastModified; + private final String etag; + + private ADLSFileInfo(Builder builder) { + this.fileSystem = builder.fileSystem; + this.filePath = builder.filePath; + this.length = builder.length; + this.lastModified = builder.lastModified; + this.etag = builder.etag; + } + + public String getFileSystem() { + return fileSystem; + } + + public String getFilePath() { + return filePath; + } + + public long getLength() { + return length; + } + + public long getLastModified() { + return lastModified; + } + + public String getEtag() { + return etag; + } + + public String getDirectory() { + return filePath.contains("/") ? StringUtils.substringBeforeLast(filePath, "/") : ""; + } + + public String getFilename() { + return filePath.contains("/") ? StringUtils.substringAfterLast(filePath, "/") : filePath; + } + + @Override + public String getName() { + return getFilePath(); + } + + @Override + public String getIdentifier() { + return getFilePath(); + } + + @Override + public long getTimestamp() { + return getLastModified(); + } + + @Override + public long getSize() { + return getLength(); + } + + public static RecordSchema getRecordSchema() { + return SCHEMA; + } + + @Override + public Record toRecord() { + Map values = new HashMap<>(); + values.put(FILESYSTEM, getFileSystem()); + values.put(FILE_PATH, getFilePath()); + values.put(DIRECTORY, getDirectory()); + values.put(FILENAME, getFilename()); + values.put(LENGTH, getLength()); + values.put(LAST_MODIFIED, getLastModified()); + values.put(ETAG, getEtag()); + return new MapRecord(SCHEMA, values); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + ADLSFileInfo otherFileInfo = (ADLSFileInfo) other; + return Objects.equals(fileSystem, otherFileInfo.fileSystem) + && Objects.equals(filePath, otherFileInfo.filePath); + } + + @Override + public int hashCode() { + return Objects.hash(fileSystem, filePath); + } + + @Override + public int compareTo(ADLSFileInfo other) { + return COMPARATOR.compare(this, other); + } + + public static class Builder { + private String fileSystem; + private String filePath; + private long length; + private long lastModified; + private String etag; + + public Builder fileSystem(String fileSystem) { + this.fileSystem = fileSystem; + return this; + } + + public Builder filePath(String filePath) { + this.filePath = filePath; + return this; + } + + public Builder length(long length) { + this.length = length; + return this; + } + + public Builder lastModified(long lastModified) { + this.lastModified = lastModified; + return this; + } + + public Builder etag(String etag) { + this.etag = etag; + return this; + } + + public ADLSFileInfo build() { + return new ADLSFileInfo(this); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6e0933064c..015596e386 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -23,4 +23,5 @@ org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage -org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage +org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java index da03eae96b..553621ba96 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java @@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService; @@ -35,6 +36,7 @@ import java.util.UUID; public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT { private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem"; + private static final String TEST_FILE_CONTENT = "test"; protected String fileSystemName; protected DataLakeFileSystemClient fileSystemClient; @@ -72,6 +74,10 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag .buildClient(); } + protected void createDirectory(String directory) { + fileSystemClient.createDirectory(directory); + } + protected void uploadFile(String directory, String filename, String fileContent) { byte[] fileContentBytes = fileContent.getBytes(); @@ -82,9 +88,49 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag fileClient.flush(fileContentBytes.length); } + protected void uploadFile(TestFile testFile) { + uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent()); + } + protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) { - fileSystemClient.createDirectory(directory); + createDirectory(directory); uploadFile(directory, filename, fileContent); } + + protected void createDirectoryAndUploadFile(TestFile testFile) { + createDirectoryAndUploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent()); + } + + protected static class TestFile { + private final String directory; + private final String filename; + private final String fileContent; + + public TestFile(String directory, String filename, String fileContent) { + this.directory = directory; + this.filename = filename; + this.fileContent = fileContent; + } + + public TestFile(String directory, String filename) { + this(directory, filename, TEST_FILE_CONTENT); + } + + public String getDirectory() { + return directory; + } + + public String getFilename() { + return filename; + } + + public String getFileContent() { + return fileContent; + } + + public String getFilePath() { + return StringUtils.isNotBlank(directory) ? String.format("%s/%s", directory, filename) : filename; + } + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java new file mode 100644 index 0000000000..fd308bc331 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { + + private Map testFiles; + + @Override + protected Class getProcessorClass() { + return ListAzureDataLakeStorage.class; + } + + @Before + public void setUp() { + testFiles = new HashMap<>(); + + TestFile testFile1 = new TestFile("", "file1"); + uploadFile(testFile1); + testFiles.put(testFile1.getFilePath(), testFile1); + + TestFile testFile2 = new TestFile("", "file2"); + uploadFile(testFile2); + testFiles.put(testFile2.getFilePath(), testFile2); + + TestFile testFile11 = new TestFile("dir1", "file11"); + createDirectoryAndUploadFile(testFile11); + testFiles.put(testFile11.getFilePath(), testFile11); + + TestFile testFile12 = new TestFile("dir1", "file12"); + uploadFile(testFile12); + testFiles.put(testFile12.getFilePath(), testFile12); + + TestFile testFile111 = new TestFile("dir1/dir11", "file111"); + createDirectoryAndUploadFile(testFile111); + testFiles.put(testFile111.getFilePath(), testFile111); + + TestFile testFile21 = new TestFile("dir 2", "file 21"); + createDirectoryAndUploadFile(testFile21); + testFiles.put(testFile21.getFilePath(), testFile21); + + createDirectory("dir3"); + } + + @Test + public void testListRootRecursive() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); + } + + @Test + public void testListRootNonRecursive() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); + + runProcessor(); + + assertSuccess("file1", "file2"); + } + + @Test + public void testListSubdirectoryRecursive() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListSubdirectoryNonRecursive() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12"); + } + + @Test + public void testListWithFileFilter() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$"); + + runProcessor(); + + assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListWithFileFilterWithEL() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$"); + runner.setVariable("suffix", "1.*"); + + runProcessor(); + + assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListRootWithPathFilter() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListRootWithPathFilterWithEL() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}"); + runner.setVariable("prefix", "^dir"); + runner.setVariable("suffix", "1.*$"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111"); + } + + @Test + public void testListSubdirectoryWithPathFilter() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); + + runProcessor(); + + assertSuccess("dir1/dir11/file111"); + } + + @Test + public void testListRootWithFileAndPathFilter() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11"); + runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); + + runProcessor(); + + assertSuccess("dir1/file11", "dir1/dir11/file111"); + } + + @Test + public void testListEmptyDirectory() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3"); + + runProcessor(); + + assertSuccess(); + } + + @Test + public void testListNonExistingDirectory() { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy"); + + runProcessor(); + + assertFailure(); + } + + @Test + public void testListWithNonExistingFileSystem() { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy"); + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + + runProcessor(); + + assertFailure(); + } + + @Test + public void testListWithRecords() throws InitializationException { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + + MockRecordWriter recordWriter = new MockRecordWriter(null, false); + runner.addControllerService("record-writer", recordWriter); + runner.enableControllerService(recordWriter); + runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals("record.count", "3"); + } + + private void runProcessor() { + runner.assertValid(); + runner.run(); + } + + private void assertSuccess(String... testFilePaths) throws Exception { + runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length); + + Map expectedFiles = new HashMap<>(testFiles); + expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths)); + + List flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS); + + for (MockFlowFile flowFile : flowFiles) { + String filePath = flowFile.getAttribute("azure.filePath"); + TestFile testFile = expectedFiles.remove(filePath); + assertNotNull("File path not found in the expected map", testFile); + assertFlowFile(testFile, flowFile); + } + } + + private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception { + flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName); + flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath()); + flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory()); + flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename()); + flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().length())); + + flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED); + flowFile.assertAttributeExists(ATTR_NAME_ETAG); + + flowFile.assertContentEquals(""); + } + + private void assertFailure() { + assertFalse(runner.getLogger().getErrorMessages().isEmpty()); + runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 049031e548..324e20d954 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -35,6 +35,11 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -148,15 +153,6 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithInvalidDirectory() { - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, "/dir1"); - - runProcessor(FILE_DATA); - - assertFailure(); - } - @Test public void testPutFileWithInvalidFileName() { runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1"); @@ -282,18 +278,18 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception { MockFlowFile flowFile = assertFlowFile(fileData); - flowFile.assertAttributeEquals("azure.filesystem", fileSystemName); - flowFile.assertAttributeEquals("azure.directory", directory); - flowFile.assertAttributeEquals("azure.filename", fileName); + flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName); + flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory); + flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName); String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(directory); String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName); String primaryUri = StringUtils.isNotEmpty(directory) ? String.format("https://%s.dfs.core.windows.net/%s/%s/%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedFileName) : String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName); - flowFile.assertAttributeEquals("azure.primaryUri", primaryUri); + flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri); - flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length)); + flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length)); } private MockFlowFile assertFlowFile(byte[] fileData) throws Exception { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java index 94fc5761a8..a4a4c64788 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java @@ -47,7 +47,11 @@ public class TestAbstractAzureDataLakeStorage { runner.setProperty(DIRECTORY, "directory"); runner.setProperty(FILE, "file"); runner.setProperty(ADLS_CREDENTIALS_SERVICE, "credentials_service"); + } + @Test + public void testValid() { + runner.assertValid(); } @Test @@ -79,6 +83,27 @@ public class TestAbstractAzureDataLakeStorage { runner.assertValid(); } + @Test + public void testNotValidWhenDirectoryIsSlash() { + runner.setProperty(DIRECTORY, "/"); + + runner.assertNotValid(); + } + + @Test + public void testNotValidWhenDirectoryStartsWithSlash() { + runner.setProperty(DIRECTORY, "/directory"); + + runner.assertNotValid(); + } + + @Test + public void testNotValidWhenDirectoryIsWhitespaceOnly() { + runner.setProperty(DIRECTORY, " "); + + runner.assertNotValid(); + } + @Test public void testValidWhenNoFileSpecified() { // the default value will be used