NIFI-12672 Added AzureFileResourceService

This closes #8359.

Signed-off-by: Mark Bathori <mbathori@apache.org>
This commit is contained in:
Balázs Gerner 2024-02-01 15:51:00 +01:00 committed by Mark Bathori
parent 0884b627b2
commit 03bba7049a
No known key found for this signature in database
GPG Key ID: 32DC6BA5A13FAA04
26 changed files with 1008 additions and 224 deletions

View File

@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
@ -52,14 +53,6 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("storage-credentials-service")
.displayName("Storage Credentials")
.description("Controller Service used to obtain Azure Blob Storage Credentials.")
.identifiesControllerService(AzureStorageCredentialsService_v12.class)
.required(true)
.build();
public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder()
.name("blob-name")
.displayName("Blob Name")
@ -98,7 +91,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
}
protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
return getStorageClient(context, STORAGE_CREDENTIALS_SERVICE, flowFile);
return getStorageClient(context, BLOB_STORAGE_CREDENTIALS_SERVICE, flowFile);
}
protected BlobServiceClient getStorageClient(PropertyContext context, PropertyDescriptor storageCredentialsServiceProperty, FlowFile flowFile) {

View File

@ -17,69 +17,25 @@
package org.apache.nifi.processors.azure;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("adls-credentials-service")
.displayName("ADLS Credentials")
.description("Controller Service used to obtain Azure Credentials.")
.identifiesControllerService(ADLSCredentialsService.class)
.required(true)
.build();
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name")
.displayName("Directory Name")
.description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
"In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
.addValidator(new DirectoryValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to Azure storage are transferred to this relationship")
@ -111,77 +67,12 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
}
public DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Map.of();
final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE)
.asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes);
return clientFactory.getStorageClient(credentialsDetails);
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) {
return evaluateFileSystemProperty(context, flowFile, FILESYSTEM);
}
public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileSystem)) {
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName()));
}
return fileSystem;
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) {
return evaluateDirectoryProperty(context, flowFile, DIRECTORY);
}
public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) {
String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
if (directory.startsWith("/")) {
throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName()));
} else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
}
return directory;
}
public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) {
String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(fileName)) {
throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName()));
}
return fileName;
}
public static class DirectoryValidator implements Validator {
private String displayName;
public DirectoryValidator() {
this.displayName = null;
}
public DirectoryValidator(String displayName) {
this.displayName = displayName;
}
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName;
ValidationResult.Builder builder = new ValidationResult.Builder()
.subject(displayName)
.input(input);
if (context.isExpressionLanguagePresent(input)) {
builder.valid(true).explanation("Expression Language Present");
} else if (input.startsWith("/")) {
builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName));
} else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName));
} else {
builder.valid(true);
}
return builder.build();
}
}
}

View File

@ -79,6 +79,7 @@ import static com.azure.storage.blob.specialized.BlockBlobClient.MAX_STAGE_BLOCK
import static com.azure.storage.blob.specialized.BlockBlobClient.MAX_UPLOAD_BLOB_BYTES_LONG;
import static com.azure.storage.common.implementation.Constants.STORAGE_SCOPE;
import static java.net.HttpURLConnection.HTTP_ACCEPTED;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@ -148,7 +149,7 @@ public class CopyAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
.build();
public static final PropertyDescriptor DESTINATION_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(STORAGE_CREDENTIALS_SERVICE)
.fromPropertyDescriptor(BLOB_STORAGE_CREDENTIALS_SERVICE)
.displayName("Destination Storage Credentials")
.build();

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
@ -73,7 +74,7 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
STORAGE_CREDENTIALS_SERVICE,
BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME,
DELETE_SNAPSHOTS_OPTION,

View File

@ -29,19 +29,22 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.time.Duration;
import java.util.List;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@ -62,12 +65,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
.build();
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.fromPropertyDescriptor(AzureStorageUtils.FILE)
.dependsOn(FILESYSTEM_OBJECT_TYPE, FS_TYPE_FILE)
.build();
@ -90,14 +88,14 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final boolean isFile = context.getProperty(FILESYSTEM_OBJECT_TYPE).getValue().equals(FS_TYPE_FILE.getValue());
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
if (isFile) {
final String fileName = evaluateFileNameProperty(context, flowFile);
final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
fileClient.delete();
session.transfer(flowFile, REL_SUCCESS);

View File

@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@ -140,7 +141,7 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 im
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
STORAGE_CREDENTIALS_SERVICE,
BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME,
RANGE_START,

View File

@ -47,6 +47,14 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage")
@ -149,9 +157,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final DownloadRetryOptions retryOptions = new DownloadRetryOptions();
retryOptions.setMaxRetryRequests(numRetries);
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String directory = evaluateDirectoryProperty(context, flowFile);
final String fileName = evaluateFileNameProperty(context, flowFile);
final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile);
final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile);
final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);

View File

@ -59,7 +59,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
@ -135,7 +135,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
STORAGE_CREDENTIALS_SERVICE,
BLOB_STORAGE_CREDENTIALS_SERVICE,
CONTAINER,
BLOB_NAME_PREFIX,
RECORD_WRITER,
@ -202,7 +202,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
return STORAGE_CREDENTIALS_SERVICE.equals(property)
return BLOB_STORAGE_CREDENTIALS_SERVICE.equals(property)
|| CONTAINER.equals(property)
|| BLOB_NAME_PREFIX.equals(property)
|| LISTING_STRATEGY.equals(property);
@ -217,7 +217,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
try {
final List<BlobInfo> listing = new ArrayList<>();
final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(Collections.emptyMap());
final BlobServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails);

