diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java index 4d9db2d5b1..18ca4a6d0f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java @@ -20,11 +20,20 @@ import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.models.BlobRange; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -40,15 +49,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.ClientSideEncryptionSupport; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER; @@ -81,6 +81,32 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG), @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) +@MultiProcessorUseCase( + description = "Retrieve all files in an Azure Blob Storage container", + keywords = {"azure", "blob", "storage", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListAzureBlobStorage_v12.class, + configuration = """ + The "Container Name" property should be set to the name of the Blob Storage Container that files reside in. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_CONTAINER}`. 
+ + The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container. + + The 'success' Relationship of this Processor is then connected to FetchAzureBlobStorage_v12. + """ + ), + @ProcessorConfiguration( + processorClass = FetchAzureBlobStorage_v12.class, + configuration = """ + "Container Name" = "${azure.container}" + "Blob Name" = "${azure.blobname}" + + The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container. + """ + ) + } +) public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 5e8ebeb618..d977120d33 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -24,11 +24,17 @@ import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.models.DataLakeStorageException; import com.azure.storage.file.datalake.models.DownloadRetryOptions; import com.azure.storage.file.datalake.models.FileRange; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -42,20 +48,44 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) -@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage") +@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.datalake.storage.statusCode", description = "The HTTP error code (if available) from the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorCode", description = "The Azure Data Lake Storage moniker 
of the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorMessage", description = "The Azure Data Lake Storage error message from the failed operation") }) +@MultiProcessorUseCase( + description = "Retrieve all files in an Azure DataLake Storage directory", + keywords = {"azure", "datalake", "adls", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListAzureDataLakeStorage.class, + configuration = """ + The "Filesystem Name" property should be set to the name of the Azure Filesystem (also known as a Container) that files reside in. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_FILESYSTEM}`. + Configure the "Directory Name" property to specify the name of the directory in the file system. \ + If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_DIRECTORY}`. + + The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem. + + The 'success' Relationship of this Processor is then connected to FetchAzureDataLakeStorage. + """ + ), + @ProcessorConfiguration( + processorClass = FetchAzureDataLakeStorage.class, + configuration = """ + "Filesystem Name" = "${azure.filesystem}" + "Directory Name" = "${azure.directory}" + "File Name" = "${azure.filename}" + + The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem. + """ + ) + } +) public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java index bbdb555042..5cae9e97d9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java @@ -20,11 +20,20 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.services.drive.Drive; import com.google.api.services.drive.DriveScopes; import com.google.api.services.drive.model.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -43,17 +52,6 @@ import 
org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import org.apache.nifi.processors.gcp.util.GoogleUtils; import org.apache.nifi.proxy.ProxyConfiguration; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; @@ -82,6 +80,33 @@ import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTA @WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC), @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC) }) +@MultiProcessorUseCase( + description = "Retrieve all files in a Google Drive folder", + keywords = {"google", "drive", "google cloud", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListGoogleDrive.class, + configuration = """ + The "Folder ID" property should be set to the ID of the Google Drive folder that files reside in. \ + See processor documentation / additional details for more information on how to determine a Google Drive folder's ID. + If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{GOOGLE_DRIVE_FOLDER_ID}`. + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the folder. + + The 'success' Relationship of this Processor is then connected to FetchGoogleDrive. + """ + ), + @ProcessorConfiguration( + processorClass = FetchGoogleDrive.class, + configuration = """ + "File ID" = "${drive.id}" + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the file.
+ """ + ) + } +) public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait { // Google Docs Export Types @@ -195,8 +220,6 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr - - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile will be routed here for each successfully fetched File.") @@ -207,7 +230,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.") .build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + private static final List PROPERTIES = List.of( GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, FILE_ID, ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS), @@ -215,12 +238,9 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr GOOGLE_SPREADSHEET_EXPORT_TYPE, GOOGLE_PRESENTATION_EXPORT_TYPE, GOOGLE_DRAWING_EXPORT_TYPE - )); + ); - public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + public static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); private volatile Drive driveService; @@ -279,20 +299,14 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr return null; } - switch (mimeType) { - case "application/vnd.google-apps.document": - return context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.spreadsheet": - return context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.presentation": - return context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.drawing": - return context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue(); - case "application/vnd.google-apps.script": - return "application/vnd.google-apps.script+json"; - default: - return null; - } + return switch (mimeType) { + case "application/vnd.google-apps.document" -> context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.spreadsheet" -> context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.presentation" -> context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.drawing" -> context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue(); + case "application/vnd.google-apps.script" -> "application/vnd.google-apps.script+json"; + default -> null; + }; } private FlowFile fetchFile(final String fileId, final ProcessSession session, final ProcessContext context, final FlowFile flowFile, final Map attributeMap) throws IOException { diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java index 50ca5bb6d7..ee9a6d6b5a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java @@ -22,6 +22,14 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import 
com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.output.CountingOutputStream; @@ -30,6 +38,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.ConfigVerificationResult; @@ -45,15 +55,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.Channels; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC; import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR; @@ -129,6 +130,33 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC; @WritesAttribute(attribute = OWNER_TYPE_ATTR, description = OWNER_TYPE_DESC), @WritesAttribute(attribute = URI_ATTR, description = URI_DESC) }) +@MultiProcessorUseCase( + description = "Retrieve all files in a Google Cloud Storage (GCS) bucket", + keywords = {"gcp", "gcs", "google cloud", "google cloud storage", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListGCSBucket.class, + configuration = """ + The "Bucket" property should be set to the name of the GCS bucket that files reside in. If the flow being built is to be reused elsewhere, it's a good idea to parameterize \ + this property by setting it to something like `#{GCS_SOURCE_BUCKET}`. + Configure the "Project ID" property to reflect the ID of your Google Cloud Project. + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket. + + The 'success' Relationship of this Processor is then connected to FetchGCSObject. + """ + ), + @ProcessorConfiguration( + processorClass = FetchGCSObject.class, + configuration = """ + "Bucket" = "${gcs.bucket}" + "Name" = "${filename}" + + The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
+ """ + ) + } +) public class FetchGCSObject extends AbstractGCSProcessor { public static final PropertyDescriptor BUCKET = new PropertyDescriptor .Builder().name("gcs-bucket") @@ -219,7 +247,7 @@ public class FetchGCSObject extends AbstractGCSProcessor { try { final FetchedBlob blob = fetchBlob(context, storage, attributes); - final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM); + final CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE); IOUtils.copy(blob.contents, out); final long byteCount = out.getByteCount(); results.add(new ConfigVerificationResult.Builder() @@ -253,9 +281,6 @@ public class FetchGCSObject extends AbstractGCSProcessor { final Storage storage = getCloudService(); - final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L); - final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null); - try { final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes()); flowFile = session.importFrom(blob.contents, flowFile); @@ -328,7 +353,7 @@ public class FetchGCSObject extends AbstractGCSProcessor { return blobSourceOptions; } - private class FetchedBlob { + private static class FetchedBlob { private final InputStream contents; private final Blob blob; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index cf6df69d1e..d920cb1085 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -25,16 +25,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; +import org.apache.nifi.processors.standard.util.FileTransfer; // Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted. 
+ @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"ftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) @CapabilityDescription("Fetches the content of a file from a remote FTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") @@ -46,6 +49,36 @@ import org.apache.nifi.processors.standard.util.FTPTransfer; @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") }) +@MultiProcessorUseCase( + description = "Retrieve all files in a directory of an FTP Server", + keywords = {"ftp", "file", "transform", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListFTP.class, + configuration = """ + The "Hostname" property should be set to the fully qualified hostname of the FTP Server. It's a good idea to parameterize \ + this property by setting it to something like `#{FTP_SERVER}`. + The "Remote Path" property must be set to the directory on the FTP Server where the files reside. If the flow being built is to be reused elsewhere, \ + it's a good idea to parameterize this property by setting it to something like `#{FTP_REMOTE_PATH}`. + Configure the "Username" property to the appropriate username for logging into the FTP Server. It's usually a good idea to parameterize this property \ + by setting it to something like `#{FTP_USERNAME}`. + Configure the "Password" property to the appropriate password for the provided username. It's usually a good idea to parameterize this property \ + by setting it to something like `#{FTP_PASSWORD}`. + + The 'success' Relationship of this Processor is then connected to FetchFTP. 
+ """ + ), + @ProcessorConfiguration( + processorClass = FetchFTP.class, + configuration = """ + "Hostname" = "${ftp.remote.host}" + "Remote File" = "${path}/${filename}" + "Username" = "${ftp.listing.user}" + "Password" = "#{FTP_PASSWORD}" + """ + ) + } +) public class FetchFTP extends FetchFileTransfer { @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 69281e5695..9fde9114e4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -20,12 +20,13 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.MultiProcessorUseCase; +import org.apache.nifi.annotation.documentation.ProcessorConfiguration; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -48,6 +49,36 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") }) +@MultiProcessorUseCase( + description = "Retrieve all files in a directory of an SFTP Server", + keywords = {"sftp", "secure", "file", "transform", "state", "retrieve", "fetch", "all", "stream"}, + configurations = { + @ProcessorConfiguration( + processorClass = ListSFTP.class, + configuration = """ + The "Hostname" property should be set to the fully qualified hostname of the FTP Server. It's a good idea to parameterize \ + this property by setting it to something like `#{SFTP_SERVER}`. + The "Remote Path" property must be set to the directory on the FTP Server where the files reside. If the flow being built is to be reused elsewhere, \ + it's a good idea to parameterize this property by setting it to something like `#{SFTP_REMOTE_PATH}`. + Configure the "Username" property to the appropriate username for logging into the FTP Server. It's usually a good idea to parameterize this property \ + by setting it to something like `#{SFTP_USERNAME}`. + Configure the "Password" property to the appropriate password for the provided username. It's usually a good idea to parameterize this property \ + by setting it to something like `#{SFTP_PASSWORD}`. + + The 'success' Relationship of this Processor is then connected to FetchSFTP. 
+ """ + ), + @ProcessorConfiguration( + processorClass = FetchSFTP.class, + configuration = """ + "Hostname" = "${sftp.remote.host}" + "Remote File" = "${path}/${filename}" + "Username" = "${sftp.listing.user}" + "Password" = "#{SFTP_PASSWORD}" + """ + ) + } +) public class FetchSFTP extends FetchFileTransfer { @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java index c1d38c4326..7b3b8df86f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RemoveRecordField.java @@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -58,10 +59,27 @@ import org.apache.nifi.serialization.record.Record; @WritesAttributes({ @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader or Writer.") }) -@DynamicProperty(name = "(Ignored)", value = "A RecordPath to the field to be removed.", - description = "Allows users to specify fields to remove that match the RecordPath.", +@DynamicProperty(name = "A description of the field to remove", + value = "A RecordPath to the field to be removed.", + description = "Any field that matches the RecordPath set as the value will be removed.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @SeeAlso({UpdateRecord.class}) +@UseCase( + description = "Remove one or more fields from a Record", + keywords = {"record", "field", "drop", "remove", "delete", "expunge", "recordpath"}, + configuration = """ + Configure the Record Reader according to the incoming data format. + Configure the Record Writer according to the desired output format. + + For each field that you want to remove, add a single new property to the Processor. + The name of the property can be anything but it's recommended to use a brief description of the field. + The value of the property is a RecordPath that matches the field to remove. + + For example, to remove the `name` and `email` fields, add two Properties: + `name` = `/name` + `email` = `/email` + """ +) public class RemoveRecordField extends AbstractRecordProcessor { private volatile RecordPathCache recordPathCache;