From 266433e13d7885d44e2bbeb28da867d8477bd683 Mon Sep 17 00:00:00 2001
From: Peter Gyori
Date: Fri, 14 Aug 2020 21:42:02 +0200
Subject: [PATCH] NIFI-7624: ListenFTP processor
Signed-off-by: Pierre Villard
This closes #4481.
---
nifi-assembly/NOTICE | 13 +
.../src/main/resources/META-INF/LICENSE | 23 ++
.../src/main/resources/META-INF/NOTICE | 15 +
.../nifi-standard-processors/pom.xml | 18 +-
.../nifi/processors/standard/ListenFTP.java | 247 ++++++++++++++
.../processors/standard/ftp/FtpServer.java | 43 +++
.../standard/ftp/NifiFtpServer.java | 236 ++++++++++++++
.../ftp/commands/CommandMapFactory.java | 140 ++++++++
.../commands/DetailedFtpCommandException.java | 40 +++
.../ftp/commands/FtpCommandException.java | 36 +++
.../standard/ftp/commands/FtpCommandHELP.java | 147 +++++++++
.../standard/ftp/commands/FtpCommandSTOR.java | 230 +++++++++++++
.../ftp/commands/NotSupportedCommand.java | 41 +++
.../filesystem/DefaultVirtualFileSystem.java | 125 ++++++++
.../ftp/filesystem/VirtualFileSystem.java | 73 +++++
.../filesystem/VirtualFileSystemFactory.java | 39 +++
.../ftp/filesystem/VirtualFileSystemView.java | 79 +++++
.../ftp/filesystem/VirtualFtpFile.java | 169 ++++++++++
.../standard/ftp/filesystem/VirtualPath.java | 92 ++++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 82 +++++
.../standard/ftp/TestVirtualFileSystem.java | 187 +++++++++++
.../ftp/TestVirtualFileSystemView.java | 243 ++++++++++++++
.../standard/ftp/TestVirtualPath.java | 302 ++++++++++++++++++
24 files changed, 2615 insertions(+), 6 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/FtpServer.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/CommandMapFactory.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/DetailedFtpCommandException.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandException.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystem.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemFactory.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListenFTP/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystem.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualPath.java
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 112a2741b6..b6ee96c01e 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -427,6 +427,9 @@ The following binary components are provided under the Apache Software License v
Apache MINA Core
Copyright 2004-2011 Apache MINA Project
+ Apache MINA Core 2.0.16
+ Copyright 2004-2016 Apache MINA Project
+
(ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
(ASLv2) Apache Velocity
@@ -1933,6 +1936,16 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Copyright 2016 Jeroen van Erp
+ (ASLv2) Apache Ftplet API
+ The following NOTICE information applies:
+ Apache Ftplet API 1.1.1
+ Copyright 2003-2017 The Apache Software Foundation
+
+ (ASLv2) Apache FtpServer Core
+ The following NOTICE information applies:
+ Apache FtpServer Core 1.1.1
+ Copyright 2003-2017 The Apache Software Foundation
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
index 4988de17f3..7fd9a34cc6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE
@@ -354,4 +354,27 @@ This product bundles 'js-beautify' which is available under an MIT license.
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
+This product bundles 'SLF4J-API 1.7.21' which is available under an MIT license.
+
+ Copyright (c) 2004-2007 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index 7efc5c0746..9c9409d84f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -238,6 +238,21 @@ The following binary components are provided under the Apache Software License v
- Apache MINA SSHD
- Apache Commons-Net
+ (ASLv2) Apache Ftplet API
+ The following NOTICE information applies:
+ Apache Ftplet API 1.1.1
+ Copyright 2003-2017 The Apache Software Foundation
+
+ (ASLv2) Apache FtpServer Core
+ The following NOTICE information applies:
+ Apache FtpServer Core 1.1.1
+ Copyright 2003-2017 The Apache Software Foundation
+
+ (ASLv2) Apache MINA Core
+ The following NOTICE information applies:
+ Apache MINA Core 2.0.16
+ Copyright 2004-2016 Apache MINA Project
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 04deab9f61..231e8eb968 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -19,6 +19,12 @@
nifi-standard-processorsjar
+
+ org.apache.ftpserver
+ ftpserver-core
+ 1.1.1
+ compile
+ org.apache.nifinifi-api
@@ -296,12 +302,6 @@
MockFtpServertest
-
- org.apache.mina
- mina-core
- 2.0.19
- test
- org.apache.sshdsshd-core
@@ -390,6 +390,12 @@
1.13.0-SNAPSHOTcompile
+
+ org.hamcrest
+ hamcrest-all
+ 1.3
+ test
+
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
new file mode 100644
index 0000000000..bce6ac47b0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.FtpServer;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "FTP", "FTPS", "listen"})
+@CapabilityDescription("Starts an FTP server that listens on the specified port and transforms incoming files into FlowFiles. "
+ + "The URI of the service will be ftp://{hostname}:{port}. The default port is 2221.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "The name of the file received via the FTP/FTPS connection."),
+ @WritesAttribute(attribute = "path", description = "The path pointing to the file's target directory. "
+ + "E.g.: file.txt is uploaded to /Folder1/SubFolder, then the value of the path attribute will be \"/Folder1/SubFolder/\" "
+ + "(note that it ends with a separator character).")
+})
+@SeeAlso(classNames = {"org.apache.nifi.ssl.StandardRestrictedSSLContextService","org.apache.nifi.ssl.StandardSSLContextService"})
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("Specifies the SSL Context Service that can be used to create secure connections. "
+ + "If an SSL Context Service is selected, then a keystore file must also be specified in the SSL Context Service. "
+ + "Without a keystore file, the processor cannot be started successfully."
+ + "Specifying a truststore file is optional. If a truststore file is specified, client authentication is required "
+ + "(the client needs to send a certificate to the server)."
+ + "Regardless of the selected TLS protocol, the highest available protocol is used for the connection. "
+ + "For example if NiFi is running on Java 11 and TLSv1.2 is selected in the controller service as the "
+ + "preferred TLS Protocol, TLSv1.3 will be used (regardless of TLSv1.2 being selected) because Java 11 "
+ + "supports TLSv1.3.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Relationship for successfully received files.")
+ .build();
+
+ public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+ .name("bind-address")
+ .displayName("Bind Address")
+ .description("The address the FTP server should be bound to. If not set (or set to 0.0.0.0), "
+ + "the server binds to all available addresses (i.e. all network interfaces of the host machine).")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("listening-port")
+ .displayName("Listening Port")
+ .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+ .required(true)
+ .defaultValue("2221")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+ .name("username")
+ .displayName("Username")
+ .description("The name of the user that is allowed to log in to the FTP server. "
+ + "If a username is provided, a password must also be provided. "
+ + "If no username is specified, anonymous connections will be permitted.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+ .name("password")
+ .displayName("Password")
+ .description("If the Username is set, then a password must also be specified. "
+ + "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .sensitive(true)
+ .build();
+
+ private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ BIND_ADDRESS,
+ PORT,
+ USERNAME,
+ PASSWORD,
+ SSL_CONTEXT_SERVICE
+ ));
+
+ private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+ RELATIONSHIP_SUCCESS
+ )));
+
+ private volatile FtpServer ftpServer;
+ private volatile CountDownLatch sessionFactorySetSignal;
+ private final AtomicReference sessionFactory = new AtomicReference<>();
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @OnScheduled
+ public void startFtpServer(ProcessContext context) {
+ if (ftpServer == null) {
+ sessionFactory.set(null);
+
+ String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+ String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+ int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ try {
+ sessionFactorySetSignal = new CountDownLatch(1);
+ ftpServer = new NifiFtpServer.Builder()
+ .sessionFactory(sessionFactory)
+ .sessionFactorySetSignal(sessionFactorySetSignal)
+ .relationshipSuccess(RELATIONSHIP_SUCCESS)
+ .bindAddress(bindAddress)
+ .port(port)
+ .username(username)
+ .password(password)
+ .sslContextService(sslContextService)
+ .build();
+ ftpServer.start();
+ } catch (ProcessException processException) {
+ getLogger().error(processException.getMessage(), processException);
+ stopFtpServer();
+ throw processException;
+ }
+ } else {
+ getLogger().warn("Ftp server already started.");
+ }
+ }
+
+ @OnStopped
+ public void stopFtpServer() {
+ if (ftpServer != null && !ftpServer.isStopped()) {
+ ftpServer.stop();
+ }
+ ftpServer = null;
+ sessionFactory.set(null);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+ sessionFactorySetSignal.countDown();
+ }
+ context.yield();
+ }
+
+ @Override
+ protected Collection customValidate(ValidationContext context) {
+ List results = new ArrayList<>(3);
+
+ validateUsernameAndPassword(context, results);
+ validateBindAddress(context, results);
+
+ return results;
+ }
+
+ private void validateUsernameAndPassword(ValidationContext context, Collection validationResults) {
+ String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+ String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+ if ((username == null) && (password != null)) {
+ validationResults.add(usernameOrPasswordIsNull(USERNAME));
+ } else if ((username != null) && (password == null)) {
+ validationResults.add(usernameOrPasswordIsNull(PASSWORD));
+ }
+ }
+
+ private void validateBindAddress(ValidationContext context, Collection validationResults) {
+ String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+ try {
+ InetAddress.getByName(bindAddress);
+ } catch (UnknownHostException e) {
+ String explanation = String.format("'%s' is unknown", BIND_ADDRESS.getDisplayName());
+ validationResults.add(createValidationResult(BIND_ADDRESS.getDisplayName(), explanation));
+ }
+ }
+
+ private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty) {
+ String explanation = String.format("'%s' and '%s' should either both be provided or none of them", USERNAME.getDisplayName(), PASSWORD.getDisplayName());
+ return createValidationResult(nullProperty.getDisplayName(), explanation);
+ }
+
+ private ValidationResult createValidationResult(String subject, String explanation) {
+ return new ValidationResult.Builder().subject(subject).valid(false).explanation(explanation).build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/FtpServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/FtpServer.java
new file mode 100644
index 0000000000..c0b010d7d2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/FtpServer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * The standard interface for FTP server implementations.
+ */
+public interface FtpServer {
+
+ /**
+ * Starts the FTP server.
+ * @throws ProcessException if the server could not be started.
+ */
+ void start() throws ProcessException;
+
+ /**
+ * Stops the FTP server.
+ */
+ void stop();
+
+ /**
+ * Returns the status of the FTP server.
+ * @return true if the server is stopped, false otherwise.
+ */
+ boolean isStopped();
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
new file mode 100644
index 0000000000..e83bbd5962
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.DataConnectionConfiguration;
+import org.apache.ftpserver.DataConnectionConfigurationFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.ssl.SslConfiguration;
+import org.apache.ftpserver.ssl.SslConfigurationFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.CommandMapFactory;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer implements org.apache.nifi.processors.standard.ftp.FtpServer {
+
+ private final FtpServer server;
+
+ private NifiFtpServer(Map commandMap, FileSystemFactory fileSystemFactory, ConnectionConfig connectionConfig, Listener listener, User user) throws ProcessException {
+ try {
+ FtpServerFactory serverFactory = new FtpServerFactory();
+
+ serverFactory.setFileSystem(fileSystemFactory);
+ serverFactory.setCommandFactory(createCommandFactory(commandMap));
+ serverFactory.setConnectionConfig(connectionConfig);
+ serverFactory.addListener("default", listener);
+ serverFactory.getUserManager().save(user);
+
+ server = serverFactory.createServer();
+ } catch (Exception exception) {
+ throw new ProcessException("FTP server could not be started.", exception);
+ }
+ }
+
+ private CommandFactory createCommandFactory(Map commandMap) {
+ CommandFactoryFactory commandFactoryFactory = new CommandFactoryFactory();
+ commandFactoryFactory.setUseDefaultCommands(false);
+ commandFactoryFactory.setCommandMap(commandMap);
+ return commandFactoryFactory.createCommandFactory();
+ }
+
+ public void start() throws ProcessException {
+ try {
+ server.start();
+ } catch (Exception exception) {
+ throw new ProcessException("FTP server could not be started.", exception);
+ }
+ }
+
+ public void stop() {
+ server.stop();
+ }
+
+ public boolean isStopped() {
+ return server.isStopped();
+ }
+
+ public static class Builder {
+ private static final String HOME_DIRECTORY = "/virtual/ftproot";
+
+ private AtomicReference sessionFactory;
+ private CountDownLatch sessionFactorySetSignal;
+ private Relationship relationshipSuccess;
+ private String bindAddress;
+ private int port;
+ private String username;
+ private String password;
+ private SSLContextService sslContextService;
+
+ public Builder sessionFactory(AtomicReference sessionFactory) {
+ this.sessionFactory = sessionFactory;
+ return this;
+ }
+
+ public Builder sessionFactorySetSignal(CountDownLatch sessionFactorySetSignal) {
+ Objects.requireNonNull(sessionFactorySetSignal);
+
+ this.sessionFactorySetSignal = sessionFactorySetSignal;
+ return this;
+ }
+
+ public Builder relationshipSuccess(Relationship relationship) {
+ Objects.requireNonNull(relationship);
+
+ this.relationshipSuccess = relationship;
+ return this;
+ }
+
+ public Builder bindAddress(String bindAddress) {
+ this.bindAddress = bindAddress;
+ return this;
+ }
+
+ public Builder port(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Builder password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public Builder sslContextService(SSLContextService sslContextService) {
+ this.sslContextService = sslContextService;
+ return this;
+ }
+
+ public NifiFtpServer build() throws ProcessException {
+ try {
+ boolean anonymousLoginEnabled = (username == null);
+
+ FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory();
+ CommandMapFactory commandMapFactory = new CommandMapFactory(sessionFactory, sessionFactorySetSignal, relationshipSuccess);
+ Map commandMap = commandMapFactory.createCommandMap();
+ ConnectionConfig connectionConfig = createConnectionConfig(anonymousLoginEnabled);
+ Listener listener = createListener(bindAddress, port, sslContextService);
+ User user = createUser(username, password, HOME_DIRECTORY);
+
+ return new NifiFtpServer(commandMap, fileSystemFactory, connectionConfig, listener, user);
+ } catch (Exception exception) {
+ throw new ProcessException("FTP server could not be started.", exception);
+ }
+ }
+
+ private ConnectionConfig createConnectionConfig(boolean anonymousLoginEnabled) {
+ ConnectionConfigFactory connectionConfigFactory = new ConnectionConfigFactory();
+ connectionConfigFactory.setAnonymousLoginEnabled(anonymousLoginEnabled);
+ return connectionConfigFactory.createConnectionConfig();
+ }
+
+ private Listener createListener(String bindAddress, int port, SSLContextService sslContextService) throws FtpServerConfigurationException {
+ ListenerFactory listenerFactory = new ListenerFactory();
+ listenerFactory.setServerAddress(bindAddress);
+ listenerFactory.setPort(port);
+ if (sslContextService != null) {
+ SslConfigurationFactory ssl = new SslConfigurationFactory();
+ ssl.setKeystoreFile(new File(sslContextService.getKeyStoreFile()));
+ ssl.setKeystorePassword(sslContextService.getKeyStorePassword());
+ ssl.setKeyPassword(sslContextService.getKeyPassword());
+ ssl.setKeystoreType(sslContextService.getKeyStoreType());
+ ssl.setSslProtocol(sslContextService.getSslAlgorithm());
+
+ if (sslContextService.getTrustStoreFile() != null){
+ ssl.setClientAuthentication("NEED");
+ ssl.setTruststoreFile(new File(sslContextService.getTrustStoreFile()));
+ ssl.setTruststorePassword(sslContextService.getTrustStorePassword());
+ ssl.setTruststoreType(sslContextService.getTrustStoreType());
+ }
+
+ SslConfiguration sslConfiguration = ssl.createSslConfiguration();
+
+ // Set implicit security for the control socket
+ listenerFactory.setSslConfiguration(sslConfiguration);
+ listenerFactory.setImplicitSsl(true);
+
+ // Set implicit security for the data connection
+ DataConnectionConfigurationFactory dataConnectionConfigurationFactory = new DataConnectionConfigurationFactory();
+ dataConnectionConfigurationFactory.setImplicitSsl(true);
+ dataConnectionConfigurationFactory.setSslConfiguration(sslConfiguration);
+ DataConnectionConfiguration dataConnectionConfiguration = dataConnectionConfigurationFactory.createDataConnectionConfiguration();
+ listenerFactory.setDataConnectionConfiguration(dataConnectionConfiguration);
+ }
+ return listenerFactory.createListener();
+ }
+
+ private User createUser(String username, String password, String homeDirectory) {
+ boolean anonymousLoginEnabled = (username == null);
+ if (anonymousLoginEnabled) {
+ return createAnonymousUser(homeDirectory, Collections.singletonList(new WritePermission()));
+ } else {
+ return createNamedUser(username, password, homeDirectory, Collections.singletonList(new WritePermission()));
+ }
+ }
+
+ private User createAnonymousUser(String homeDirectory, List authorities) {
+ BaseUser user = new BaseUser();
+ user.setName("anonymous");
+ user.setHomeDirectory(homeDirectory);
+ user.setAuthorities(authorities);
+ return user;
+ }
+
+ private User createNamedUser(String username, String password, String homeDirectory, List authorities) {
+ BaseUser user = new BaseUser();
+ user.setName(username);
+ user.setPassword(password);
+ user.setHomeDirectory(homeDirectory);
+ user.setAuthorities(authorities);
+ return user;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/CommandMapFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/CommandMapFactory.java
new file mode 100644
index 0000000000..da36bbdd46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/CommandMapFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CommandMapFactory {
+
+ private final Map commandMap = new HashMap<>();
+ private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+ private final AtomicReference sessionFactory;
+ private final CountDownLatch sessionFactorySetSignal;
+ private final Relationship relationshipSuccess;
+
+ public CommandMapFactory(AtomicReference sessionFactory, CountDownLatch sessionFactorySetSignal, Relationship relationshipSuccess) {
+ this.sessionFactory = sessionFactory;
+ this.sessionFactorySetSignal = sessionFactorySetSignal;
+ this.relationshipSuccess = relationshipSuccess;
+ }
+
+ public Map createCommandMap() {
+ addToCommandMap("ABOR", new ABOR());
+ addToCommandMap("ACCT", new NotSupportedCommand("Operation (ACCT) not supported."));
+ addToCommandMap("APPE", new NotSupportedCommand("Operation (APPE) not supported."));
+ addToCommandMap("AUTH", new AUTH());
+ addToCommandMap("CDUP", new CDUP());
+ addToCommandMap("CWD", new CWD());
+ addToCommandMap("DELE", new NotSupportedCommand("Operation (DELE) not supported."));
+ addToCommandMap("EPRT", new EPRT());
+ addToCommandMap("EPSV", new EPSV());
+ addToCommandMap("FEAT", new FEAT());
+ addToCommandMap("HELP", customHelpCommand);
+ addToCommandMap("LIST", new LIST());
+ addToCommandMap("MFMT", new NotSupportedCommand("Operation (MFMT) not supported."));
+ addToCommandMap("MDTM", new MDTM());
+ addToCommandMap("MLST", new MLST());
+ addToCommandMap("MKD", new MKD());
+ addToCommandMap("MLSD", new MLSD());
+ addToCommandMap("MODE", new MODE());
+ addToCommandMap("NLST", new NLST());
+ addToCommandMap("NOOP", new NOOP());
+ addToCommandMap("OPTS", new OPTS());
+ addToCommandMap("PASS", new PASS());
+ addToCommandMap("PASV", new PASV());
+ addToCommandMap("PBSZ", new PBSZ());
+ addToCommandMap("PORT", new PORT());
+ addToCommandMap("PROT", new PROT());
+ addToCommandMap("PWD", new PWD());
+ addToCommandMap("QUIT", new QUIT());
+ addToCommandMap("REIN", new REIN());
+ addToCommandMap("REST", new NotSupportedCommand("Operation (REST) not supported."));
+ addToCommandMap("RETR", new NotSupportedCommand("Operation (RETR) not supported."));
+ addToCommandMap("RMD", new RMD());
+ addToCommandMap("RNFR", new NotSupportedCommand("Operation (RNFR) not supported."));
+ addToCommandMap("RNTO", new NotSupportedCommand("Operation (RNTO) not supported."));
+ addToCommandMap("SITE", new SITE());
+ addToCommandMap("SIZE", new SIZE());
+ addToCommandMap("SITE_DESCUSER", new SITE_DESCUSER());
+ addToCommandMap("SITE_HELP", new SITE_HELP());
+ addToCommandMap("SITE_STAT", new SITE_STAT());
+ addToCommandMap("SITE_WHO", new SITE_WHO());
+ addToCommandMap("SITE_ZONE", new SITE_ZONE());
+
+ addToCommandMap("STAT", new STAT());
+ addToCommandMap("STOR", new FtpCommandSTOR(sessionFactory, sessionFactorySetSignal, relationshipSuccess));
+ addToCommandMap("STOU", new NotSupportedCommand("Operation (STOU) not supported."));
+ addToCommandMap("STRU", new STRU());
+ addToCommandMap("SYST", new SYST());
+ addToCommandMap("TYPE", new TYPE());
+ addToCommandMap("USER", new USER());
+
+ return commandMap;
+ }
+
+ private void addToCommandMap(String command, Command instance) {
+ commandMap.put(command, instance);
+ if (!(instance instanceof NotSupportedCommand)) {
+ customHelpCommand.addCommand(command);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/DetailedFtpCommandException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/DetailedFtpCommandException.java
new file mode 100644
index 0000000000..f65b39ce8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/DetailedFtpCommandException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+public class DetailedFtpCommandException extends FtpCommandException {
+
+ private final String subId;
+ private final FtpFile ftpFile;
+
+ public DetailedFtpCommandException(int ftpReturnCode, String subId, String basicMessage, FtpFile ftpFile) {
+ super(ftpReturnCode, basicMessage);
+ this.subId = subId;
+ this.ftpFile = ftpFile;
+ }
+
+ public String getSubId() {
+ return subId;
+ }
+
+ public FtpFile getFtpFile() {
+ return ftpFile;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandException.java
new file mode 100644
index 0000000000..6213250d02
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+public class FtpCommandException extends Exception {
+
+ private final int ftpReturnCode;
+
+ public FtpCommandException(int ftpReturnCode, String basicMessage) {
+ super(basicMessage);
+ this.ftpReturnCode = ftpReturnCode;
+ }
+
+ public int getFtpReturnCode() {
+ return ftpReturnCode;
+ }
+
+ @Override
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
new file mode 100644
index 0000000000..98c4c0469b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FtpCommandHELP extends AbstractCommand {
+
+ private static Map COMMAND_SPECIFIC_HELP;
+ private static int MAX_NUMBER_OF_COMMANDS_IN_A_ROW = 5;
+ private Set availableCommands = new TreeSet<>();
+
+ static {
+ Map commands = new HashMap<>();
+ commands.put("ABOR", "Syntax: ABOR");
+ commands.put("APPE", "Syntax: APPE ");
+ commands.put("AUTH", "Syntax: AUTH ");
+ commands.put("CDUP", "Syntax: CDUP");
+ commands.put("CWD", "Syntax: CWD ");
+ commands.put("DELE", "Syntax: DELE ");
+ commands.put("EPRT", "Syntax: EPRT");
+ commands.put("EPSV", "Syntax: EPSV");
+ commands.put("FEAT", "Syntax: FEAT");
+ commands.put("HELP", "Syntax: HELP []");
+ commands.put("LIST", "Syntax: LIST []");
+ commands.put("MDTM", "Syntax: MDTM ");
+ commands.put("MKD", "Syntax: MKD ");
+ commands.put("MLSD", "Syntax: MLSD []");
+ commands.put("MLST", "Syntax: MLST []");
+ commands.put("MODE", "Syntax: MODE ");
+ commands.put("NLST", "Syntax: NLST []");
+ commands.put("NOOP", "Syntax: NOOP");
+ commands.put("OPTS", "Syntax: OPTS ");
+ commands.put("PASS", "Syntax: PASS ");
+ commands.put("PASV", "Syntax: PASV");
+ commands.put("PBSZ", "Syntax: PBSZ ");
+ commands.put("PORT", "Syntax: PORT ");
+ commands.put("PROT", "Syntax: PROT ");
+ commands.put("PWD", "Syntax: PWD");
+ commands.put("QUIT", "Syntax: QUIT");
+ commands.put("REIN", "Syntax: REIN");
+ commands.put("REST", "Syntax: REST ");
+ commands.put("RETR", "Syntax: RETR ");
+ commands.put("RMD", "Syntax: RMD ");
+ commands.put("RNFR", "Syntax: RNFR ");
+ commands.put("RNTO", "Syntax: RNTO ");
+ commands.put("SITE", "Syntax: SITE ");
+ commands.put("SIZE", "Syntax: SIZE ");
+ commands.put("STAT", "Syntax: STAT []");
+ commands.put("STOR", "Syntax: STOR ");
+ commands.put("STOU", "Syntax: STOU");
+ commands.put("SYST", "Syntax: SYST");
+ commands.put("TYPE", "Syntax: TYPE ");
+ commands.put("USER", "Syntax: USER ");
+ COMMAND_SPECIFIC_HELP = Collections.unmodifiableMap(commands);
+ }
+
+ public void addCommand(String command) {
+ if (!command.startsWith("SITE_")) { // Parameterized commands of SITE will not appear in the general help.
+ availableCommands.add(command);
+ }
+ }
+
+ public void execute(final FtpIoSession session,
+ final FtpServerContext context, final FtpRequest request) {
+
+ // reset state variables
+ session.resetState();
+
+ if (!request.hasArgument()) {
+ sendDefaultHelpMessage(session);
+ } else {
+ handleRequestWithArgument(session, request);
+ }
+ }
+
+ private void sendDefaultHelpMessage(FtpIoSession session) {
+ sendCustomHelpMessage(session, getDefaultHelpMessage());
+ }
+
+ private String getDefaultHelpMessage() {
+ StringBuffer helpMessage = new StringBuffer("The following commands are supported.\n");
+ int currentNumberOfCommandsInARow = 0;
+ Iterator iterator = availableCommands.iterator();
+ while (iterator.hasNext()) {
+ String command = iterator.next();
+ if (currentNumberOfCommandsInARow == MAX_NUMBER_OF_COMMANDS_IN_A_ROW) {
+ helpMessage.append("\n");
+ currentNumberOfCommandsInARow = 0;
+ }
+ if (iterator.hasNext()) {
+ helpMessage.append(command + ", ");
+ } else {
+ helpMessage.append(command);
+ }
+ ++currentNumberOfCommandsInARow;
+ }
+ helpMessage.append("\nEnd of help.");
+ return helpMessage.toString();
+ }
+
+ private void sendCustomHelpMessage(FtpIoSession session, String message) {
+ session.write(new DefaultFtpReply(FtpReply.REPLY_214_HELP_MESSAGE, message));
+ }
+
+ private void handleRequestWithArgument(FtpIoSession session, FtpRequest request) {
+ // Send command-specific help if available
+ String ftpCommand = request.getArgument().toUpperCase();
+ String commandSpecificHelp = null;
+
+ if (availableCommands.contains(ftpCommand)) {
+ commandSpecificHelp = COMMAND_SPECIFIC_HELP.get(ftpCommand);
+ }
+
+ if (commandSpecificHelp == null) {
+ sendDefaultHelpMessage(session);
+ } else {
+ sendCustomHelpMessage(session, commandSpecificHelp);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
new file mode 100644
index 0000000000..10cb7689d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+ private final AtomicReference sessionFactory;
+ private final CountDownLatch sessionFactorySetSignal;
+ private final Relationship relationshipSuccess;
+
+ public FtpCommandSTOR(AtomicReference sessionFactory, CountDownLatch sessionFactorySetSignal, Relationship relationshipSuccess) {
+ this.sessionFactory = sessionFactory;
+ this.sessionFactorySetSignal = sessionFactorySetSignal;
+ this.relationshipSuccess = relationshipSuccess;
+ }
+
+ public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+ try {
+ executeCommand(ftpSession, context, request);
+ } catch (DetailedFtpCommandException ftpCommandException) {
+ ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+ ftpCommandException.getFtpReturnCode(),
+ ftpCommandException.getSubId(),
+ ftpCommandException.getMessage(),
+ ftpCommandException.getFtpFile()));
+ } catch (FtpCommandException ftpCommandException) {
+ ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getMessage()));
+ } finally {
+ ftpSession.resetState();
+ ftpSession.getDataConnection().closeDataConnection();
+ }
+ }
+
+ private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+ throws FtpCommandException {
+
+ final String fileName = getArgument(request);
+
+ checkDataConnection(ftpSession);
+
+ final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+ checkWritePermission(ftpFile);
+
+ sendFileStatusOkay(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+ final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+ transferData(dataConnection, ftpSession, context, request, ftpFile);
+ }
+
+ private String getArgument(final FtpRequest request) throws FtpCommandException {
+ final String argument = request.getArgument();
+ if (argument == null) {
+ throw new DetailedFtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);
+ }
+ return argument;
+ }
+
+ private void checkDataConnection(final FtpIoSession ftpSession) throws FtpCommandException {
+ DataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
+ if (dataConnectionFactory instanceof IODataConnectionFactory) {
+ InetAddress address = ((IODataConnectionFactory) dataConnectionFactory)
+ .getInetAddress();
+ if (address == null) {
+ throw new FtpCommandException(FtpReply.REPLY_503_BAD_SEQUENCE_OF_COMMANDS, "PORT or PASV must be issued first");
+ }
+ }
+ }
+
+ private FtpFile getFtpFile(final FtpIoSession ftpSession, final String fileName) throws FtpCommandException {
+ FtpFile ftpFile = null;
+ try {
+ ftpFile = ftpSession.getFileSystemView().getFile(fileName);
+ } catch (FtpException e) {
+ LOG.error("Exception getting file object", e);
+ }
+ if (ftpFile == null) {
+ throw new DetailedFtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.invalid", fileName, ftpFile);
+ }
+ return ftpFile;
+ }
+
+ private void checkWritePermission(final FtpFile ftpFile) throws FtpCommandException {
+ if (!ftpFile.isWritable()) {
+ throw new DetailedFtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.permission", ftpFile.getAbsolutePath(), ftpFile);
+ }
+ }
+
+ private void sendFileStatusOkay(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request, final String fileAbsolutePath) {
+ ftpSession.write(LocalizedFtpReply.translate(ftpSession, request, context,
+ FtpReply.REPLY_150_FILE_STATUS_OKAY,
+ "STOR",
+ fileAbsolutePath)).awaitUninterruptibly(10000);
+ }
+
+ private DataConnection openDataConnection(final FtpIoSession ftpSession, final FtpFile ftpFile) throws FtpCommandException {
+ final DataConnection dataConnection;
+ try {
+ dataConnection = ftpSession.getDataConnection().openConnection();
+ } catch (Exception exception) {
+ LOG.error("Exception getting the input data stream", exception);
+ throw new DetailedFtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION,
+ "STOR",
+ ftpFile.getAbsolutePath(),
+ ftpFile);
+ }
+ return dataConnection;
+ }
+
+ private void transferData(final DataConnection dataConnection, final FtpIoSession ftpSession,
+ final FtpServerContext context, final FtpRequest request, final FtpFile ftpFile)
+ throws FtpCommandException {
+
+ final ProcessSession processSession;
+ try {
+ processSession = createProcessSession();
+ } catch (InterruptedException|TimeoutException exception) {
+ LOG.error("ProcessSession could not be acquired, command STOR aborted.", exception);
+ throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION, "File transfer failed.");
+ }
+ FlowFile flowFile = processSession.create();
+ long transferredBytes = 0L;
+ try (OutputStream flowFileOutputStream = processSession.write(flowFile)) {
+ transferredBytes = dataConnection.transferFromClient(ftpSession.getFtpletSession(), flowFileOutputStream);
+ LOG.info("File received {}", ftpFile.getAbsolutePath());
+ } catch (SocketException socketException) {
+ LOG.error("Socket exception during data transfer", socketException);
+ processSession.rollback();
+ throw new DetailedFtpCommandException(FtpReply.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
+ "STOR",
+ ftpFile.getAbsolutePath(),
+ ftpFile);
+ } catch (IOException ioException) {
+ LOG.error("IOException during data transfer", ioException);
+ processSession.rollback();
+ throw new DetailedFtpCommandException(FtpReply.REPLY_551_REQUESTED_ACTION_ABORTED_PAGE_TYPE_UNKNOWN,
+ "STOR",
+ ftpFile.getAbsolutePath(),
+ ftpFile);
+ }
+
+ try {
+ // notify the statistics component
+ ServerFtpStatistics ftpStat = (ServerFtpStatistics) context.getFtpStatistics();
+ ftpStat.setUpload(ftpSession, ftpFile, transferredBytes);
+
+ processSession.putAttribute(flowFile, CoreAttributes.FILENAME.key(), ftpFile.getName());
+ processSession.putAttribute(flowFile, CoreAttributes.PATH.key(), getPath(ftpFile));
+
+ processSession.getProvenanceReporter().modifyContent(flowFile);
+
+ processSession.transfer(flowFile, relationshipSuccess);
+ processSession.commit();
+ } catch (Exception exception) {
+ processSession.rollback();
+ LOG.error("Process session error. ", exception);
+ }
+
+ // if data transfer ok - send transfer complete message
+ ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+ FtpReply.REPLY_226_CLOSING_DATA_CONNECTION, "STOR",
+ ftpFile.getAbsolutePath(), ftpFile, transferredBytes));
+ }
+
+ private String getPath(FtpFile ftpFile) {
+ String absolutePath = ftpFile.getAbsolutePath();
+ int endIndex = absolutePath.length() - ftpFile.getName().length();
+ return ftpFile.getAbsolutePath().substring(0, endIndex);
+ }
+
+ private ProcessSession createProcessSession() throws InterruptedException, TimeoutException {
+ ProcessSessionFactory processSessionFactory = getProcessSessionFactory();
+ return processSessionFactory.createSession();
+ }
+
+ private ProcessSessionFactory getProcessSessionFactory() throws InterruptedException, TimeoutException {
+ if (sessionFactorySetSignal.await(10000, TimeUnit.MILLISECONDS)) {
+ return sessionFactory.get();
+ } else {
+ throw new TimeoutException("Waiting period for sessionFactory is over.");
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java
new file mode 100644
index 0000000000..e60b3635a1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+public class NotSupportedCommand extends AbstractCommand {
+
+ private String message;
+
+ public NotSupportedCommand(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public void execute(FtpIoSession session, FtpServerContext context, FtpRequest request) {
+ // reset state variables
+ session.resetState();
+
+ session.write(new DefaultFtpReply(FtpReply.REPLY_502_COMMAND_NOT_IMPLEMENTED, message));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
new file mode 100644
index 0000000000..e46f90192c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final List existingPaths = new ArrayList<>();
+
+ public DefaultVirtualFileSystem() {
+ existingPaths.add(ROOT);
+ }
+
+ @Override
+ public boolean mkdir(VirtualPath newFile) {
+ lock.writeLock().lock();
+ try {
+ if (existingPaths.contains(newFile)) {
+ return false;
+ } else {
+ if (existingPaths.contains(newFile.getParent())) {
+ existingPaths.add(newFile);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean exists(VirtualPath virtualFile) {
+ lock.readLock().lock();
+ try {
+ return existingPaths.contains(virtualFile);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean delete(VirtualPath virtualFile) {
+ if (virtualFile.equals(ROOT)) { // Root cannot be deleted
+ return false;
+ }
+
+ lock.writeLock().lock();
+ try {
+ if (existingPaths.contains(virtualFile)) {
+ if (!hasSubDirectories(virtualFile)) {
+ return existingPaths.remove(virtualFile);
+ }
+ }
+ return false;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private boolean hasSubDirectories(VirtualPath directory) {
+ return existingPaths.stream().anyMatch(e -> isChildOf(directory, e));
+ }
+
+ private boolean isChildOf(VirtualPath parent, VirtualPath childCandidate) {
+ if (childCandidate.equals(ROOT)) {
+ return false;
+ }
+ return parent.equals(childCandidate.getParent());
+ }
+
+ @Override
+ public List listChildren(VirtualPath parent) {
+ List children;
+
+ lock.readLock().lock();
+ try {
+ if (parent.equals(ROOT)) {
+ children = existingPaths.stream()
+ .filter(existingPath -> (!existingPath.equals(ROOT) && (existingPath.getNameCount() == 1)))
+ .collect(Collectors.toList());
+ } else {
+ int parentNameCount = parent.getNameCount();
+ children = existingPaths.stream()
+ .filter(existingPath -> (((existingPath.getParent() != null) && existingPath.getParent().equals(parent))
+ && (existingPath.getNameCount() == (parentNameCount + 1))))
+ .collect(Collectors.toList());
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ return children;
+ }
+
+ @Override
+ public int getTotalNumberOfFiles() {
+ lock.readLock().lock();
+ try {
+ return existingPaths.size();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystem.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystem.java
new file mode 100644
index 0000000000..edfdb794c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystem.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.List;
+
+/**
+ * The interface for virtual file system implementations. Provides the methods for the basic file system functionality.
+ */
+public interface VirtualFileSystem {
+
+ /**
+ * The virtual root folder of the virtual file system regardless of the operating system and its native file system structure.
+ */
+ VirtualPath ROOT = new VirtualPath("/");
+
+ /**
+ * Makes a new directory specified by the path received as a parameter.
+ *
+ * @param newFile The path that points to the directory that should be created.
+ * @return true if the new directory got created;
+ * false otherwise.
+ */
+ boolean mkdir(VirtualPath newFile);
+
+ /**
+ * Checks if the path received as a parameter already exists in the file system.
+ *
+ * @param virtualFile The path that may or may not exist in the file system.
+ * @return true if the specified path already exists in the file system;
+ * false otherwise.
+ */
+ boolean exists(VirtualPath virtualFile);
+
+ /**
+ * Deletes the file or directory specified by the path received as a parameter.
+ *
+ * @param virtualFile The path pointing to the file or directory that should be deleted.
+ * @return true if the file or directory got deleted;
+ * false otherwise.
+ */
+ boolean delete(VirtualPath virtualFile);
+
+ /**
+ * Lists the files and directories that are directly under the directory specified by the path received as a parameter.
+ *
+ * @param parent The path specifying the directory the contents of which should be listed.
+ * @return The list of paths that point to the contents of the parent directory.
+ */
+ List listChildren(VirtualPath parent);
+
+ /**
+ * Returns the number of all the files and folders in the file system.
+ *
+ * @return The number of files and folders in the file system.
+ */
+ int getTotalNumberOfFiles();
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemFactory.java
new file mode 100644
index 0000000000..9083dbdaa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.User;
+
+public class VirtualFileSystemFactory implements FileSystemFactory {
+
+ private VirtualFileSystem fileSystem;
+
+ public VirtualFileSystemFactory() {
+ fileSystem = new DefaultVirtualFileSystem();
+ }
+
+ public VirtualFileSystemFactory(VirtualFileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public FileSystemView createFileSystemView(User user) {
+ return new VirtualFileSystemView(user, fileSystem);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java
new file mode 100644
index 0000000000..685e3ee534
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualFileSystemView implements FileSystemView {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualFileSystemView.class);
+ private final VirtualFileSystem fileSystem;
+ private VirtualPath currentDirectory = VirtualFileSystem.ROOT;
+
+ public VirtualFileSystemView(User user, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+ if (user == null || fileSystem == null) {
+ throw new IllegalArgumentException("User and filesystem cannot be null.");
+ } else {
+ LOG.info("Virtual filesystem view created for user \"{}\"", user.getName());
+ this.fileSystem = fileSystem;
+ }
+ }
+
+ @Override
+ public FtpFile getHomeDirectory() {
+ return new VirtualFtpFile(VirtualFileSystem.ROOT, fileSystem);
+ }
+
+ @Override
+ public FtpFile getWorkingDirectory() {
+ return new VirtualFtpFile(currentDirectory, fileSystem);
+ }
+
+ @Override
+ public boolean changeWorkingDirectory(String targetPath) {
+ VirtualPath targetDirectory = currentDirectory.resolve(targetPath);
+ if (fileSystem.exists(targetDirectory)) {
+ currentDirectory = targetDirectory;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public FtpFile getFile(String fileName) throws FtpException {
+ VirtualPath filePath = currentDirectory.resolve(fileName);
+ VirtualPath parent = filePath.getParent();
+ if ((parent != null) && !fileSystem.exists(filePath.getParent())) {
+ throw new FtpException(String.format("Parent directory does not exist for %s", filePath.toString()));
+ }
+ return new VirtualFtpFile(filePath, fileSystem);
+ }
+
+ @Override
+ public boolean isRandomAccessible() {
+ return false;
+ }
+
+ @Override
+ public void dispose() { }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
new file mode 100644
index 0000000000..09c905398b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+ private final VirtualPath path;
+ private final VirtualFileSystem fileSystem;
+ private long lastModified;
+
+ public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+ if (path == null || fileSystem == null) {
+ throw new IllegalArgumentException("File path and fileSystem cannot be null");
+ }
+ this.path = path;
+ this.fileSystem = fileSystem;
+ this.lastModified = Calendar.getInstance().getTimeInMillis();
+ }
+
+ @Override
+ public String getAbsolutePath() {
+ return path.toString();
+ }
+
+ @Override
+ public String getName() {
+ return path.getFileName();
+ }
+
+ @Override
+ public boolean isHidden() {
+ return false;
+ }
+
+ @Override
+ public boolean isDirectory() {
+ return true; // Only directories are handled since files are converted into flowfiles immediately.
+ }
+
+ @Override
+ public boolean isFile() {
+ return false; // Only directories are handled since files are converted into flowfiles immediately.
+ }
+
+ @Override
+ public boolean doesExist() {
+ return fileSystem.exists(path);
+ }
+
+ @Override
+ public boolean isReadable() {
+ return true;
+ }
+
+ @Override
+ public boolean isWritable() {
+ return true;
+ }
+
+ @Override
+ public boolean isRemovable() {
+ return true; //Every virtual directory can be deleted
+ }
+
+ @Override
+ public String getOwnerName() {
+ return "user";
+ }
+
+ @Override
+ public String getGroupName() {
+ return "group";
+ }
+
+ @Override
+ public int getLinkCount() {
+ return 1;
+ }
+
+ @Override
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ @Override
+ public boolean setLastModified(long l) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("VirtualFtpFile.setLastModified()");
+ }
+
+ @Override
+ public long getSize() {
+ return 0;
+ }
+
+ @Override
+ public Object getPhysicalFile() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("VirtualFtpFile.getPhysicalFile()");
+ }
+
+ @Override
+ public boolean mkdir() {
+ return fileSystem.mkdir(path);
+ }
+
+ @Override
+ public boolean delete() {
+ return fileSystem.delete(path);
+ }
+
+ @Override
+ public boolean move(FtpFile ftpFile) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("VirtualFtpFile.move()");
+ }
+
+ @Override
+ public List extends FtpFile> listFiles() {
+ List paths = fileSystem.listChildren(path);
+ List files = new ArrayList<>();
+ for (VirtualPath path : paths) {
+ files.add(new VirtualFtpFile(path, fileSystem));
+ }
+ return files;
+ }
+
+ @Override
+ public OutputStream createOutputStream(long l) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("VirtualFtpFile.createOutputStream()");
+ }
+
+ @Override
+ public InputStream createInputStream(long l) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("VirtualFtpFile.createInputStream()");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof VirtualFtpFile)) {
+ return false;
+ }
+ VirtualFtpFile other = (VirtualFtpFile) o;
+ return fileSystem.equals(other.fileSystem) && path.equals(other.path);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
new file mode 100644
index 0000000000..b7247167be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class VirtualPath {
+
+ private final Path path; // always normalized
+
+ public VirtualPath(String path) {
+ String absolutePath = File.separator + normalizeSeparator(path);
+ this.path = Paths.get(absolutePath).normalize();
+ }
+
+ private String normalizeSeparator(String path) {
+ String pathWithoutStartingSeparator = removeStartingSeparator(path);
+ String normalizedPath = pathWithoutStartingSeparator.replace(File.separatorChar, '/');
+ normalizedPath = normalizedPath.replace('\\', '/');
+ return normalizedPath;
+ }
+
+ private String removeStartingSeparator(String path) {
+ int indexOfFirstNonSeparator;
+ for (indexOfFirstNonSeparator = 0; indexOfFirstNonSeparator < path.length(); ++indexOfFirstNonSeparator) {
+ if (!(path.charAt(indexOfFirstNonSeparator) == File.separatorChar) && !(path.charAt(indexOfFirstNonSeparator) == '/')) {
+ break;
+ }
+ }
+ return path.substring(indexOfFirstNonSeparator);
+ }
+
+ public String getFileName() {
+ if (path.getFileName() == null) {
+ return File.separator;
+ } else {
+ return path.getFileName().toString();
+ }
+ }
+
+ public VirtualPath getParent() {
+ if (path.getParent() == null) {
+ return null;
+ } else {
+ return new VirtualPath(path.getParent().toString());
+ }
+ }
+
+ public boolean isAbsolute() {
+ return path.isAbsolute();
+ }
+
+ public VirtualPath resolve(String otherPath) {
+ return new VirtualPath(path.resolve(otherPath).normalize().toString());
+ }
+
+ public String toString() {
+ return path.toString();
+ }
+
+ public int getNameCount() {
+ return path.getNameCount();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof VirtualPath)) {
+ return false;
+ }
+ VirtualPath other = (VirtualPath) o;
+ return path.equals(other.path);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 5b6f40d6ec..6a767c15ff 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -61,6 +61,7 @@ org.apache.nifi.processors.standard.IdentifyMimeType
org.apache.nifi.processors.standard.InvokeHTTP
org.apache.nifi.processors.standard.JoltTransformJSON
org.apache.nifi.processors.standard.ListDatabaseTables
+org.apache.nifi.processors.standard.ListenFTP
org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenRELP
org.apache.nifi.processors.standard.ListenSyslog
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListenFTP/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListenFTP/additionalDetails.html
new file mode 100644
index 0000000000..0899e6c256
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListenFTP/additionalDetails.html
@@ -0,0 +1,82 @@
+
+
+
+
+
+ ListenFTP
+
+
+
+
+
Usage Description
+
+ By starting the processor, an FTP server is started that listens for incoming connections on the specified port.
+ Each file copied to this FTP server gets converted into a FlowFile and transferred to the next processor via
+ the ListenFTP processor's 'success' relationship.
+
+
+
+ Before starting the processor, the following properties can be set:
+
+
+
+
Bind Address: if not set, the FTP server binds to all network interfaces of the host machine
+ (this is the default). If set to a valid address, the server is only available on that specific address.
+
Listening Port: the port on which the server listens for incoming connections.
+ Root privileges are required on Linux to be able to use port numbers below 1024.
+
Username and Password: Either both of them need to be set, or none of them. If set, the FTP server
+ only allows users to log in with the username-password pair specified in these properties.
+ If the Username and Password properties are left blank, the FTP server allows anonymous connections,
+ meaning that the client can connect to the FTP server by providing 'anonymous' as username, and leaving
+ the password field blank. Setting empty string as the value of these properties is not permitted, and doing so
+ results in the processor becoming invalid.
+
SSL Context Service: a Controller Service can optionally be specified that provides the ability to
+ configure keystore and/or truststore properties. When not specified, the FTP server does not use encryption.
+ By specifying an SSL Context Service, the FTP server started by this processor is set to use
+ Transport Layer Security (TLS) over FTP (FTPS).
+ If an SSL Context Service is selected, then a keystore file must also be specified in the SSL Context Service.
+ Without a keystore file, the processor cannot be started successfully.
+ Specifying a truststore file is optional. If a truststore file is specified, client authentication is required
+ (the client needs to send a certificate to the server).
+ Regardless of the selected TLS protocol, the highest available protocol is used for the connection.
+ For example if NiFi is running on Java 11 and TLSv1.2 is selected in the controller service as the preferred TLS protocol,
+ TLSv1.3 will be used (regardless of TLSv1.2 being selected) because Java 11 supports TLSv1.3.
+
+
+
+
+
+ After starting the processor and connecting to the FTP server, an empty root directory is visible in the client application.
+ Folders can be created in and deleted from the root directory and any of its subdirectories.
+ Files can be uploaded to any directory. Uploaded files do not show in the content list of directories, since
+ files are not actually stored on this FTP server, but converted into FlowFiles and transferred to the next processor via the
+ 'success' relationship. It is not possible to download or delete files like on a regular FTP server.
+ All the folders (including the root directory) are virtual directories, meaning that they only exist in memory and do not get
+ created in the file system of the host machine. Also, these directories are not persisted: by restarting the processor all the
+ directories (except for the root directory) get removed. Uploaded files do not get removed by restarting the processor, since
+ they are not stored on the FTP server, but transferred to the next processor as FlowFiles.
+ When a file named for example text01.txt is uploaded to the target folder /MyDirectory/MySubdirectory, a FlowFile gets
+ created. The content of the FlowFile is the same as the content of text01.txt, the 'filename' attribute of the FlowFile
+ contains the name of the original file (text01.txt) and the 'path' attribute of the flowfile contains the path where the file
+ was uploaded (/MyDirectory/MySubdirectory/).
+
+
+
+ The list of the FTP commands that are supported by the FTP server is available by starting the processor and
+ issuing the 'HELP' command to the server from an FTP client application.
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystem.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystem.java
new file mode 100644
index 0000000000..538106e9ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystem.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestVirtualFileSystem {
+
+ private VirtualFileSystem fileSystem;
+ private static final List ORIGINAL_DIRECTORY_LIST = Arrays.asList(
+ new VirtualPath("/"),
+ new VirtualPath("/Directory1"),
+ new VirtualPath("/Directory1/SubDirectory1"),
+ new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"),
+ new VirtualPath("/Directory1/SubDirectory2"),
+ new VirtualPath("/Directory2"),
+ new VirtualPath("/Directory2/SubDirectory3"),
+ new VirtualPath("/Directory2/SubDirectory4")
+ );
+
+ @Before
+ public void setup() {
+ setupVirtualDirectoryStructure();
+ }
+
+ private void setupVirtualDirectoryStructure() {
+ fileSystem = new DefaultVirtualFileSystem();
+ for (VirtualPath directory : ORIGINAL_DIRECTORY_LIST) {
+ if (!directory.equals(VirtualFileSystem.ROOT)) {
+ fileSystem.mkdir(directory);
+ }
+ }
+ }
+
+ @Test
+ public void testTryToCreateDirectoryWithNonExistentParents() {
+ // GIVEN
+ VirtualPath newDirectory = new VirtualPath("/Directory3/SubDirectory5/SubSubDirectory");
+
+ // WHEN
+ boolean directoryCreated = fileSystem.mkdir(newDirectory);
+
+ // THEN
+ assertFalse(directoryCreated);
+ assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
+ }
+
+ @Test
+ public void testListContentsOfDirectory() {
+ // GIVEN
+ VirtualPath parent = new VirtualPath("/Directory1");
+ VirtualPath[] expectedSubDirectories = {
+ new VirtualPath("/Directory1/SubDirectory1"),
+ new VirtualPath("/Directory1/SubDirectory2")
+ };
+
+ // WHEN
+ List subDirectories = fileSystem.listChildren(parent);
+
+ // THEN
+ assertThat(subDirectories, containsInAnyOrder(expectedSubDirectories));
+ }
+
+ @Test
+ public void testListContentsOfRoot() {
+ // GIVEN
+ VirtualPath parent = new VirtualPath("/");
+ VirtualPath[] expectedSubDirectories = {
+ new VirtualPath("/Directory1"),
+ new VirtualPath("/Directory2")
+ };
+
+ // WHEN
+ List subDirectories = fileSystem.listChildren(parent);
+
+ // THEN
+ assertThat(subDirectories, containsInAnyOrder(expectedSubDirectories));
+ }
+
+ @Test
+ public void testListContentsOfEmptyDirectory() {
+ // GIVEN
+ VirtualPath parent = new VirtualPath("/Directory2/SubDirectory3");
+
+ // WHEN
+ List subDirectories = fileSystem.listChildren(parent);
+
+ // THEN
+ assertEquals(0, subDirectories.size());
+ }
+
+ @Test
+ public void testTryToDeleteNonEmptyDirectory() {
+
+ // WHEN
+ boolean success = fileSystem.delete(new VirtualPath("/Directory1"));
+
+ // THEN
+ assertFalse(success);
+ assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
+ }
+
+ @Test
+ public void testDeleteEmptyDirectory() {
+ // GIVEN
+ List expectedRemainingDirectories = Arrays.asList(
+ new VirtualPath("/"),
+ new VirtualPath("/Directory1"),
+ new VirtualPath("/Directory1/SubDirectory1"),
+ new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"),
+ new VirtualPath("/Directory1/SubDirectory2"),
+ new VirtualPath("/Directory2"),
+ new VirtualPath("/Directory2/SubDirectory4")
+ );
+
+ // WHEN
+ boolean success = fileSystem.delete(new VirtualPath("/Directory2/SubDirectory3"));
+
+ // THEN
+ assertTrue(success);
+ assertAllDirectoriesAre(expectedRemainingDirectories);
+ }
+
+ @Test
+ public void testDeleteRoot() {
+
+ // WHEN
+ boolean success = fileSystem.delete(VirtualFileSystem.ROOT);
+
+ // THEN
+ assertFalse(success);
+ assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
+ }
+
+ @Test
+ public void testDeleteNonExistentDirectory() {
+
+ // WHEN
+ boolean success = fileSystem.delete(new VirtualPath("/Directory3"));
+
+ // THEN
+ assertFalse(success);
+ assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
+ }
+
+ private void assertAllDirectoriesAre(List expectedDirectories) {
+ if (expectedDirectories.size() != fileSystem.getTotalNumberOfFiles()) {
+ fail();
+ } else {
+ for (VirtualPath path : expectedDirectories) {
+ if (!fileSystem.exists(path)) {
+ fail();
+ }
+ }
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
new file mode 100644
index 0000000000..f4ebf22574
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestVirtualFileSystemView {
+
+ private FileSystemView fileSystemView;
+ private static VirtualFileSystem fileSystem;
+
+ @BeforeClass
+ public static void setupVirtualFileSystem() {
+ fileSystem = new DefaultVirtualFileSystem();
+ fileSystem.mkdir(new VirtualPath("/Directory1"));
+ fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1"));
+ fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"));
+ fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory2"));
+ fileSystem.mkdir(new VirtualPath("/Directory2"));
+ fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory3"));
+ fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory4"));
+ }
+
+ @Before
+ public void setup() throws FtpException {
+ User user = createUser();
+ FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory(fileSystem);
+ fileSystemView = fileSystemFactory.createFileSystemView(user);
+ }
+
+ @Test
+ public void testInRootDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+
+ // WHEN
+ // We do not change directories
+
+ // THEN
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testTryToMakeRootDirectory() {
+
+ // WHEN
+ boolean directoryCreated = fileSystem.mkdir(VirtualFileSystem.ROOT);
+
+ // THEN
+ assertFalse(directoryCreated);
+ }
+
+ @Test
+ public void testChangeToAnotherDirectory() throws FtpException {
+ // GIVEN
+ String expectedHomeDirectory = File.separator;
+ String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory("/Directory1");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedHomeDirectory);
+ assertCurrentDirectoryEquals(expectedCurrentDirectory);
+ }
+
+ @Test
+ public void testChangeToRootDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory("/");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testChangeToUnspecifiedDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory("");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testChangeToSameDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory(".");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testChangeToSameDirectoryNonRoot() throws FtpException {
+ // GIVEN
+ String expectedHomeDirectory = File.separator;
+ String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
+ fileSystemView.changeWorkingDirectory("/Directory1");
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory(".");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedHomeDirectory);
+ assertCurrentDirectoryEquals(expectedCurrentDirectory);
+ }
+
+ @Test
+ public void testChangeToParentDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+ fileSystemView.changeWorkingDirectory("/Directory1");
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory("..");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testChangeToParentDirectoryNonRoot() throws FtpException {
+ // GIVEN
+ String expectedHomeDirectory = File.separator;
+ String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
+ fileSystemView.changeWorkingDirectory("/Directory1");
+ fileSystemView.changeWorkingDirectory("SubDirectory1");
+
+ // WHEN
+ fileSystemView.changeWorkingDirectory("..");
+
+ // THEN
+ assertHomeDirectoryEquals(expectedHomeDirectory);
+ assertCurrentDirectoryEquals(expectedCurrentDirectory);
+ }
+
+ @Test
+ public void testChangeToNonExistentDirectory() throws FtpException {
+ // GIVEN
+ String expectedDirectory = File.separator;
+
+ // WHEN
+ boolean changeDirectoryResult = fileSystemView.changeWorkingDirectory("/Directory2/SubDirectory3/SubSubDirectory");
+
+ // THEN
+ assertFalse(changeDirectoryResult);
+ assertHomeDirectoryEquals(expectedDirectory);
+ assertCurrentDirectoryEquals(expectedDirectory);
+ }
+
+ @Test
+ public void testGetFileAbsolute() throws FtpException {
+ // GIVEN
+ String expectedDirectory = "/Directory2/SubDirectory3".replace('/', File.separatorChar);
+ fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+ // WHEN
+ FtpFile file = fileSystemView.getFile("/Directory2/SubDirectory3");
+
+ // THEN
+ assertEquals(expectedDirectory, file.getAbsolutePath());
+ }
+
+ @Test
+ public void testGetFileNonAbsolute() throws FtpException {
+ // GIVEN
+ String expectedDirectory = "/Directory1/SubDirectory1/SubSubDirectory".replace('/', File.separatorChar);
+ fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+ // WHEN
+ FtpFile file = fileSystemView.getFile("SubSubDirectory");
+
+ // THEN
+ assertEquals(expectedDirectory, file.getAbsolutePath());
+ }
+
+ private User createUser() {
+ BaseUser user = new BaseUser();
+ user.setName("Username");
+ user.setPassword("Password");
+ user.setHomeDirectory("/abc/def");
+ user.setAuthorities(Collections.singletonList(new WritePermission()));
+ return user;
+ }
+
+ private void assertHomeDirectoryEquals(String expectedHomeDirectory) throws FtpException {
+ FtpFile homeDirectory = fileSystemView.getHomeDirectory();
+ assertEquals(expectedHomeDirectory, homeDirectory.getAbsolutePath());
+ }
+
+ private void assertCurrentDirectoryEquals(String expectedCurrentDirectory) throws FtpException {
+ FtpFile currentDirectory = fileSystemView.getWorkingDirectory();
+ assertEquals(expectedCurrentDirectory, currentDirectory.getAbsolutePath());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualPath.java
new file mode 100644
index 0000000000..b655d951df
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualPath.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestVirtualPath {
+
+ @Test
+ public void testCreatePathStartingWithSlash() {
+ // GIVEN
+ String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathStartingWithDoubleSlash() {
+ // GIVEN
+ String expectedPath = "/Directory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("//Directory1");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathEndingWithSlash() {
+ // GIVEN
+ String expectedPath = "/Directory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathEndingWithDoubleSlash() {
+ // GIVEN
+ String expectedPath = "/Directory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1//");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathNotStartingWithSlash() {
+ // GIVEN
+ String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("Directory1/Directory2");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatPathToRoot() {
+ // GIVEN
+ String expectedPath = File.separator;
+ VirtualPath objectUnderTest = new VirtualPath("/");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathToRootWithDoubleSlash() {
+ // GIVEN
+ String expectedPath = File.separator;
+ VirtualPath objectUnderTest = new VirtualPath("//");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathThatNeedsToBeResolved() {
+ // GIVEN
+ String expectedPath = "/Directory1/SubDirectory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("//Directory1/SubDirectory1/../SubDirectory1");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathWithWhitespace() {
+ // GIVEN
+ String expectedPath = "/Directory 1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory 1");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathWithBackslashes() {
+ // GIVEN
+ String expectedPath = "/Directory1/SubDirectory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("\\Directory1\\SubDirectory1");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testCreatePathWithSpecialCharacters() {
+ // GIVEN
+ String expectedPath = "/űáú▣☃/SubDirectory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/űáú▣☃/SubDirectory1");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testEmptyPathPointsToRoot() {
+ // GIVEN
+ String expectedPath = File.separator;
+ VirtualPath objectUnderTest = new VirtualPath("");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testPathIsNormalized() {
+ // GIVEN
+ String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1///Directory2\\\\Directory3/Directory4/../..");
+
+ // WHEN
+ String result = objectUnderTest.toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testGetFileNameForRoot() {
+ // GIVEN
+ String expectedPath = File.separator;
+ VirtualPath objectUnderTest = new VirtualPath("/");
+
+ // WHEN, THEN
+ assertEquals(expectedPath, objectUnderTest.getFileName());
+ }
+
+ @Test
+ public void testGetFileNameForNonRoot() {
+ // GIVEN
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2/file.txt");
+
+ // WHEN
+ String result = objectUnderTest.getFileName();
+
+ // THEN
+ assertEquals("file.txt", result);
+ }
+
+ @Test
+ public void testGetParentForRoot() {
+ // GIVEN
+ VirtualPath objectUnderTest = new VirtualPath("/");
+
+ // WHEN, THEN
+ assertNull(objectUnderTest.getParent());
+ }
+
+ @Test
+ public void testGetParentForNonRoot() {
+ // GIVEN
+ String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2/file.txt");
+
+ // WHEN
+ VirtualPath parent = objectUnderTest.getParent();
+
+ // THEN
+ assertEquals(expectedPath, parent.toString());
+ }
+
+ @Test
+ public void testResolveToARelativePath() {
+ // GIVEN
+ String expectedPath = "/Directory1/Directory2/Directory3/Directory4".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
+
+ // WHEN
+ String result = objectUnderTest.resolve("Directory3/Directory4").toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testResolveToParent() {
+ // GIVEN
+ String expectedPath = "/Directory1".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
+
+ // WHEN
+ String result = objectUnderTest.resolve("..").toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testResolveToAnAbsolutePath() {
+ // GIVEN
+ String expectedPath = "/Directory3/Directory4".replace('/', File.separatorChar);
+ VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
+
+ // WHEN
+ String result = objectUnderTest.resolve("/Directory3/Directory4").toString();
+
+ // THEN
+ assertEquals(expectedPath, result);
+ }
+
+ @Test
+ public void testEquals() {
+ // GIVEN
+ VirtualPath path1 = new VirtualPath("/Directory1/Directory2");
+ VirtualPath path2 = new VirtualPath("/Directory1/Directory2");
+
+ // WHEN, THEN
+ assertEquals(path1, path2);
+ }
+
+ @Test
+ public void testDoesNotEqual() {
+ // GIVEN
+ VirtualPath path1 = new VirtualPath("/Directory1/Directory2");
+ VirtualPath path2 = new VirtualPath("/Directory1/Directory3");
+
+ // WHEN, THEN
+ assertNotEquals(path1, path2);
+ }
+
+}