View File

@ -62,12 +62,7 @@ import java.util.regex.Pattern;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
@ -82,6 +77,11 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@PrimaryNodeOnly
@TriggerSerially
@ -265,8 +265,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
final boolean applyFilters) throws IOException {
try {
final String fileSystem = evaluateFileSystemProperty(context, null);
final String baseDirectory = evaluateDirectoryProperty(context, null);
final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context);
final String baseDirectory = evaluateDirectoryProperty(DIRECTORY, context);
final boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER);

View File

@ -37,6 +37,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator;
import java.util.HashMap;
import java.util.List;
@ -57,6 +58,13 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@ -147,11 +155,11 @@ public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProces
final long startNanos = System.nanoTime();
try {
final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM);
final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY);
final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM);
final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY);
final String fileName = evaluateFileNameProperty(context, flowFile);
final String sourceFileSystem = evaluateFileSystemProperty(SOURCE_FILESYSTEM, context, flowFile);
final String sourceDirectory = evaluateDirectoryProperty(SOURCE_DIRECTORY, context, flowFile);
final String destinationFileSystem = evaluateFileSystemProperty(DESTINATION_FILESYSTEM, context, flowFile);
final String destinationDirectory = evaluateDirectoryProperty(DESTINATION_DIRECTORY, context, flowFile);
final String fileName = evaluateFileProperty(context, flowFile);
final String destinationPath;
if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) {

View File

@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@ -104,7 +105,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
private static final List<PropertyDescriptor> PROPERTIES = List.of(
STORAGE_CREDENTIALS_SERVICE,
BLOB_STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
AzureStorageUtils.CREATE_CONTAINER,
AzureStorageUtils.CONFLICT_RESOLUTION,

View File

@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.StringUtils;
@ -61,6 +62,13 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@ -128,11 +136,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long startNanos = System.nanoTime();
try {
final String fileSystem = evaluateFileSystemProperty(context, flowFile);
final String originalDirectory = evaluateDirectoryProperty(context, flowFile);
final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH);
final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile);
final String originalDirectory = evaluateDirectoryProperty(DIRECTORY, context, flowFile);
final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile);
final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY);
final String fileName = evaluateFileNameProperty(context, flowFile);
final String fileName = evaluateFileProperty(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory);

View File

@ -17,16 +17,22 @@
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.http.ProxyOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.proxy.SocksVersion;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
import reactor.netty.http.client.HttpClient;
@ -34,6 +40,9 @@ import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
public final class AzureStorageUtils {
public static final String STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME = "storage-account-name";
@ -41,6 +50,22 @@ public final class AzureStorageUtils {
public static final String STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME = "storage-sas-token";
public static final String STORAGE_ENDPOINT_SUFFIX_PROPERTY_DESCRIPTOR_NAME = "storage-endpoint-suffix";
public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("adls-credentials-service")
.displayName("ADLS Credentials")
.description("Controller Service used to obtain Azure Credentials.")
.identifiesControllerService(ADLSCredentialsService.class)
.required(true)
.build();
public static final PropertyDescriptor BLOB_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("storage-credentials-service")
.displayName("Storage Credentials")
.description("Controller Service used to obtain Azure Blob Storage Credentials.")
.identifiesControllerService(AzureStorageCredentialsService_v12.class)
.required(true)
.build();
public static final PropertyDescriptor CREDENTIALS_TYPE = new PropertyDescriptor.Builder()
.name("credentials-type")
.displayName("Credentials Type")
@ -54,6 +79,33 @@ public final class AzureStorageUtils {
.defaultValue(AzureStorageCredentialsType.SAS_TOKEN)
.build();
// Name of the target ADLS file system (container); supports FlowFile-attribute Expression Language.
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
// Directory path within the file system; validated by DirectoryValidator (no leading '/',
// empty string designates the root directory).
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name")
.displayName("Directory Name")
.description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " +
"In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.")
.addValidator(new DirectoryValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
// File name within the directory; defaults to the FlowFile's filename attribute.
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue(String.format("${%s}", ATTR_NAME_FILENAME))
.build();
public static final String ACCOUNT_KEY_BASE_DESCRIPTION =
"The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token, Managed Identity or Service Principal instead for fine-grained control with policies.";
@ -215,6 +267,57 @@ public final class AzureStorageUtils {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
/**
 * Evaluates the file system (container) property without FlowFile attributes.
 *
 * @param property the file system property descriptor to evaluate
 * @param context property context holding the configured value
 * @return the evaluated file system name, never blank
 * @throws ProcessException if the property evaluates to a blank string
 */
public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context) {
    return evaluateFileSystemProperty(property, context, (Map<String, String>) null);
}

/**
 * Evaluates the file system (container) property against the given FlowFile's attributes.
 *
 * @param property the file system property descriptor to evaluate
 * @param context property context holding the configured value
 * @param flowFile FlowFile whose attributes are used for Expression Language evaluation
 * @return the evaluated file system name, never blank
 * @throws ProcessException if the property evaluates to a blank string
 */
public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context, FlowFile flowFile) {
    return evaluateFileSystemProperty(property, context, flowFile.getAttributes());
}

/**
 * Evaluates the file system (container) property against the given attribute map.
 *
 * @param property the file system property descriptor to evaluate
 * @param context property context holding the configured value
 * @param attributes attributes used for Expression Language evaluation, may be null
 * @return the evaluated file system name, never blank
 * @throws ProcessException if the property evaluates to a blank string
 */
public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context, Map<String, String> attributes) {
    final String fileSystem = evaluateProperty(property, context, attributes);
    if (StringUtils.isBlank(fileSystem)) {
        // Use the explicit %1$s index for both references; the original mixed %1$s with a
        // bare %s, which only resolved to the same argument by Java's dual-index rule.
        throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%1$s' must be specified as a non-blank string.",
                property.getDisplayName()));
    }
    return fileSystem;
}
/**
 * Evaluates the directory property without FlowFile attributes.
 *
 * @param property the directory property descriptor to evaluate
 * @param context property context holding the configured value
 * @return the evaluated directory name (empty string designates the root directory)
 * @throws ProcessException if the value has a leading '/' or consists of whitespace only
 */
