NIFI-7340: Adding ListAzureDataLakeStorage

Also added validator for Directory Name property in AbstractAzureDataLakeStorageProcessor
Fix Tracking Entities strategy: use milliseconds for lastModified

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4438.
Peter Turcsanyi 2020-07-22 00:28:54 +02:00 committed by Pierre Villard
parent 7b4cce9e21
commit c980b64bf5
13 changed files with 956 additions and 76 deletions

@@ -414,6 +414,8 @@ public class StandardValidators {
public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
public static final Validator REGULAR_EXPRESSION_WITH_EL_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, true);
public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {

@@ -33,17 +33,23 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import reactor.core.publisher.Mono;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
@@ -57,15 +63,16 @@ public abstract class AbstractAzureDataLakeStorageProc
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name").displayName("Directory Name")
.description("Name of the Azure Storage Directory. In case of the PutAzureDatalakeStorage processor, it will be created if not already existing.")
.addValidator(Validator.VALID)
.description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
"In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
.addValidator(new DirectoryValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
@@ -73,10 +80,10 @@ public abstract class AbstractAzureDataLakeStorageProc
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue("${azure.filename}")
.defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
@@ -103,6 +110,11 @@ public abstract class AbstractAzureDataLakeStorageProc
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
@@ -146,8 +158,50 @@ public abstract class AbstractAzureDataLakeStorageProc
return storageClient;
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILESYSTEM.getDisplayName()));
}
return fileSystem;
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
if (directory.startsWith("/")) {
throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", DIRECTORY.getDisplayName()));
} else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", DIRECTORY.getDisplayName()));
}
return directory;
}
public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) {
String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName()));
}
return fileName;
}
private static class DirectoryValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult.Builder builder = new ValidationResult.Builder()
.subject(DIRECTORY.getDisplayName())
.input(input);
if (context.isExpressionLanguagePresent(input)) {
builder.valid(true).explanation("Expression Language Present");
} else if (input.startsWith("/")) {
builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", DIRECTORY.getDisplayName()));
} else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", DIRECTORY.getDisplayName()));
} else {
builder.valid(true);
}
return builder.build();
}
}
}
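
Note: the following is a minimal standalone sketch (hypothetical, not part of this commit) of the rules the private DirectoryValidator above encodes. Expression Language defers the check to runtime; a leading '/' is rejected; a whitespace-only value is rejected; anything else, including the empty string that designates the root directory, is accepted.

import org.apache.commons.lang3.StringUtils;

class DirectoryRuleSketch {

    // Mirrors the checks in DirectoryValidator; returns null when the input is valid.
    static String explainInvalid(String input, boolean expressionLanguagePresent) {
        if (expressionLanguagePresent) {
            return null; // deferred; evaluateDirectoryProperty() re-checks the evaluated value at runtime
        } else if (input.startsWith("/")) {
            return "cannot contain a leading '/'";
        } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
            return "cannot contain whitespace characters only";
        }
        return null; // "" (root directory) and regular paths are valid
    }

    public static void main(String[] args) {
        System.out.println(explainInvalid("", false));           // null -> valid (root directory)
        System.out.println(explainInvalid("dir1/dir11", false)); // null -> valid
        System.out.println(explainInvalid("/dir1", false));      // invalid: leading '/'
        System.out.println(explainInvalid("   ", false));        // invalid: whitespace only
    }
}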

@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final long startNanos = System.nanoTime();
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
}
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
FILE.getDisplayName() + " must be specified as a non-empty string.");
}
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);

@@ -20,7 +20,6 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,7 +34,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -49,18 +48,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final long startNanos = System.nanoTime();
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
}
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
FILE.getDisplayName() + " must be specified as a non-empty string.");
}
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);

@@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.ListPathsOptions;
import org.apache.commons.lang3.RegExUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
@PrimaryNodeOnly
@TriggerSerially
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@CapabilityDescription("Lists directory in an Azure Data Lake Storage Gen 2 filesystem")
@WritesAttributes({
@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
@WritesAttribute(attribute = ATTR_NAME_FILE_PATH, description = ATTR_DESCRIPTION_FILE_PATH),
@WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
@WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
@WritesAttribute(attribute = ATTR_NAME_LAST_MODIFIED, description = ATTR_DESCRIPTION_LAST_MODIFIED),
@WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG)
})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " +
"This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. State is " +
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
"where the previous node left off, without duplicating the data.")
public class ListAzureDataLakeStorage extends AbstractListProcessor<ADLSFileInfo> {
public static final PropertyDescriptor RECURSE_SUBDIRECTORIES = new PropertyDescriptor.Builder()
.name("recurse-subdirectories")
.displayName("Recurse Subdirectories")
.description("Indicates whether to list files from subdirectories of the directory")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
.name("file-filter")
.displayName("File Filter")
.description("Only files whose names match the given regular expression will be listed")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
.name("path-filter")
.displayName("Path Filter")
.description(String.format("When '%s' is true, then only subdirectories whose paths match the given regular expression will be scanned", RECURSE_SUBDIRECTORIES.getDisplayName()))
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_WITH_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
RECURSE_SUBDIRECTORIES,
FILE_FILTER,
PATH_FILTER,
RECORD_WRITER,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
TRACKING_TIME_WINDOW,
INITIAL_LISTING_TARGET));
private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
RECURSE_SUBDIRECTORIES,
FILE_FILTER,
PATH_FILTER,
LISTING_STRATEGY)));
private volatile Pattern filePattern;
private volatile Pattern pathPattern;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue();
filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null;
String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue();
pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null;
}
@OnStopped
public void onStopped() {
filePattern = null;
pathPattern = null;
}
@Override
protected void customValidate(ValidationContext context, Collection<ValidationResult> results) {
if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
results.add(new ValidationResult.Builder()
.subject(PATH_FILTER.getDisplayName())
.valid(false)
.explanation(String.format("'%s' cannot be set when '%s' is false", PATH_FILTER.getDisplayName(), RECURSE_SUBDIRECTORIES.getDisplayName()))
.build());
}
}
@Override
protected RecordSchema getRecordSchema() {
return ADLSFileInfo.getRecordSchema();
}
@Override
protected Scope getStateScope(PropertyContext context) {
return Scope.CLUSTER;
}
@Override
protected String getDefaultTimePrecision() {
return PRECISION_MILLIS.getValue();
}
@Override
protected boolean isListingResetNecessary(PropertyDescriptor property) {
return LISTING_RESET_PROPERTIES.contains(property);
}
@Override
protected String getPath(ProcessContext context) {
String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
return directory != null ? directory : ".";
}
@Override
protected List<ADLSFileInfo> performListing(ProcessContext context, Long minTimestamp) throws IOException {
try {
String fileSystem = evaluateFileSystemProperty(context, null);
String baseDirectory = evaluateDirectoryProperty(context, null);
boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
DataLakeServiceClient storageClient = getStorageClient(context, null);
DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
ListPathsOptions options = new ListPathsOptions();
options.setPath(baseDirectory);
options.setRecursive(recurseSubdirectories);
Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
.filter(pathItem -> !pathItem.isDirectory())
.map(pathItem -> new ADLSFileInfo.Builder()
.fileSystem(fileSystem)
.filePath(pathItem.getName())
.length(pathItem.getContentLength())
.lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
.etag(pathItem.getETag())
.build())
.filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
.filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
.collect(Collectors.toList());
return listing;
} catch (Exception e) {
getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
throw new IOException(ExceptionUtils.getRootCause(e));
}
}
@Override
protected Map<String, String> createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
attributes.put(ATTR_NAME_DIRECTORY, fileInfo.getDirectory());
attributes.put(ATTR_NAME_FILENAME, fileInfo.getFilename());
attributes.put(ATTR_NAME_LENGTH, String.valueOf(fileInfo.getLength()));
attributes.put(ATTR_NAME_LAST_MODIFIED, String.valueOf(fileInfo.getLastModified()));
attributes.put(ATTR_NAME_ETAG, fileInfo.getEtag());
return attributes;
}
}
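
Note: performListing() applies the Path Filter to directory paths relative to the configured base directory; baseDirectoryPattern strips the base directory (plus an optional trailing '/') before matching. A small sketch (hypothetical class, commons-lang3 assumed on the classpath) of that stripping step:

import java.util.regex.Pattern;

import org.apache.commons.lang3.RegExUtils;

class PathFilterSketch {
    public static void main(String[] args) {
        // Same construction as in performListing().
        String baseDirectory = "dir1";
        Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");

        // The base directory prefix is removed before the Path Filter is applied.
        System.out.println(RegExUtils.removeFirst("dir1/dir11", baseDirectoryPattern)); // prints "dir11"
        System.out.println(RegExUtils.removeFirst("dir1", baseDirectoryPattern));       // prints "" (files directly in the base directory)

        // So with Directory Name "dir1", a Path Filter of "dir11.*" selects files under dir1/dir11.
        Pattern pathPattern = Pattern.compile("dir11.*");
        System.out.println(pathPattern.matcher("dir11").matches()); // prints "true"
    }
}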

@@ -21,7 +21,6 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,14 +45,25 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
@WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
@WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
@WritesAttribute(attribute = "azure.length", description = "Length of the file")})
@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_FILESYSTEM, description = ATTR_DESCRIPTION_FILESYSTEM),
@WritesAttribute(attribute = ATTR_NAME_DIRECTORY, description = ATTR_DESCRIPTION_DIRECTORY),
@WritesAttribute(attribute = ATTR_NAME_FILENAME, description = ATTR_DESCRIPTION_FILENAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@@ -93,18 +103,9 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
}
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
FILE.getDisplayName() + " must be specified as a non-empty string.");
}
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
@@ -126,11 +127,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
fileClient.flush(length);
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.filesystem", fileSystem);
attributes.put("azure.directory", directory);
attributes.put("azure.filename", fileName);
attributes.put("azure.primaryUri", fileClient.getFileUrl());
attributes.put("azure.length", String.valueOf(length));
attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
attributes.put(ATTR_NAME_DIRECTORY, directory);
attributes.put(ATTR_NAME_FILENAME, fileName);
attributes.put(ATTR_NAME_PRIMARY_URI, fileClient.getFileUrl());
attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
public final class ADLSAttributes {
public static final String ATTR_NAME_FILESYSTEM = "azure.filesystem";
public static final String ATTR_DESCRIPTION_FILESYSTEM = "The name of the Azure File System";
public static final String ATTR_NAME_DIRECTORY = "azure.directory";
public static final String ATTR_DESCRIPTION_DIRECTORY = "The name of the Azure Directory";
public static final String ATTR_NAME_FILENAME = "azure.filename";
public static final String ATTR_DESCRIPTION_FILENAME = "The name of the Azure File";
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "The length of the Azure File";
public static final String ATTR_NAME_LAST_MODIFIED = "azure.lastModified";
public static final String ATTR_DESCRIPTION_LAST_MODIFIED = "The last modification time of the Azure File";
public static final String ATTR_NAME_ETAG = "azure.etag";
public static final String ATTR_DESCRIPTION_ETAG = "The ETag of the Azure File";
public static final String ATTR_NAME_FILE_PATH = "azure.filePath";
public static final String ATTR_DESCRIPTION_FILE_PATH = "The full path of the Azure File";
public static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
public static final String ATTR_DESCRIPTION_PRIMARY_URI = "Primary location for file content";
}

@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ADLSFileInfo implements Comparable<ADLSFileInfo>, Serializable, ListableEntity {
private static final RecordSchema SCHEMA;
private static final String FILESYSTEM = "filesystem";
private static final String FILE_PATH = "filePath";
private static final String DIRECTORY = "directory";
private static final String FILENAME = "filename";
private static final String LENGTH = "length";
private static final String LAST_MODIFIED = "lastModified";
private static final String ETAG = "etag";
static {
List<RecordField> recordFields = new ArrayList<>();
recordFields.add(new RecordField(FILESYSTEM, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(FILE_PATH, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(DIRECTORY, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(LENGTH, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
recordFields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
SCHEMA = new SimpleRecordSchema(recordFields);
}
private static final Comparator<ADLSFileInfo> COMPARATOR = Comparator.comparing(ADLSFileInfo::getFileSystem).thenComparing(ADLSFileInfo::getFilePath);
private final String fileSystem;
private final String filePath;
private final long length;
private final long lastModified;
private final String etag;
private ADLSFileInfo(Builder builder) {
this.fileSystem = builder.fileSystem;
this.filePath = builder.filePath;
this.length = builder.length;
this.lastModified = builder.lastModified;
this.etag = builder.etag;
}
public String getFileSystem() {
return fileSystem;
}
public String getFilePath() {
return filePath;
}
public long getLength() {
return length;
}
public long getLastModified() {
return lastModified;
}
public String getEtag() {
return etag;
}
public String getDirectory() {
return filePath.contains("/") ? StringUtils.substringBeforeLast(filePath, "/") : "";
}
public String getFilename() {
return filePath.contains("/") ? StringUtils.substringAfterLast(filePath, "/") : filePath;
}
@Override
public String getName() {
return getFilePath();
}
@Override
public String getIdentifier() {
return getFilePath();
}
@Override
public long getTimestamp() {
return getLastModified();
}
@Override
public long getSize() {
return getLength();
}
public static RecordSchema getRecordSchema() {
return SCHEMA;
}
@Override
public Record toRecord() {
Map<String, Object> values = new HashMap<>();
values.put(FILESYSTEM, getFileSystem());
values.put(FILE_PATH, getFilePath());
values.put(DIRECTORY, getDirectory());
values.put(FILENAME, getFilename());
values.put(LENGTH, getLength());
values.put(LAST_MODIFIED, getLastModified());
values.put(ETAG, getEtag());
return new MapRecord(SCHEMA, values);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ADLSFileInfo otherFileInfo = (ADLSFileInfo) other;
return Objects.equals(fileSystem, otherFileInfo.fileSystem)
&& Objects.equals(filePath, otherFileInfo.filePath);
}
@Override
public int hashCode() {
return Objects.hash(fileSystem, filePath);
}
@Override
public int compareTo(ADLSFileInfo other) {
return COMPARATOR.compare(this, other);
}
public static class Builder {
private String fileSystem;
private String filePath;
private long length;
private long lastModified;
private String etag;
public Builder fileSystem(String fileSystem) {
this.fileSystem = fileSystem;
return this;
}
public Builder filePath(String filePath) {
this.filePath = filePath;
return this;
}
public Builder length(long length) {
this.length = length;
return this;
}
public Builder lastModified(long lastModified) {
this.lastModified = lastModified;
return this;
}
public Builder etag(String etag) {
this.etag = etag;
return this;
}
public ADLSFileInfo build() {
return new ADLSFileInfo(this);
}
}
}
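
Note: a brief usage sketch (hypothetical values) showing how ADLSFileInfo is built and how the directory and filename are derived from the stored file path:

import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;

class ADLSFileInfoUsageSketch {
    public static void main(String[] args) {
        ADLSFileInfo fileInfo = new ADLSFileInfo.Builder()
                .fileSystem("my-filesystem")
                .filePath("dir1/dir11/file111")
                .length(4L)
                .lastModified(1595370534000L) // epoch milliseconds, matching the lastModified fix in this commit
                .etag("0x8D82DDE3CBA3E9B")    // hypothetical ETag value
                .build();

        System.out.println(fileInfo.getDirectory()); // prints "dir1/dir11"
        System.out.println(fileInfo.getFilename());  // prints "file111"
        System.out.println(fileInfo.getTimestamp()); // prints 1595370534000
    }
}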

@@ -24,3 +24,4 @@ org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage

@@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
@@ -35,6 +36,7 @@ import java.util.UUID;
public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
private static final String FILESYSTEM_NAME_PREFIX = "nifi-test-filesystem";
private static final String TEST_FILE_CONTENT = "test";
protected String fileSystemName;
protected DataLakeFileSystemClient fileSystemClient;
@@ -72,6 +74,10 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
.buildClient();
}
protected void createDirectory(String directory) {
fileSystemClient.createDirectory(directory);
}
protected void uploadFile(String directory, String filename, String fileContent) {
byte[] fileContentBytes = fileContent.getBytes();
@@ -82,9 +88,49 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
fileClient.flush(fileContentBytes.length);
}
protected void uploadFile(TestFile testFile) {
uploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
}
protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
fileSystemClient.createDirectory(directory);
createDirectory(directory);
uploadFile(directory, filename, fileContent);
}
protected void createDirectoryAndUploadFile(TestFile testFile) {
createDirectoryAndUploadFile(testFile.getDirectory(), testFile.getFilename(), testFile.getFileContent());
}
protected static class TestFile {
private final String directory;
private final String filename;
private final String fileContent;
public TestFile(String directory, String filename, String fileContent) {
this.directory = directory;
this.filename = filename;
this.fileContent = fileContent;
}
public TestFile(String directory, String filename) {
this(directory, filename, TEST_FILE_CONTENT);
}
public String getDirectory() {
return directory;
}
public String getFilename() {
return filename;
}
public String getFileContent() {
return fileContent;
}
public String getFilePath() {
return StringUtils.isNotBlank(directory) ? String.format("%s/%s", directory, filename) : filename;
}
}
}

@@ -0,0 +1,267 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
private Map<String, TestFile> testFiles;
@Override
protected Class<? extends Processor> getProcessorClass() {
return ListAzureDataLakeStorage.class;
}
@Before
public void setUp() {
testFiles = new HashMap<>();
TestFile testFile1 = new TestFile("", "file1");
uploadFile(testFile1);
testFiles.put(testFile1.getFilePath(), testFile1);
TestFile testFile2 = new TestFile("", "file2");
uploadFile(testFile2);
testFiles.put(testFile2.getFilePath(), testFile2);
TestFile testFile11 = new TestFile("dir1", "file11");
createDirectoryAndUploadFile(testFile11);
testFiles.put(testFile11.getFilePath(), testFile11);
TestFile testFile12 = new TestFile("dir1", "file12");
uploadFile(testFile12);
testFiles.put(testFile12.getFilePath(), testFile12);
TestFile testFile111 = new TestFile("dir1/dir11", "file111");
createDirectoryAndUploadFile(testFile111);
testFiles.put(testFile111.getFilePath(), testFile111);
TestFile testFile21 = new TestFile("dir 2", "file 21");
createDirectoryAndUploadFile(testFile21);
testFiles.put(testFile21.getFilePath(), testFile21);
createDirectory("dir3");
}
@Test
public void testListRootRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListRootNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
assertSuccess("file1", "file2");
}
@Test
public void testListSubdirectoryRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListSubdirectoryNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12");
}
@Test
public void testListWithFileFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file1.*$");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithFileFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, "^file${suffix}$");
runner.setVariable("suffix", "1.*");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilterWithEL() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setVariable("prefix", "^dir");
runner.setVariable("suffix", "1.*$");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListSubdirectoryWithPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
assertSuccess("dir1/dir11/file111");
}
@Test
public void testListRootWithFileAndPathFilter() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
assertSuccess("dir1/file11", "dir1/dir11/file111");
}
@Test
public void testListEmptyDirectory() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
runProcessor();
assertSuccess();
}
@Test
public void testListNonExistingDirectory() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy");
runProcessor();
assertFailure();
}
@Test
public void testListWithNonExistingFileSystem() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy");
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runProcessor();
assertFailure();
}
@Test
public void testListWithRecords() throws InitializationException {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
runner.run();
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("record.count", "3");
}
private void runProcessor() {
runner.assertValid();
runner.run();
}
private void assertSuccess(String... testFilePaths) throws Exception {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length);
Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFiles) {
String filePath = flowFile.getAttribute("azure.filePath");
TestFile testFile = expectedFiles.remove(filePath);
assertNotNull("File path not found in the expected map", testFile);
assertFlowFile(testFile, flowFile);
}
}
private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception {
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename());
flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().length()));
flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED);
flowFile.assertAttributeExists(ATTR_NAME_ETAG);
flowFile.assertContentEquals("");
}
private void assertFailure() {
assertFalse(runner.getLogger().getErrorMessages().isEmpty());
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
}

@@ -35,6 +35,11 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -148,15 +153,6 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertFailure();
}
@Test
public void testPutFileWithInvalidDirectory() {
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, "/dir1");
runProcessor(FILE_DATA);
assertFailure();
}
@Test
public void testPutFileWithInvalidFileName() {
runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
@@ -282,18 +278,18 @@
private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception {
MockFlowFile flowFile = assertFlowFile(fileData);
flowFile.assertAttributeEquals("azure.filesystem", fileSystemName);
flowFile.assertAttributeEquals("azure.directory", directory);
flowFile.assertAttributeEquals("azure.filename", fileName);
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory);
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(directory);
String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
String primaryUri = StringUtils.isNotEmpty(directory)
? String.format("https://%s.dfs.core.windows.net/%s/%s/%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedFileName)
: String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
flowFile.assertAttributeEquals("azure.primaryUri", primaryUri);
flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
}
private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {

@@ -47,7 +47,11 @@ public class TestAbstractAzureDataLakeStorage {
runner.setProperty(DIRECTORY, "directory");
runner.setProperty(FILE, "file");
runner.setProperty(ADLS_CREDENTIALS_SERVICE, "credentials_service");
}
@Test
public void testValid() {
runner.assertValid();
}
@Test
@@ -79,6 +83,27 @@
runner.assertValid();
}
@Test
public void testNotValidWhenDirectoryIsSlash() {
runner.setProperty(DIRECTORY, "/");
runner.assertNotValid();
}
@Test
public void testNotValidWhenDirectoryStartsWithSlash() {
runner.setProperty(DIRECTORY, "/directory");
runner.assertNotValid();
}
@Test
public void testNotValidWhenDirectoryIsWhitespaceOnly() {
runner.setProperty(DIRECTORY, " ");
runner.assertNotValid();
}
@Test
public void testValidWhenNoFileSpecified() {
// the default value will be used