diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java new file mode 100644 index 0000000000..2460048b3f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.FetchFileTransfer; +import org.apache.nifi.processors.standard.GetFTP; +import org.apache.nifi.processors.standard.GetSFTP; +import org.apache.nifi.processors.standard.PutFTP; +import org.apache.nifi.processors.standard.PutSFTP; +import org.apache.nifi.processors.standard.util.FTPTransfer; + +// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted. +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"ftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) +@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") +@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "ftp.remote.host", description = "The hostname or IP address from which the file was pulled"), + @WritesAttribute(attribute = "ftp.remote.port", description = "The port that was used to communicate with the remote FTP server"), + @WritesAttribute(attribute = "ftp.remote.filename", description = "The name of the remote file that was pulled"), + @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), + @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") +}) +public class FetchFTP extends FetchFileTransfer { + + @Override + protected List getSupportedPropertyDescriptors() { + final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build(); + + final List properties = new ArrayList<>(); + properties.add(HOSTNAME); + properties.add(port); + properties.add(USERNAME); + properties.add(FTPTransfer.PASSWORD); + properties.add(REMOTE_FILENAME); + properties.add(COMPLETION_STRATEGY); + properties.add(MOVE_DESTINATION_DIR); + properties.add(FTPTransfer.CONNECTION_TIMEOUT); + properties.add(FTPTransfer.DATA_TIMEOUT); + properties.add(FTPTransfer.USE_COMPRESSION); + properties.add(FTPTransfer.CONNECTION_MODE); + properties.add(FTPTransfer.TRANSFER_MODE); + properties.add(FTPTransfer.PROXY_TYPE); + properties.add(FTPTransfer.PROXY_HOST); + properties.add(FTPTransfer.PROXY_PORT); + properties.add(FTPTransfer.HTTP_PROXY_USERNAME); + properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); + return properties; + } + + @Override + protected FileTransfer createFileTransfer(final ProcessContext context) { + return new FTPTransfer(context, getLogger()); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java new file mode 100644 index 0000000000..02468d3bb2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.ListFileTransfer; +import org.apache.nifi.processors.standard.util.FTPTransfer; +import org.apache.nifi.processors.standard.PutFTP; +import org.apache.nifi.processors.standard.GetFTP; + +@TriggerSerially +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"list", "ftp", "remote", "ingest", "source", "input", "files"}) +@CapabilityDescription("Performs a listing of the files residing on an FTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute " + + "set to the name of the file on the remote server. This can then be used in conjunction with FetchFTP in order to fetch those files.") +@SeeAlso({FetchFTP.class, GetFTP.class, PutFTP.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "ftp.remote.host", description = "The hostname of the FTP Server"), + @WritesAttribute(attribute = "ftp.remote.port", description = "The port that was connected to on the FTP Server"), + @WritesAttribute(attribute = "ftp.listing.user", description = "The username of the user that performed the FTP Listing"), + @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"), + @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"), + @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"), + @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"), + @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " + + "This allows the Processor to list only files that have been added or modified after " + + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if " + + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.") +public class ListFTP extends ListFileTransfer { + + @Override + protected List getSupportedPropertyDescriptors() { + final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build(); + + final List properties = new ArrayList<>(); + properties.add(HOSTNAME); + properties.add(port); + properties.add(USERNAME); + properties.add(FTPTransfer.PASSWORD); + properties.add(REMOTE_PATH); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(FTPTransfer.RECURSIVE_SEARCH); + properties.add(FTPTransfer.FILE_FILTER_REGEX); + properties.add(FTPTransfer.PATH_FILTER_REGEX); + properties.add(FTPTransfer.IGNORE_DOTTED_FILES); + properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE); + properties.add(FTPTransfer.CONNECTION_TIMEOUT); + properties.add(FTPTransfer.DATA_TIMEOUT); + properties.add(FTPTransfer.CONNECTION_MODE); + properties.add(FTPTransfer.TRANSFER_MODE); + properties.add(FTPTransfer.PROXY_TYPE); + properties.add(FTPTransfer.PROXY_HOST); + properties.add(FTPTransfer.PROXY_PORT); + properties.add(FTPTransfer.HTTP_PROXY_USERNAME); + properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); + return properties; + } + + @Override + protected FileTransfer getFileTransfer(final ProcessContext context) { + return new FTPTransfer(context, getLogger()); + } + + @Override + protected String getProtocolName() { + return "ftp"; + } + + @Override + protected Scope getStateScope(final ProcessContext context) { + // Use cluster scope so that component can be run on Primary Node Only and can still + // pick up where it left off, even if the Primary Node changes. + return Scope.CLUSTER; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 716e736a42..dc1f012e2d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -89,4 +89,6 @@ org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml org.apache.nifi.processors.standard.ValidateCsv org.apache.nifi.processors.standard.ExecuteSQL -org.apache.nifi.processors.standard.FetchDistributedMapCache \ No newline at end of file +org.apache.nifi.processors.standard.FetchDistributedMapCache +org.apache.nifi.processors.standard.ListFTP +org.apache.nifi.processors.standard.FetchFTP