public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context) {
    return evaluateDirectoryProperty(property, context, (Map<String, String>) null);
}

/**
 * Evaluates the directory property against the given FlowFile's attributes.
 *
 * @param property the directory property descriptor to evaluate
 * @param context property context holding the configured value
 * @param flowFile FlowFile whose attributes are used for Expression Language evaluation
 * @return the evaluated directory name (empty string designates the root directory)
 * @throws ProcessException if the value has a leading '/' or consists of whitespace only
 */
public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context, FlowFile flowFile) {
    return evaluateDirectoryProperty(property, context, flowFile.getAttributes());
}

/**
 * Evaluates the directory property against the given attribute map.
 *
 * @param property the directory property descriptor to evaluate
 * @param context property context holding the configured value
 * @param attributes attributes used for Expression Language evaluation, may be null
 * @return the evaluated directory name (empty string designates the root directory)
 * @throws ProcessException if the value has a leading '/' or consists of whitespace only
 */
public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context, Map<String, String> attributes) {
    final String directory = evaluateProperty(property, context, attributes);
    if (directory.startsWith("/")) {
        // Use the explicit %1$s index for both references; the original mixed %1$s with a
        // bare %s, which only resolved to the same argument by Java's dual-index rule.
        throw new ProcessException(String.format("'%1$s' starts with '/'. '%1$s' cannot contain a leading '/'.", property.getDisplayName()));
    } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) {
        throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName()));
    }
    return directory;
}
/**
 * Evaluates the FILE property against the given FlowFile's attributes.
 *
 * @param context property context holding the configured value
 * @param flowFile FlowFile whose attributes are used for Expression Language evaluation
 * @return the evaluated file name, never blank
 * @throws ProcessException if the property evaluates to a blank string
 */
public static String evaluateFileProperty(PropertyContext context, FlowFile flowFile) {
    return evaluateFileProperty(context, flowFile.getAttributes());
}

/**
 * Evaluates the FILE property against the given attribute map.
 *
 * @param context property context holding the configured value
 * @param attributes attributes used for Expression Language evaluation, may be null
 * @return the evaluated file name, never blank
 * @throws ProcessException if the property evaluates to a blank string
 */
public static String evaluateFileProperty(PropertyContext context, Map<String, String> attributes) {
    final String fileName = evaluateProperty(FILE, context, attributes);
    if (StringUtils.isBlank(fileName)) {
        // Use the explicit %1$s index for both references; the original mixed %1$s with a
        // bare %s, which only resolved to the same argument by Java's dual-index rule.
        throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%1$s' must be specified as a non-blank string.", FILE.getDisplayName()));
    }
    return fileName;
}

// Shared helper: evaluates a property with optional Expression Language attributes.
private static String evaluateProperty(PropertyDescriptor propertyDescriptor, PropertyContext context, Map<String, String> attributes) {
    return context.getProperty(propertyDescriptor).evaluateAttributeExpressions(attributes).getValue();
}
/**
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use.
@ -252,4 +355,36 @@ public final class AzureStorageUtils {
throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType());
}
}
/**
 * Validator for directory name properties: accepts any value containing Expression Language,
 * rejects values with a leading '/' and values consisting of whitespace characters only.
 */
public static class DirectoryValidator implements Validator {
    // Null means "resolve from DIRECTORY at validation time". The lookup must stay lazy:
    // DIRECTORY is itself built with a DirectoryValidator, so DIRECTORY.getDisplayName()
    // cannot be called while this class's enclosing static fields are still initializing.
    private final String displayName;

    public DirectoryValidator() {
        this.displayName = null;
    }

    public DirectoryValidator(String displayName) {
        this.displayName = displayName;
    }

    @Override
    public ValidationResult validate(String subject, String input, ValidationContext context) {
        // Resolve into a local instead of mutating the field (as the original did):
        // keeps the validator immutable and safe under concurrent validation calls.
        final String name = displayName == null ? DIRECTORY.getDisplayName() : displayName;
        ValidationResult.Builder builder = new ValidationResult.Builder()
                .subject(name)
                .input(input);
        if (context.isExpressionLanguagePresent(input)) {
            builder.valid(true).explanation("Expression Language Present");
        } else if (input.startsWith("/")) {
            builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", name));
        } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) {
            builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", name));
        } else {
            builder.valid(true);
        }
        return builder.build();
    }
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobStorageException;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
import static org.apache.nifi.util.StringUtils.isBlank;
@Tags({"azure", "microsoft", "cloud", "storage", "file", "resource", "blob"})
@SeeAlso({FetchAzureBlobStorage_v12.class})
@CapabilityDescription("Provides an Azure Blob Storage file resource for other components.")
@UseCase(
description = "Fetch a specific file from Azure Blob Storage." +
" The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
configuration = """
"Container Name" = "${azure.container}"
"Blob Name" = "${azure.blobname}"
The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.
"""
)
@Tags({"azure", "microsoft", "cloud", "storage", "file", "resource", "blob"})
@SeeAlso({FetchAzureBlobStorage_v12.class})
@CapabilityDescription("Provides an Azure Blob Storage file resource for other components.")
@UseCase(
description = "Fetch a specific file from Azure Blob Storage." +
" The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
configuration = """
"Container Name" = "${azure.container}"
"Blob Name" = "${azure.blobname}"
The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.
"""
)
public class AzureBlobStorageFileResourceService extends AbstractControllerService implements FileResourceService {

