mirror of https://github.com/apache/nifi.git
synced 2025-02-06 01:58:32 +00:00
NIFI-12123: This closes #7786. Added additional @UseCase and @MultiProcessorUseCase annotations, as well as some trivial cleanup flagged by IntelliJ, such as making an inner class static and updating some deprecated references.
Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
parent bcc54953c4
commit 025109a9d8
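For orientation before the diffs: @MultiProcessorUseCase documents a flow that spans several processors, each contributed as a @ProcessorConfiguration. A minimal sketch of the pattern this commit applies, using the nifi-api annotation elements visible in the diffs below; ListSomething and FetchSomething are hypothetical placeholder classes, not real NiFi processors:

@MultiProcessorUseCase(
    description = "Retrieve all files from some remote system",
    keywords = {"retrieve", "fetch", "all"},
    configurations = {
        @ProcessorConfiguration(
            processorClass = ListSomething.class,   // hypothetical lister
            configuration = """
                Configure the listing properties, then connect the 'success'
                relationship to FetchSomething.
                """
        ),
        @ProcessorConfiguration(
            processorClass = FetchSomething.class,  // hypothetical fetcher
            configuration = """
                "Some Property" = "${some.attribute}"
                """
        )
    }
)
public class FetchSomething extends AbstractProcessor { /* ... */ }

The real annotations added below follow this shape.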
FetchAzureBlobStorage_v12.java

@@ -20,11 +20,20 @@ import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobRange;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -40,15 +49,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
 import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -81,6 +81,32 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
         @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
         @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
         @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+@MultiProcessorUseCase(
+    description = "Retrieve all files in an Azure Blob Storage container",
+    keywords = {"azure", "blob", "storage", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListAzureBlobStorage_v12.class,
+            configuration = """
+                The "Container Name" property should be set to the name of the Blob Storage Container that files reside in. \
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_CONTAINER}`.
+
+                The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.
+
+                The 'success' Relationship of this Processor is then connected to FetchAzureBlobStorage_v12.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchAzureBlobStorage_v12.class,
+            configuration = """
+                "Container Name" = "${azure.container}"
+                "Blob Name" = "${azure.blobname}"
+
+                The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container.
+                """
+        )
+    }
+)
 public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
 
     public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
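A side note on the configuration strings above: they are Java text blocks, and a trailing backslash suppresses the newline so one long documentation sentence can wrap across source lines. A standalone sketch of that syntax (plain Java, nothing NiFi-specific; the sample text is made up):

public class TextBlockDemo {
    public static void main(String[] args) {
        // The backslash at the end of the first line joins it with the next,
        // so the two physical source lines print as a single line of text.
        String doc = """
                The "Container Name" property should be set to the name of the container. \
                Parameterize it, e.g. `#{AZURE_CONTAINER}`, if the flow is reused elsewhere.

                The 'success' relationship is then connected to the Fetch processor.
                """;
        System.out.print(doc);
    }
}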
FetchAzureDataLakeStorage.java

@@ -24,11 +24,17 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import com.azure.storage.file.datalake.models.DownloadRetryOptions;
 import com.azure.storage.file.datalake.models.FileRange;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -42,20 +48,44 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
 @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
-@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
+@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @WritesAttributes({
     @WritesAttribute(attribute = "azure.datalake.storage.statusCode", description = "The HTTP error code (if available) from the failed operation"),
     @WritesAttribute(attribute = "azure.datalake.storage.errorCode", description = "The Azure Data Lake Storage moniker of the failed operation"),
     @WritesAttribute(attribute = "azure.datalake.storage.errorMessage", description = "The Azure Data Lake Storage error message from the failed operation")
 })
+@MultiProcessorUseCase(
+    description = "Retrieve all files in an Azure DataLake Storage directory",
+    keywords = {"azure", "datalake", "adls", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListAzureDataLakeStorage.class,
+            configuration = """
+                The "Filesystem Name" property should be set to the name of the Azure Filesystem (also known as a Container) that files reside in. \
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_FILESYSTEM}`.
+                Configure the "Directory Name" property to specify the name of the directory in the file system. \
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_DIRECTORY}`.
+
+                The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.
+
+                The 'success' Relationship of this Processor is then connected to FetchAzureDataLakeStorage.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchAzureDataLakeStorage.class,
+            configuration = """
+                "Filesystem Name" = "${azure.filesystem}"
+                "Directory Name" = "${azure.directory}"
+                "File Name" = "${azure.filename}"
+
+                The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.
+                """
+        )
+    }
+)
 public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
     public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
FetchGoogleDrive.java

@@ -20,11 +20,20 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.drive.Drive;
 import com.google.api.services.drive.DriveScopes;
 import com.google.api.services.drive.model.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -43,17 +52,6 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.processors.gcp.util.GoogleUtils;
 import org.apache.nifi.proxy.ProxyConfiguration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
 import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
 import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
@@ -82,6 +80,33 @@ import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTA
     @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
     @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)
 })
+@MultiProcessorUseCase(
+    description = "Retrieve all files in a Google Drive folder",
+    keywords = {"google", "drive", "google cloud", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListGoogleDrive.class,
+            configuration = """
+                The "Folder ID" property should be set to the ID of the Google Drive folder that files reside in. \
+                See processor documentation / additional details for more information on how to determine a Google Drive folder's ID.
+                If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
+                this property by setting it to something like `#{GOOGLE_DRIVE_FOLDER_ID}`.
+
+                The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the folder.
+
+                The 'success' Relationship of this Processor is then connected to FetchGoogleDrive.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchGoogleDrive.class,
+            configuration = """
+                "File ID" = "${drive.id}"
+
+                The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the file.
+                """
+        )
+    }
+)
 public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {
 
     // Google Docs Export Types
@@ -195,8 +220,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
 
 
 
-
-
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile will be routed here for each successfully fetched File.")
@@ -207,7 +230,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
             .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
             GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
            FILE_ID,
            ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS),
@@ -215,12 +238,9 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
             GOOGLE_SPREADSHEET_EXPORT_TYPE,
             GOOGLE_PRESENTATION_EXPORT_TYPE,
             GOOGLE_DRAWING_EXPORT_TYPE
-    ));
+    );
 
-    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-            REL_SUCCESS,
-            REL_FAILURE
-    )));
+    public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
 
     private volatile Drive driveService;
 
@@ -279,20 +299,14 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
             return null;
         }
 
-        switch (mimeType) {
-            case "application/vnd.google-apps.document":
-                return context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue();
-            case "application/vnd.google-apps.spreadsheet":
-                return context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue();
-            case "application/vnd.google-apps.presentation":
-                return context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue();
-            case "application/vnd.google-apps.drawing":
-                return context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue();
-            case "application/vnd.google-apps.script":
-                return "application/vnd.google-apps.script+json";
-            default:
-                return null;
-        }
+        return switch (mimeType) {
+            case "application/vnd.google-apps.document" -> context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue();
+            case "application/vnd.google-apps.spreadsheet" -> context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue();
+            case "application/vnd.google-apps.presentation" -> context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue();
+            case "application/vnd.google-apps.drawing" -> context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue();
+            case "application/vnd.google-apps.script" -> "application/vnd.google-apps.script+json";
+            default -> null;
        };
     }
 
     private FlowFile fetchFile(final String fileId, final ProcessSession session, final ProcessContext context, final FlowFile flowFile, final Map<String, String> attributeMap) throws IOException {
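The PROPERTIES and RELATIONSHIPS rewrites above trade the pre-Java-9 unmodifiable-wrapper idiom for the List.of/Set.of factories. Both variants reject mutation at runtime; the factories are also null-hostile, which the older idiom was not. A standalone sketch of the difference (plain JDK, no NiFi types; the sample element names are made up):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutableListDemo {
    public static void main(String[] args) {
        // Pre-Java-9 idiom: an unmodifiable view over a fixed-size list.
        List<String> old = Collections.unmodifiableList(Arrays.asList("success", "failure"));
        // Java 9+ idiom used after the cleanup: shorter and truly immutable.
        List<String> now = List.of("success", "failure");

        // Both throw UnsupportedOperationException on mutation attempts.
        // One behavioral difference: List.of rejects null elements up front.
        try {
            List.of("success", null);
        } catch (NullPointerException expected) {
            System.out.println("List.of rejects nulls: " + expected);
        }
        System.out.println(old.equals(now)); // true: same elements, same order
    }
}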
FetchGCSObject.java

@@ -22,6 +22,14 @@ import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.commons.io.output.CountingOutputStream;
@@ -30,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.ConfigVerificationResult;
@@ -45,15 +55,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
@@ -129,6 +130,33 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
     @WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC),
     @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
+@MultiProcessorUseCase(
+    description = "Retrieve all files in a Google Cloud Storage (GCS) bucket",
+    keywords = {"gcp", "gcs", "google cloud", "google compute storage", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListGCSBucket.class,
+            configuration = """
+                The "Bucket" property should be set to the name of the GCS bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \
+                this property by setting it to something like `#{GCS_SOURCE_BUCKET}`.
+                Configure the "Project ID" property to reflect the ID of your Google Cloud Project.
+
+                The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
+
+                The 'success' Relationship of this Processor is then connected to FetchGCSObject.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchGCSObject.class,
+            configuration = """
+                "Bucket" = "${gcs.bucket}"
+                "Name" = "${filename}"
+
+                The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
+                """
+        )
+    }
+)
 public class FetchGCSObject extends AbstractGCSProcessor {
     public static final PropertyDescriptor BUCKET = new PropertyDescriptor
             .Builder().name("gcs-bucket")
@@ -219,7 +247,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
         try {
             final FetchedBlob blob = fetchBlob(context, storage, attributes);
 
-            final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
+            final CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE);
             IOUtils.copy(blob.contents, out);
             final long byteCount = out.getByteCount();
             results.add(new ConfigVerificationResult.Builder()
@@ -253,9 +281,6 @@ public class FetchGCSObject extends AbstractGCSProcessor {
 
         final Storage storage = getCloudService();
 
-        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
-        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
-
         try {
             final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes());
             flowFile = session.importFrom(blob.contents, flowFile);
@@ -328,7 +353,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
         return blobSourceOptions;
     }
 
-    private class FetchedBlob {
+    private static class FetchedBlob {
         private final InputStream contents;
         private final Blob blob;
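Two of the smaller cleanups above deserve a note: FetchedBlob becomes a static nested class because it never references the enclosing processor instance, and the deprecated NullOutputStream.NULL_OUTPUT_STREAM constant gives way to NullOutputStream.INSTANCE, its replacement in recent commons-io releases. The verification step copies the blob to a discarding sink just to measure its size; a standalone sketch of that count-while-discarding pattern, with made-up input data (assumes commons-io 2.12+ on the classpath):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.commons.io.output.NullOutputStream;

public class ByteCountDemo {
    public static void main(String[] args) throws IOException {
        // Stand-in for the fetched blob contents being verified.
        InputStream contents = new ByteArrayInputStream("hello blob".getBytes(StandardCharsets.UTF_8));

        // Discard the bytes while counting them. NullOutputStream.INSTANCE is
        // the non-deprecated replacement for the old NULL_OUTPUT_STREAM constant.
        CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE);
        IOUtils.copy(contents, out);

        System.out.println("byteCount = " + out.getByteCount()); // byteCount = 10
    }
}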
FetchFTP.java

@@ -25,16 +25,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
+import org.apache.nifi.processors.standard.util.FileTransfer;
 
+// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"ftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
 @CapabilityDescription("Fetches the content of a file from a remote FTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
@@ -46,6 +49,36 @@ import org.apache.nifi.processors.standard.util.FTPTransfer;
     @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename of the remote file"),
     @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute")
 })
+@MultiProcessorUseCase(
+    description = "Retrieve all files in a directory of an FTP Server",
+    keywords = {"ftp", "file", "transform", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListFTP.class,
+            configuration = """
+                The "Hostname" property should be set to the fully qualified hostname of the FTP Server. It's a good idea to parameterize \
+                this property by setting it to something like `#{FTP_SERVER}`.
+                The "Remote Path" property must be set to the directory on the FTP Server where the files reside. If the flow being built is to be reused elsewhere, \
+                it's a good idea to parameterize this property by setting it to something like `#{FTP_REMOTE_PATH}`.
+                Configure the "Username" property to the appropriate username for logging into the FTP Server. It's usually a good idea to parameterize this property \
+                by setting it to something like `#{FTP_USERNAME}`.
+                Configure the "Password" property to the appropriate password for the provided username. It's usually a good idea to parameterize this property \
+                by setting it to something like `#{FTP_PASSWORD}`.
+
+                The 'success' Relationship of this Processor is then connected to FetchFTP.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchFTP.class,
+            configuration = """
+                "Hostname" = "${ftp.remote.host}"
+                "Remote File" = "${path}/${filename}"
+                "Username" = "${ftp.listing.user}"
+                "Password" = "#{FTP_PASSWORD}"
+                """
+        )
+    }
+)
 public class FetchFTP extends FetchFileTransfer {
 
     @Override
FetchSFTP.java

@@ -20,12 +20,13 @@ package org.apache.nifi.processors.standard;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
+import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -48,6 +49,36 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename of the remote file"),
     @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute")
 })
+@MultiProcessorUseCase(
+    description = "Retrieve all files in a directory of an SFTP Server",
+    keywords = {"sftp", "secure", "file", "transform", "state", "retrieve", "fetch", "all", "stream"},
+    configurations = {
+        @ProcessorConfiguration(
+            processorClass = ListSFTP.class,
+            configuration = """
+                The "Hostname" property should be set to the fully qualified hostname of the SFTP Server. It's a good idea to parameterize \
+                this property by setting it to something like `#{SFTP_SERVER}`.
+                The "Remote Path" property must be set to the directory on the SFTP Server where the files reside. If the flow being built is to be reused elsewhere, \
+                it's a good idea to parameterize this property by setting it to something like `#{SFTP_REMOTE_PATH}`.
+                Configure the "Username" property to the appropriate username for logging into the SFTP Server. It's usually a good idea to parameterize this property \
+                by setting it to something like `#{SFTP_USERNAME}`.
+                Configure the "Password" property to the appropriate password for the provided username. It's usually a good idea to parameterize this property \
+                by setting it to something like `#{SFTP_PASSWORD}`.
+
+                The 'success' Relationship of this Processor is then connected to FetchSFTP.
+                """
+        ),
+        @ProcessorConfiguration(
+            processorClass = FetchSFTP.class,
+            configuration = """
+                "Hostname" = "${sftp.remote.host}"
+                "Remote File" = "${path}/${filename}"
+                "Username" = "${sftp.listing.user}"
+                "Password" = "#{SFTP_PASSWORD}"
+                """
+        )
+    }
+)
 public class FetchSFTP extends FetchFileTransfer {
 
     @Override
RemoveRecordField.java

@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -58,10 +59,27 @@ import org.apache.nifi.serialization.record.Record;
 @WritesAttributes({
     @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.")
 })
-@DynamicProperty(name = "(Ignored)", value = "A RecordPath to the field to be removed.",
-    description = "Allows users to specify fields to remove that match the RecordPath.",
+@DynamicProperty(name = "A description of the field to remove",
+    value = "A RecordPath to the field to be removed.",
+    description = "Any field that matches the RecordPath set as the value will be removed.",
     expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 @SeeAlso({UpdateRecord.class})
+@UseCase(
+    description = "Remove one or more fields from a Record",
+    keywords = {"record", "field", "drop", "remove", "delete", "expunge", "recordpath"},
+    configuration = """
+        Configure the Record Reader according to the incoming data format.
+        Configure the Record Writer according to the desired output format.
+
+        For each field that you want to remove, add a single new property to the Processor.
+        The name of the property can be anything but it's recommended to use a brief description of the field.
+        The value of the property is a RecordPath that matches the field to remove.
+
+        For example, to remove the `name` and `email` fields, add two Properties:
+        `name` = `/name`
+        `email` = `/email`
+        """
+)
 public class RemoveRecordField extends AbstractRecordProcessor {
     private volatile RecordPathCache recordPathCache;
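The use case above drives the processor with RecordPath values such as /name and /email. For readers unfamiliar with RecordPath, here is a rough standalone sketch of how such paths select fields, using NiFi's record utilities (nifi-record and nifi-record-path artifacts assumed on the classpath). For brevity it merely nulls the selected fields; the processor itself removes them structurally:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

public class RecordPathSelectionDemo {
    public static void main(String[] args) {
        // A toy record with the two fields the use case removes, plus one it keeps.
        final RecordSchema schema = new SimpleRecordSchema(List.of(
                new RecordField("name", RecordFieldType.STRING.getDataType()),
                new RecordField("email", RecordFieldType.STRING.getDataType()),
                new RecordField("id", RecordFieldType.INT.getDataType())));
        final Record person = new MapRecord(schema,
                new HashMap<>(Map.of("name", "Alice", "email", "a@example.com", "id", 1)));

        // The same paths the use case suggests as property values: /name and /email.
        for (final String path : List.of("/name", "/email")) {
            RecordPath.compile(path)
                    .evaluate(person)
                    .getSelectedFields()
                    .forEach(fieldValue -> fieldValue.updateValue(null));
        }
        System.out.println(person.toMap()); // {name=null, email=null, id=1}
    }
}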