    // Container name; defaults to the attribute written by the Azure Blob processors.
    public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureStorageUtils.CONTAINER)
            .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER))
            .build();

    // Blob name; defaults to the attribute written by the Azure Blob processors.
    public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME)
            .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME))
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = List.of(
            BLOB_STORAGE_CREDENTIALS_SERVICE,
            CONTAINER,
            BLOB_NAME
    );

    private volatile BlobServiceClientFactory clientFactory;
    private volatile ConfigurationContext context;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Creates the blob service client factory and captures the configuration context
     * for later property evaluation.
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.clientFactory = new BlobServiceClientFactory(getLogger(), getProxyOptions(context));
        this.context = context;
    }

    /** Releases the client factory and configuration context. */
    @OnDisabled
    public void onDisabled() {
        this.clientFactory = null;
        this.context = null;
    }

    /**
     * Resolves the configured container/blob against the given attributes and returns
     * the blob's content as a FileResource.
     *
     * @param attributes attributes used for Expression Language evaluation
     * @return the fetched blob wrapped as a FileResource
     * @throws ProcessException if the blob cannot be fetched
     */
    @Override
    public FileResource getFileResource(Map<String, String> attributes) {
        try {
            return fetchBlob(getStorageClient(attributes), attributes);
        } catch (final BlobStorageException | IOException e) {
            throw new ProcessException("Failed to fetch blob from Azure Blob Storage", e);
        }
    }

    /**
     * Builds a BlobServiceClient using the configured credentials service.
     *
     * @param attributes attributes passed to the credentials service
     * @return the blob service client
     */
    protected BlobServiceClient getStorageClient(Map<String, String> attributes) {
        final AzureStorageCredentialsService_v12 credentialsService =
                context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
        return clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes));
    }

    /**
     * Fetching blob from the provided container.
     *
     * @param storageClient azure blob storage client
     * @param attributes configuration attributes
     * @return fetched blob as FileResource
     * @throws IOException exception caused by missing parameters or blob not found
     */
    private FileResource fetchBlob(final BlobServiceClient storageClient, final Map<String, String> attributes) throws IOException {
        final String container = context.getProperty(CONTAINER).evaluateAttributeExpressions(attributes).getValue();
        final String blob = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(attributes).getValue();

        if (isBlank(container) || isBlank(blob)) {
            throw new ProcessException("Container name and blob name cannot be empty");
        }

        final BlobClient blobClient = storageClient
                .getBlobContainerClient(container)
                .getBlobClient(blob);
        if (!blobClient.exists()) {
            throw new ProcessException(String.format("Blob %s/%s not found", container, blob));
        }
        return new FileResource(blobClient.openInputStream(), blobClient.getProperties().getBlobSize());
    }
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "file", "resource", "datalake"})
@SeeAlso({FetchAzureDataLakeStorage.class})
@CapabilityDescription("Provides an Azure Data Lake Storage (ADLS) file resource for other components.")
@UseCase(
description = "Fetch the specified file from Azure Data Lake Storage." +
" The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
configuration = """
"Filesystem Name" = "${azure.filesystem}"
"Directory Name" = "${azure.directory}"
"File Name" = "${azure.filename}"
The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.
"""
)
public class AzureDataLakeStorageFileResourceService extends AbstractControllerService implements FileResourceService {

    // File system (container) name; defaults to the attribute written by the ADLS processors.
    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureStorageUtils.FILESYSTEM)
            .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM))
            .build();

    // Directory name; defaults to the attribute written by the ADLS processors.
    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AzureStorageUtils.DIRECTORY)
            .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY))
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = List.of(
            ADLS_CREDENTIALS_SERVICE,
            FILESYSTEM,
            DIRECTORY,
            FILE
    );

    private volatile DataLakeServiceClientFactory clientFactory;
    private volatile ConfigurationContext context;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Creates the Data Lake service client factory and captures the configuration context
     * for later property evaluation.
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.clientFactory = new DataLakeServiceClientFactory(getLogger(), getProxyOptions(context));
        this.context = context;
    }

    /** Releases the client factory and configuration context. */
    @OnDisabled
    public void onDisabled() {
        this.clientFactory = null;
        this.context = null;
    }

    /**
     * Resolves the configured filesystem/directory/file against the given attributes and
     * returns the file's content as a FileResource.
     *
     * @param attributes attributes used for Expression Language evaluation
     * @return the fetched file wrapped as a FileResource
     * @throws ProcessException if the file cannot be fetched
     */
    @Override
    public FileResource getFileResource(Map<String, String> attributes) {
        final DataLakeServiceClient client = getStorageClient(attributes);
        try {
            return fetchFile(client, attributes);
        } catch (final DataLakeStorageException | IOException e) {
            throw new ProcessException("Failed to fetch file from ADLS Storage", e);
        }
    }

    /**
     * Builds a DataLakeServiceClient using the configured credentials service.
     *
     * @param attributes attributes passed to the credentials service
     * @return the data lake service client
     */
    protected DataLakeServiceClient getStorageClient(Map<String, String> attributes) {
        final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE)
                .asControllerService(ADLSCredentialsService.class);
        return clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes));
    }

    /**
     * Fetching file from the provided filesystem and directory in ADLS.
     *
     * @param storageClient azure data lake service client
     * @param attributes configuration attributes
     * @return fetched file as FileResource
     * @throws IOException exception caused by missing parameters or blob not found
     */
    private FileResource fetchFile(final DataLakeServiceClient storageClient, final Map<String, String> attributes) throws IOException {
        final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, attributes);
        final String directory = evaluateDirectoryProperty(DIRECTORY, context, attributes);
        final String file = evaluateFileProperty(context, attributes);

        final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
        final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
        final DataLakeFileClient fileClient = directoryClient.getFileClient(file);

        // Existence must be checked before getProperties(): on a missing file getProperties()
        // throws a raw DataLakeStorageException and the descriptive message below would be
        // unreachable (the original performed these checks in the opposite order).
        if (!fileClient.exists()) {
            throw new ProcessException(String.format("File %s/%s not found in file system: %s", directory, file, fileSystem));
        }

        // Fetch the properties once and reuse them for both the directory check and the
        // file size; the original issued two separate getProperties() service calls.
        final var properties = fileClient.getProperties();
        if (properties.isDirectory()) {
            throw new ProcessException(FILE.getDisplayName() + " (" + file + ") points to a directory. Full path: " + fileClient.getFilePath());
        }

        return new FileResource(fileClient.openInputStream().getInputStream(),
                properties.getFileSize());
    }
}

View File

@ -21,3 +21,5 @@ org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService
org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService

View File

@ -51,6 +51,7 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorageIT {
protected static final String SERVICE_ID = "credentials-service";
@ -90,7 +91,7 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.enableControllerService(service);
runner.setProperty(AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE, SERVICE_ID);
runner.setProperty(BLOB_STORAGE_CREDENTIALS_SERVICE, SERVICE_ID);
}
@BeforeEach

View File

@ -23,7 +23,6 @@ import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
@ -58,14 +57,14 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.enableControllerService(service);
runner.setProperty(AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE, "ADLSCredentials");
runner.setProperty(AzureStorageUtils.ADLS_CREDENTIALS_SERVICE, "ADLSCredentials");
}
@BeforeEach
public void setUpAzureDataLakeStorageIT() {
fileSystemName = String.format("%s-%s", FILESYSTEM_NAME_PREFIX, UUID.randomUUID());
runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, fileSystemName);
runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystemName);
DataLakeServiceClient storageClient = createStorageClient();
fileSystemClient = storageClient.createFileSystem(fileSystemName);

View File

@ -21,6 +21,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -460,8 +461,8 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
private void setRunnerProperties(String fileSystem, String directory, String filename) {
runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM_OBJECT_TYPE, filename != null ? FS_TYPE_FILE : FS_TYPE_DIRECTORY);
runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem);
runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory);
runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem);
runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
if (filename != null) {
runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename);
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@ -518,9 +519,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
private void setRunnerProperties(String fileSystem, String directory, String filename, String rangeStart, String rangeLength) {
runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem);
runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
runner.setProperty(AzureStorageUtils.FILE, filename);
if (rangeStart != null) {
runner.setProperty(FetchAzureDataLakeStorage.RANGE_START, rangeStart);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
@ -99,7 +100,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
@ -108,7 +109,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
@ -122,7 +123,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
configureProxyService();
runProcessor();
@ -132,7 +133,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
@ -142,7 +143,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -153,7 +154,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runProcessor();
@ -162,7 +163,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
@ -174,7 +175,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
@ -184,7 +185,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -195,7 +196,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runProcessor();
@ -205,7 +206,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -219,7 +220,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
@ -230,7 +231,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithFileFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -245,7 +246,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runProcessor();
@ -255,7 +256,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -268,7 +269,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
@ -280,7 +281,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithPathFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
@ -295,7 +296,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
@ -305,7 +306,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListSubdirectoryWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -316,7 +317,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithFileAndPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@ -327,7 +328,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListRootWithFileAndPathFilterWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -340,7 +341,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListEmptyDirectory() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir3");
runProcessor();
@ -349,7 +350,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListNonExistingDirectory() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dummy");
runProcessor();
@ -358,8 +359,8 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithNonExistingFileSystem() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy");
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
@ -368,7 +369,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithRecords() throws InitializationException {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
@ -384,7 +385,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithRecordsWithTempFiles() throws InitializationException {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
@ -402,7 +403,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runProcessor();
@ -412,7 +413,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinAgeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -423,7 +424,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runProcessor();
@ -433,7 +434,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxAgeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -448,7 +449,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runProcessor();
@ -458,7 +459,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMinSizeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
@ -472,7 +473,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runProcessor();
@ -482,7 +483,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testListWithMaxSizeWithTempFiles() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");

View File

@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
@ -66,7 +67,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY);
runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME);
runner.setProperty(AzureStorageUtils.FILE, FILE_NAME);
}
@Test
@ -187,7 +188,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
public void testMoveFileWithInvalidFileName() {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1");
runner.setProperty(AzureStorageUtils.FILE, "/file1");
runProcessor(FILE_DATA);
@ -203,7 +204,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory);
runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory);
runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName);
runner.setProperty(AzureStorageUtils.FILE, fileName);
runProcessor(FILE_DATA);
@ -296,7 +297,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
private void setELProperties() {
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY));
runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}", EL_FILE_NAME));
}
private void runProcessor(byte[] fileData) {

View File

@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.fileresource.service.StandardFileResourceService;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -64,8 +65,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@BeforeEach
public void setUp() {
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, DIRECTORY);
runner.setProperty(PutAzureDataLakeStorage.FILE, FILE_NAME);
runner.setProperty(AzureStorageUtils.DIRECTORY, DIRECTORY);
runner.setProperty(AzureStorageUtils.FILE, FILE_NAME);
}
@Test
@ -121,7 +122,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
String baseDirectory = "dir1/dir2";
String fullDirectory = baseDirectory + "/dir3/dir4";
fileSystemClient.createDirectory(baseDirectory);
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, fullDirectory);
runner.setProperty(AzureStorageUtils.DIRECTORY, fullDirectory);
runProcessor(FILE_DATA);
@ -131,7 +132,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileToRootDirectory() throws Exception {
String rootDirectory = "";
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, rootDirectory);
runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory);
runProcessor(FILE_DATA);
@ -160,7 +161,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileWithNonExistingFileSystem() {
runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "dummy");
runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
runProcessor(FILE_DATA);
@ -169,7 +170,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Test
public void testPutFileWithInvalidFileName() {
runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
runner.setProperty(AzureStorageUtils.FILE, "/file1");
runProcessor(FILE_DATA);
@ -180,8 +181,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception {
String directory = "dir 1";
String fileName = "file 1";
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, directory);
runner.setProperty(PutAzureDataLakeStorage.FILE, fileName);
runner.setProperty(AzureStorageUtils.DIRECTORY, directory);
runner.setProperty(AzureStorageUtils.FILE, fileName);
runProcessor(FILE_DATA);
@ -290,9 +291,9 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
private void setELProperties() {
runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, String.format("${%s}", EL_DIRECTORY));
runner.setProperty(PutAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME));
runner.setProperty(AzureStorageUtils.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}", EL_DIRECTORY));
runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}", EL_FILE_NAME));
}
private void runProcessor(byte[] fileData) {

View File

@ -22,10 +22,10 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.specialized.BlobInputStream;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.azure.AzureCredentialsService;
import org.apache.nifi.services.azure.StandardAzureCredentialsControllerService;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class AzureBlobStorageFileResourceServiceTest {

    private static final String CONTROLLER_SERVICE = "AzureCredentialsService";
    private static final String CONTAINER = "container-name";
    private static final String BLOB_NAME = "test-file";
    private static final long CONTENT_LENGTH = 10L;

    @Mock
    private BlobServiceClient client;

    @Mock
    private BlobContainerClient containerClient;

    @Mock
    private BlobClient blobClient;

    @Mock
    private BlobProperties blobProperties;

    @Mock
    private BlobInputStream blobInputStream;

    // @InjectMocks supplies the mocked BlobServiceClient via the test subclass constructor,
    // so the service never talks to a real storage account.
    @InjectMocks
    private TestAzureBlobStorageFileResourceService service;

    private TestRunner runner;

    @BeforeEach
    void setup() throws InitializationException {
        runner = TestRunners.newTestRunner(NoOpProcessor.class);
        runner.addControllerService("AzureBlobStorageFileResourceService", service);
    }

    @Test
    void testValidBlob() throws InitializationException {
        setupService();
        setupMocking(CONTAINER, BLOB_NAME);

        final FileResource fileResource = service.getFileResource(Map.of());

        assertFileResource(fileResource);
        verifyMockInvocations(CONTAINER, BLOB_NAME);
    }

    @Test
    void testValidBlobWithEL() throws InitializationException {
        // Container and blob name are resolved from flow file attributes via Expression Language.
        String customContainer = "custom-container";
        String customBlobName = "custom-blob-name";
        String blobKey = "blob.name";
        String containerKey = "container.name";
        setupService(String.format("${%s}", blobKey), String.format("${%s}", containerKey));
        setupMocking(customContainer, customBlobName);
        runner.setValidateExpressionUsage(false);

        final FileResource fileResource = service.getFileResource(Map.of(
                blobKey, customBlobName,
                containerKey, customContainer));

        assertFileResource(fileResource);
        verifyMockInvocations(customContainer, customBlobName);
    }

    @Test
    void testNonExistingBlob() throws InitializationException {
        setupService();
        when(client.getBlobContainerClient(CONTAINER)).thenReturn(containerClient);
        when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient);
        when(blobClient.exists()).thenReturn(false);

        // NOTE(review): the third argument of assertThrows is the message shown when the
        // assertion FAILS — it does not verify the thrown exception's message. If the
        // exception text should be checked, capture the exception and assert on getMessage().
        assertThrows(ProcessException.class,
                () -> service.getFileResource(Map.of()),
                "Failed to fetch blob from Azure Blob Storage");
    }

    @Test
    void testELWithMissingAttribute() throws InitializationException {
        // No attributes supplied, so both EL expressions evaluate to empty strings.
        runner.setValidateExpressionUsage(false);
        setupService(String.format("${%s}", BLOB_NAME), String.format("${%s}", CONTAINER));

        assertThrows(ProcessException.class,
                () -> service.getFileResource(Map.of()),
                "Container name and blob name cannot be empty");
    }

    // Configures the service with the default container/blob name and enables it.
    private void setupService() throws InitializationException {
        setupService(BLOB_NAME, CONTAINER);
    }

    // Configures the service with the given (possibly EL) blob name and container and enables it.
    private void setupService(String blobName, String container) throws InitializationException {
        final AzureCredentialsService credentialsService = new StandardAzureCredentialsControllerService();
        runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
        runner.enableControllerService(credentialsService);
        runner.setProperty(service, BLOB_STORAGE_CREDENTIALS_SERVICE, CONTROLLER_SERVICE);
        runner.setProperty(service, AzureBlobStorageFileResourceService.BLOB_NAME, blobName);
        runner.setProperty(service, AzureBlobStorageFileResourceService.CONTAINER, container);
        runner.enableControllerService(service);
    }

    // Wires the mock chain client -> container -> blob for an existing blob of CONTENT_LENGTH bytes.
    private void setupMocking(String container, String blobName) {
        when(client.getBlobContainerClient(container)).thenReturn(containerClient);
        when(containerClient.getBlobClient(blobName)).thenReturn(blobClient);
        when(blobClient.exists()).thenReturn(true);
        when(blobClient.getProperties()).thenReturn(blobProperties);
        when(blobProperties.getBlobSize()).thenReturn(CONTENT_LENGTH);
        when(blobClient.openInputStream()).thenReturn(blobInputStream);
    }

    private void assertFileResource(FileResource fileResource) {
        assertNotNull(fileResource);
        // JUnit 5 convention: expected value first, actual second (affects failure messages only).
        assertEquals(blobInputStream, fileResource.getInputStream());
        assertEquals(CONTENT_LENGTH, fileResource.getSize());
    }

    private void verifyMockInvocations(String customContainer, String customBlobName) {
        verify(client).getBlobContainerClient(customContainer);
        verify(containerClient).getBlobClient(customBlobName);
        verify(blobClient).exists();
        verify(blobClient).getProperties();
        verify(blobProperties).getBlobSize();
        verify(blobClient).openInputStream();
        verifyNoMoreInteractions(containerClient, blobClient, blobProperties);
    }

    // Test subclass that short-circuits client creation so mocks can be injected.
    private static class TestAzureBlobStorageFileResourceService extends AzureBlobStorageFileResourceService {

        private final BlobServiceClient client;

        public TestAzureBlobStorageFileResourceService(BlobServiceClient client) {
            this.client = client;
        }

        @Override
        protected BlobServiceClient getStorageClient(Map<String, String> attributes) {
            return client;
        }
    }
}

View File

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.PathProperties;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.InputStream;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class AzureDataLakeStorageFileResourceServiceTest {

    private static final String CREDENTIALS_CONTROLLER_SERVICE = "ADLSCredentialsService";
    private static final String FILE_SYSTEM = "filesystem-name";
    private static final String DIRECTORY = "test-directory";
    private static final String FILE = "test-file";
    private static final long CONTENT_LENGTH = 10L;
    public static final String MSG_EMPTY_FILE_NAME = "'File Name' property evaluated to blank string. 'File Name' must be specified as a non-blank string.";
    public static final String MSG_EMPTY_FILE_SYSTEM_NAME = "'Filesystem Name' property evaluated to blank string. 'Filesystem Name' must be specified as a non-blank string.";

    @Mock
    private DataLakeServiceClient client;

    @Mock
    private DataLakeFileSystemClient fileSystemClient;

    @Mock
    private DataLakeDirectoryClient directoryClient;

    @Mock
    private DataLakeFileClient fileClient;

    @Mock
    private PathProperties properties;

    @Mock
    private InputStream inputStream;

    // @InjectMocks supplies the mocked DataLakeServiceClient via the test subclass constructor,
    // so the service never talks to a real storage account.
    @InjectMocks
    private TestAzureDataLakeStorageFileResourceService service;

    private TestRunner runner;

    @BeforeEach
    void setup() throws InitializationException {
        runner = TestRunners.newTestRunner(NoOpProcessor.class);
        runner.addControllerService("AzureDataLakeStorageFileResourceService", service);
    }

    @Test
    void testHappyPath() throws InitializationException {
        setupService();
        setupMocking();

        FileResource fileResource = service.getFileResource(Map.of());

        assertFileResource(fileResource);
        verifyMockInvocations();
    }

    @Test
    void testHappyPathWithValidEL() throws InitializationException {
        // Filesystem, directory, and file name are resolved from attributes via Expression Language.
        String fileSystemKey = "filesystem.name";
        String directoryKey = "directory";
        String fileNameKey = "filename";
        setupService("${" + fileSystemKey + "}", "${" + directoryKey + "}", "${" + fileNameKey + "}");
        setupMocking();

        FileResource fileResource = service.getFileResource(Map.of(
                fileSystemKey, FILE_SYSTEM,
                directoryKey, DIRECTORY,
                fileNameKey, FILE));

        assertFileResource(fileResource);
        verifyMockInvocations();
    }

    @Test
    void testFileIsDirectory() throws InitializationException {
        setupService();
        when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
        when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
        when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
        when(fileClient.getProperties()).thenReturn(properties);
        when(properties.isDirectory()).thenReturn(true);

        executeAndAssertProcessException(Map.of(), "File Name (" + FILE + ") points to a directory. Full path: " + fileClient.getFilePath());
    }

    @Test
    void testNonExistentFile() throws InitializationException {
        setupService();
        when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
        when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
        when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
        when(fileClient.getProperties()).thenReturn(properties);
        when(properties.isDirectory()).thenReturn(false);
        when(fileClient.exists()).thenReturn(false);

        executeAndAssertProcessException(Map.of(), "File " + DIRECTORY + "/" + FILE + " not found in file system: " + FILE_SYSTEM);
    }

    @Test
    void testInvalidDirectoryValueWithLeadingSlash() throws InitializationException {
        String directoryKey = "directory.name";
        String directoryValue = "/invalid-directory";
        setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE);

        executeAndAssertProcessException(Map.of(directoryKey, directoryValue), "'Directory Name' starts with '/'. 'Directory Name' cannot contain a leading '/'.");
    }

    @Test
    void testValidELWithMissingFileValue() throws InitializationException {
        // EL resolves against an empty attribute map, so the file name evaluates to blank.
        setupService(FILE_SYSTEM, DIRECTORY, "${file.name}");

        executeAndAssertProcessException(Map.of(), MSG_EMPTY_FILE_NAME);
    }

    @Test
    void testInvalidFileSystem() throws InitializationException {
        String fileSystemKey = "fileSystem";
        String fileSystemValue = " ";
        setupService("${" + fileSystemKey + "}", DIRECTORY, FILE);

        executeAndAssertProcessException(Map.of(fileSystemKey, fileSystemValue), MSG_EMPTY_FILE_SYSTEM_NAME);
    }

    @Test
    void testInvalidFileName() throws InitializationException {
        String fileKey = "fileSystem";
        String fileValue = " ";
        setupService(FILE_SYSTEM, DIRECTORY, "${" + fileKey + "}");

        executeAndAssertProcessException(Map.of(fileKey, fileValue),
                MSG_EMPTY_FILE_NAME);
    }

    @Test
    void testInvalidDirectoryValueWithWhiteSpaceOnly() throws InitializationException {
        String directoryKey = "directory.name";
        String directoryValue = " ";
        setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE);

        executeAndAssertProcessException(Map.of(directoryKey, directoryValue), "'Directory Name' contains whitespace characters only.");
    }

    // Configures the service with the default filesystem/directory/file and enables it.
    private void setupService() throws InitializationException {
        setupService(FILE_SYSTEM, DIRECTORY, FILE);
    }

    // Configures the service with the given (possibly EL) property values and enables it.
    private void setupService(String fileSystem, String directory, String fileName) throws InitializationException {
        final ADLSCredentialsService credentialsService = mock(ADLSCredentialsService.class);
        when(credentialsService.getIdentifier()).thenReturn(CREDENTIALS_CONTROLLER_SERVICE);
        runner.addControllerService(CREDENTIALS_CONTROLLER_SERVICE, credentialsService);
        runner.enableControllerService(credentialsService);
        runner.setProperty(service, ADLS_CREDENTIALS_SERVICE, CREDENTIALS_CONTROLLER_SERVICE);
        runner.setProperty(service, AzureStorageUtils.FILESYSTEM, fileSystem);
        runner.setProperty(service, AzureStorageUtils.DIRECTORY, directory);
        runner.setProperty(service, AzureStorageUtils.FILE, fileName);
        runner.enableControllerService(service);
    }

    // Wires the mock chain client -> filesystem -> directory -> file for an existing,
    // non-directory file of CONTENT_LENGTH bytes.
    private void setupMocking() {
        when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient);
        when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient);
        when(directoryClient.getFileClient(FILE)).thenReturn(fileClient);
        when(fileClient.getProperties()).thenReturn(properties);
        when(properties.isDirectory()).thenReturn(false);
        when(fileClient.exists()).thenReturn(true);
        when(properties.getFileSize()).thenReturn(CONTENT_LENGTH);

        DataLakeFileOpenInputStreamResult result = mock(DataLakeFileOpenInputStreamResult.class);
        when(fileClient.openInputStream()).thenReturn(result);
        when(result.getInputStream()).thenReturn(inputStream);
    }

    // Runs getFileResource and verifies both the exception type and its exact message.
    private void executeAndAssertProcessException(Map<String, String> arguments, String expectedMessage) {
        ProcessException exception = assertThrows(ProcessException.class,
                () -> service.getFileResource(arguments));
        assertEquals(expectedMessage, exception.getMessage());
    }

    private void assertFileResource(FileResource fileResource) {
        assertNotNull(fileResource);
        // JUnit 5 convention: expected value first, actual second (affects failure messages only).
        assertEquals(inputStream, fileResource.getInputStream());
        assertEquals(CONTENT_LENGTH, fileResource.getSize());
    }

    private void verifyMockInvocations() {
        verify(client).getFileSystemClient(FILE_SYSTEM);
        verify(fileSystemClient).getDirectoryClient(DIRECTORY);
        verify(directoryClient).getFileClient(FILE);
        verify(properties).isDirectory();
        verify(fileClient).exists();
        verify(fileClient).openInputStream();
        verify(properties).getFileSize();
    }

    // Test subclass that short-circuits client creation so mocks can be injected.
    private static class TestAzureDataLakeStorageFileResourceService extends AzureDataLakeStorageFileResourceService {

        private final DataLakeServiceClient client;

        private TestAzureDataLakeStorageFileResourceService(DataLakeServiceClient client) {
            this.client = client;
        }

        @Override
        protected DataLakeServiceClient getStorageClient(Map<String, String> attributes) {
            return client;
        }
    }
}