NIFI-7624: ListenFTP processor

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4481.
This commit is contained in:
Peter Gyori 2020-08-14 21:42:02 +02:00 committed by Pierre Villard
parent fc610eaeda
commit 266433e13d
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
24 changed files with 2615 additions and 6 deletions

View File

@ -427,6 +427,9 @@ The following binary components are provided under the Apache Software License v
Apache MINA Core Apache MINA Core
Copyright 2004-2011 Apache MINA Project Copyright 2004-2011 Apache MINA Project
Apache MINA Core 2.0.16
Copyright 2004-2016 Apache MINA Project
(ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
(ASLv2) Apache Velocity (ASLv2) Apache Velocity
@ -1933,6 +1936,16 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies: The following NOTICE information applies:
Copyright 2016 Jeroen van Erp <jeroen@hierynomus.com> Copyright 2016 Jeroen van Erp <jeroen@hierynomus.com>
(ASLv2) Apache Ftplet API
The following NOTICE information applies:
Apache Ftplet API 1.1.1
Copyright 2003-2017 The Apache Software Foundation
(ASLv2) Apache FtpServer Core
The following NOTICE information applies:
Apache FtpServer Core 1.1.1
Copyright 2003-2017 The Apache Software Foundation
************************ ************************
Common Development and Distribution License 1.1 Common Development and Distribution License 1.1
************************ ************************

View File

@ -354,4 +354,27 @@ This product bundles 'js-beautify' which is available under an MIT license.
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. THE SOFTWARE.
This product bundles 'SLF4J-API 1.7.21' which is available under an MIT license.
Copyright (c) 2004-2007 QOS.ch
All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -238,6 +238,21 @@ The following binary components are provided under the Apache Software License v
- Apache MINA SSHD - Apache MINA SSHD
- Apache Commons-Net - Apache Commons-Net
(ASLv2) Apache Ftplet API
The following NOTICE information applies:
Apache Ftplet API 1.1.1
Copyright 2003-2017 The Apache Software Foundation
(ASLv2) Apache FtpServer Core
The following NOTICE information applies:
Apache FtpServer Core 1.1.1
Copyright 2003-2017 The Apache Software Foundation
(ASLv2) Apache MINA Core
The following NOTICE information applies:
Apache MINA Core 2.0.16
Copyright 2004-2016 Apache MINA Project
************************ ************************
Common Development and Distribution License 1.1 Common Development and Distribution License 1.1
************************ ************************

View File

@ -19,6 +19,12 @@
<artifactId>nifi-standard-processors</artifactId> <artifactId>nifi-standard-processors</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.ftpserver</groupId>
<artifactId>ftpserver-core</artifactId>
<version>1.1.1</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
@ -296,12 +302,6 @@
<artifactId>MockFtpServer</artifactId> <artifactId>MockFtpServer</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.19</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.sshd</groupId> <groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId> <artifactId>sshd-core</artifactId>
@ -390,6 +390,12 @@
<version>1.13.0-SNAPSHOT</version> <version>1.13.0-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.ftp.FtpServer;
import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
import org.apache.nifi.ssl.SSLContextService;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"ingest", "FTP", "FTPS", "listen"})
@CapabilityDescription("Starts an FTP server that listens on the specified port and transforms incoming files into FlowFiles. "
+ "The URI of the service will be ftp://{hostname}:{port}. The default port is 2221.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file received via the FTP/FTPS connection."),
@WritesAttribute(attribute = "path", description = "The path pointing to the file's target directory. "
+ "E.g.: file.txt is uploaded to /Folder1/SubFolder, then the value of the path attribute will be \"/Folder1/SubFolder/\" "
+ "(note that it ends with a separator character).")
})
@SeeAlso(classNames = {"org.apache.nifi.ssl.StandardRestrictedSSLContextService","org.apache.nifi.ssl.StandardSSLContextService"})
public class ListenFTP extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service that can be used to create secure connections. "
+ "If an SSL Context Service is selected, then a keystore file must also be specified in the SSL Context Service. "
+ "Without a keystore file, the processor cannot be started successfully."
+ "Specifying a truststore file is optional. If a truststore file is specified, client authentication is required "
+ "(the client needs to send a certificate to the server)."
+ "Regardless of the selected TLS protocol, the highest available protocol is used for the connection. "
+ "For example if NiFi is running on Java 11 and TLSv1.2 is selected in the controller service as the "
+ "preferred TLS Protocol, TLSv1.3 will be used (regardless of TLSv1.2 being selected) because Java 11 "
+ "supports TLSv1.3.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("success")
.description("Relationship for successfully received files.")
.build();
public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
.name("bind-address")
.displayName("Bind Address")
.description("The address the FTP server should be bound to. If not set (or set to 0.0.0.0), "
+ "the server binds to all available addresses (i.e. all network interfaces of the host machine).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("listening-port")
.displayName("Listening Port")
.description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
.required(true)
.defaultValue("2221")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("username")
.displayName("Username")
.description("The name of the user that is allowed to log in to the FTP server. "
+ "If a username is provided, a password must also be provided. "
+ "If no username is specified, anonymous connections will be permitted.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("password")
.displayName("Password")
.description("If the Username is set, then a password must also be specified. "
+ "The password provided by the client trying to log in to the FTP server will be checked against this password.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
BIND_ADDRESS,
PORT,
USERNAME,
PASSWORD,
SSL_CONTEXT_SERVICE
));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
RELATIONSHIP_SUCCESS
)));
private volatile FtpServer ftpServer;
private volatile CountDownLatch sessionFactorySetSignal;
private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void startFtpServer(ProcessContext context) {
if (ftpServer == null) {
sessionFactory.set(null);
String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
try {
sessionFactorySetSignal = new CountDownLatch(1);
ftpServer = new NifiFtpServer.Builder()
.sessionFactory(sessionFactory)
.sessionFactorySetSignal(sessionFactorySetSignal)
.relationshipSuccess(RELATIONSHIP_SUCCESS)
.bindAddress(bindAddress)
.port(port)
.username(username)
.password(password)
.sslContextService(sslContextService)
.build();
ftpServer.start();
} catch (ProcessException processException) {
getLogger().error(processException.getMessage(), processException);
stopFtpServer();
throw processException;
}
} else {
getLogger().warn("Ftp server already started.");
}
}
@OnStopped
public void stopFtpServer() {
if (ftpServer != null && !ftpServer.isStopped()) {
ftpServer.stop();
}
ftpServer = null;
sessionFactory.set(null);
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
sessionFactorySetSignal.countDown();
}
context.yield();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> results = new ArrayList<>(3);
validateUsernameAndPassword(context, results);
validateBindAddress(context, results);
return results;
}
private void validateUsernameAndPassword(ValidationContext context, Collection<ValidationResult> validationResults) {
String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
if ((username == null) && (password != null)) {
validationResults.add(usernameOrPasswordIsNull(USERNAME));
} else if ((username != null) && (password == null)) {
validationResults.add(usernameOrPasswordIsNull(PASSWORD));
}
}
private void validateBindAddress(ValidationContext context, Collection<ValidationResult> validationResults) {
String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
try {
InetAddress.getByName(bindAddress);
} catch (UnknownHostException e) {
String explanation = String.format("'%s' is unknown", BIND_ADDRESS.getDisplayName());
validationResults.add(createValidationResult(BIND_ADDRESS.getDisplayName(), explanation));
}
}
private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty) {
String explanation = String.format("'%s' and '%s' should either both be provided or none of them", USERNAME.getDisplayName(), PASSWORD.getDisplayName());
return createValidationResult(nullProperty.getDisplayName(), explanation);
}
private ValidationResult createValidationResult(String subject, String explanation) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation(explanation).build();
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp;
import org.apache.nifi.processor.exception.ProcessException;
/**
* The standard interface for FTP server implementations.
*/
public interface FtpServer {
/**
* Starts the FTP server.
* @throws ProcessException if the server could not be started.
*/
void start() throws ProcessException;
/**
* Stops the FTP server.
*/
void stop();
/**
* Returns the status of the FTP server.
* @return true if the server is stopped, false otherwise.
*/
boolean isStopped();
}

View File

@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp;
import org.apache.ftpserver.ConnectionConfig;
import org.apache.ftpserver.ConnectionConfigFactory;
import org.apache.ftpserver.DataConnectionConfiguration;
import org.apache.ftpserver.DataConnectionConfigurationFactory;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerConfigurationException;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.command.Command;
import org.apache.ftpserver.command.CommandFactory;
import org.apache.ftpserver.command.CommandFactoryFactory;
import org.apache.ftpserver.ftplet.Authority;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.User;
import org.apache.ftpserver.listener.Listener;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.ftpserver.ssl.SslConfiguration;
import org.apache.ftpserver.ssl.SslConfigurationFactory;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.ftp.commands.CommandMapFactory;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
import org.apache.nifi.ssl.SSLContextService;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class NifiFtpServer implements org.apache.nifi.processors.standard.ftp.FtpServer {
private final FtpServer server;
private NifiFtpServer(Map<String, Command> commandMap, FileSystemFactory fileSystemFactory, ConnectionConfig connectionConfig, Listener listener, User user) throws ProcessException {
try {
FtpServerFactory serverFactory = new FtpServerFactory();
serverFactory.setFileSystem(fileSystemFactory);
serverFactory.setCommandFactory(createCommandFactory(commandMap));
serverFactory.setConnectionConfig(connectionConfig);
serverFactory.addListener("default", listener);
serverFactory.getUserManager().save(user);
server = serverFactory.createServer();
} catch (Exception exception) {
throw new ProcessException("FTP server could not be started.", exception);
}
}
private CommandFactory createCommandFactory(Map<String, Command> commandMap) {
CommandFactoryFactory commandFactoryFactory = new CommandFactoryFactory();
commandFactoryFactory.setUseDefaultCommands(false);
commandFactoryFactory.setCommandMap(commandMap);
return commandFactoryFactory.createCommandFactory();
}
public void start() throws ProcessException {
try {
server.start();
} catch (Exception exception) {
throw new ProcessException("FTP server could not be started.", exception);
}
}
public void stop() {
server.stop();
}
public boolean isStopped() {
return server.isStopped();
}
public static class Builder {
private static final String HOME_DIRECTORY = "/virtual/ftproot";
private AtomicReference<ProcessSessionFactory> sessionFactory;
private CountDownLatch sessionFactorySetSignal;
private Relationship relationshipSuccess;
private String bindAddress;
private int port;
private String username;
private String password;
private SSLContextService sslContextService;
public Builder sessionFactory(AtomicReference<ProcessSessionFactory> sessionFactory) {
this.sessionFactory = sessionFactory;
return this;
}
public Builder sessionFactorySetSignal(CountDownLatch sessionFactorySetSignal) {
Objects.requireNonNull(sessionFactorySetSignal);
this.sessionFactorySetSignal = sessionFactorySetSignal;
return this;
}
public Builder relationshipSuccess(Relationship relationship) {
Objects.requireNonNull(relationship);
this.relationshipSuccess = relationship;
return this;
}
public Builder bindAddress(String bindAddress) {
this.bindAddress = bindAddress;
return this;
}
public Builder port(int port) {
this.port = port;
return this;
}
public Builder username(String username) {
this.username = username;
return this;
}
public Builder password(String password) {
this.password = password;
return this;
}
public Builder sslContextService(SSLContextService sslContextService) {
this.sslContextService = sslContextService;
return this;
}
public NifiFtpServer build() throws ProcessException {
try {
boolean anonymousLoginEnabled = (username == null);
FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory();
CommandMapFactory commandMapFactory = new CommandMapFactory(sessionFactory, sessionFactorySetSignal, relationshipSuccess);
Map<String, Command> commandMap = commandMapFactory.createCommandMap();
ConnectionConfig connectionConfig = createConnectionConfig(anonymousLoginEnabled);
Listener listener = createListener(bindAddress, port, sslContextService);
User user = createUser(username, password, HOME_DIRECTORY);
return new NifiFtpServer(commandMap, fileSystemFactory, connectionConfig, listener, user);
} catch (Exception exception) {
throw new ProcessException("FTP server could not be started.", exception);
}
}
private ConnectionConfig createConnectionConfig(boolean anonymousLoginEnabled) {
ConnectionConfigFactory connectionConfigFactory = new ConnectionConfigFactory();
connectionConfigFactory.setAnonymousLoginEnabled(anonymousLoginEnabled);
return connectionConfigFactory.createConnectionConfig();
}
private Listener createListener(String bindAddress, int port, SSLContextService sslContextService) throws FtpServerConfigurationException {
ListenerFactory listenerFactory = new ListenerFactory();
listenerFactory.setServerAddress(bindAddress);
listenerFactory.setPort(port);
if (sslContextService != null) {
SslConfigurationFactory ssl = new SslConfigurationFactory();
ssl.setKeystoreFile(new File(sslContextService.getKeyStoreFile()));
ssl.setKeystorePassword(sslContextService.getKeyStorePassword());
ssl.setKeyPassword(sslContextService.getKeyPassword());
ssl.setKeystoreType(sslContextService.getKeyStoreType());
ssl.setSslProtocol(sslContextService.getSslAlgorithm());
if (sslContextService.getTrustStoreFile() != null){
ssl.setClientAuthentication("NEED");
ssl.setTruststoreFile(new File(sslContextService.getTrustStoreFile()));
ssl.setTruststorePassword(sslContextService.getTrustStorePassword());
ssl.setTruststoreType(sslContextService.getTrustStoreType());
}
SslConfiguration sslConfiguration = ssl.createSslConfiguration();
// Set implicit security for the control socket
listenerFactory.setSslConfiguration(sslConfiguration);
listenerFactory.setImplicitSsl(true);
// Set implicit security for the data connection
DataConnectionConfigurationFactory dataConnectionConfigurationFactory = new DataConnectionConfigurationFactory();
dataConnectionConfigurationFactory.setImplicitSsl(true);
dataConnectionConfigurationFactory.setSslConfiguration(sslConfiguration);
DataConnectionConfiguration dataConnectionConfiguration = dataConnectionConfigurationFactory.createDataConnectionConfiguration();
listenerFactory.setDataConnectionConfiguration(dataConnectionConfiguration);
}
return listenerFactory.createListener();
}
private User createUser(String username, String password, String homeDirectory) {
boolean anonymousLoginEnabled = (username == null);
if (anonymousLoginEnabled) {
return createAnonymousUser(homeDirectory, Collections.singletonList(new WritePermission()));
} else {
return createNamedUser(username, password, homeDirectory, Collections.singletonList(new WritePermission()));
}
}
private User createAnonymousUser(String homeDirectory, List<Authority> authorities) {
BaseUser user = new BaseUser();
user.setName("anonymous");
user.setHomeDirectory(homeDirectory);
user.setAuthorities(authorities);
return user;
}
private User createNamedUser(String username, String password, String homeDirectory, List<Authority> authorities) {
BaseUser user = new BaseUser();
user.setName(username);
user.setPassword(password);
user.setHomeDirectory(homeDirectory);
user.setAuthorities(authorities);
return user;
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
import org.apache.ftpserver.command.Command;
import org.apache.ftpserver.command.impl.ABOR;
import org.apache.ftpserver.command.impl.AUTH;
import org.apache.ftpserver.command.impl.CDUP;
import org.apache.ftpserver.command.impl.CWD;
import org.apache.ftpserver.command.impl.EPRT;
import org.apache.ftpserver.command.impl.EPSV;
import org.apache.ftpserver.command.impl.FEAT;
import org.apache.ftpserver.command.impl.LIST;
import org.apache.ftpserver.command.impl.MDTM;
import org.apache.ftpserver.command.impl.MKD;
import org.apache.ftpserver.command.impl.MLSD;
import org.apache.ftpserver.command.impl.MLST;
import org.apache.ftpserver.command.impl.MODE;
import org.apache.ftpserver.command.impl.NLST;
import org.apache.ftpserver.command.impl.NOOP;
import org.apache.ftpserver.command.impl.OPTS;
import org.apache.ftpserver.command.impl.PASS;
import org.apache.ftpserver.command.impl.PASV;
import org.apache.ftpserver.command.impl.PBSZ;
import org.apache.ftpserver.command.impl.PORT;
import org.apache.ftpserver.command.impl.PROT;
import org.apache.ftpserver.command.impl.PWD;
import org.apache.ftpserver.command.impl.QUIT;
import org.apache.ftpserver.command.impl.REIN;
import org.apache.ftpserver.command.impl.RMD;
import org.apache.ftpserver.command.impl.SITE;
import org.apache.ftpserver.command.impl.SITE_DESCUSER;
import org.apache.ftpserver.command.impl.SITE_HELP;
import org.apache.ftpserver.command.impl.SITE_STAT;
import org.apache.ftpserver.command.impl.SITE_WHO;
import org.apache.ftpserver.command.impl.SITE_ZONE;
import org.apache.ftpserver.command.impl.SIZE;
import org.apache.ftpserver.command.impl.STAT;
import org.apache.ftpserver.command.impl.STRU;
import org.apache.ftpserver.command.impl.SYST;
import org.apache.ftpserver.command.impl.TYPE;
import org.apache.ftpserver.command.impl.USER;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class CommandMapFactory {
private final Map<String, Command> commandMap = new HashMap<>();
private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
private final AtomicReference<ProcessSessionFactory> sessionFactory;
private final CountDownLatch sessionFactorySetSignal;
private final Relationship relationshipSuccess;
public CommandMapFactory(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal, Relationship relationshipSuccess) {
this.sessionFactory = sessionFactory;
this.sessionFactorySetSignal = sessionFactorySetSignal;
this.relationshipSuccess = relationshipSuccess;
}
public Map<String, Command> createCommandMap() {
addToCommandMap("ABOR", new ABOR());
addToCommandMap("ACCT", new NotSupportedCommand("Operation (ACCT) not supported."));
addToCommandMap("APPE", new NotSupportedCommand("Operation (APPE) not supported."));
addToCommandMap("AUTH", new AUTH());
addToCommandMap("CDUP", new CDUP());
addToCommandMap("CWD", new CWD());
addToCommandMap("DELE", new NotSupportedCommand("Operation (DELE) not supported."));
addToCommandMap("EPRT", new EPRT());
addToCommandMap("EPSV", new EPSV());
addToCommandMap("FEAT", new FEAT());
addToCommandMap("HELP", customHelpCommand);
addToCommandMap("LIST", new LIST());
addToCommandMap("MFMT", new NotSupportedCommand("Operation (MFMT) not supported."));
addToCommandMap("MDTM", new MDTM());
addToCommandMap("MLST", new MLST());
addToCommandMap("MKD", new MKD());
addToCommandMap("MLSD", new MLSD());
addToCommandMap("MODE", new MODE());
addToCommandMap("NLST", new NLST());
addToCommandMap("NOOP", new NOOP());
addToCommandMap("OPTS", new OPTS());
addToCommandMap("PASS", new PASS());
addToCommandMap("PASV", new PASV());
addToCommandMap("PBSZ", new PBSZ());
addToCommandMap("PORT", new PORT());
addToCommandMap("PROT", new PROT());
addToCommandMap("PWD", new PWD());
addToCommandMap("QUIT", new QUIT());
addToCommandMap("REIN", new REIN());
addToCommandMap("REST", new NotSupportedCommand("Operation (REST) not supported."));
addToCommandMap("RETR", new NotSupportedCommand("Operation (RETR) not supported."));
addToCommandMap("RMD", new RMD());
addToCommandMap("RNFR", new NotSupportedCommand("Operation (RNFR) not supported."));
addToCommandMap("RNTO", new NotSupportedCommand("Operation (RNTO) not supported."));
addToCommandMap("SITE", new SITE());
addToCommandMap("SIZE", new SIZE());
addToCommandMap("SITE_DESCUSER", new SITE_DESCUSER());
addToCommandMap("SITE_HELP", new SITE_HELP());
addToCommandMap("SITE_STAT", new SITE_STAT());
addToCommandMap("SITE_WHO", new SITE_WHO());
addToCommandMap("SITE_ZONE", new SITE_ZONE());
addToCommandMap("STAT", new STAT());
addToCommandMap("STOR", new FtpCommandSTOR(sessionFactory, sessionFactorySetSignal, relationshipSuccess));
addToCommandMap("STOU", new NotSupportedCommand("Operation (STOU) not supported."));
addToCommandMap("STRU", new STRU());
addToCommandMap("SYST", new SYST());
addToCommandMap("TYPE", new TYPE());
addToCommandMap("USER", new USER());
return commandMap;
}
private void addToCommandMap(String command, Command instance) {
commandMap.put(command, instance);
if (!(instance instanceof NotSupportedCommand)) {
customHelpCommand.addCommand(command);
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
import org.apache.ftpserver.ftplet.FtpFile;
public class DetailedFtpCommandException extends FtpCommandException {
private final String subId;
private final FtpFile ftpFile;
public DetailedFtpCommandException(int ftpReturnCode, String subId, String basicMessage, FtpFile ftpFile) {
super(ftpReturnCode, basicMessage);
this.subId = subId;
this.ftpFile = ftpFile;
}
public String getSubId() {
return subId;
}
public FtpFile getFtpFile() {
return ftpFile;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
public class FtpCommandException extends Exception {
private final int ftpReturnCode;
public FtpCommandException(int ftpReturnCode, String basicMessage) {
super(basicMessage);
this.ftpReturnCode = ftpReturnCode;
}
public int getFtpReturnCode() {
return ftpReturnCode;
}
@Override
public Throwable fillInStackTrace() {
return this;
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
import org.apache.ftpserver.command.AbstractCommand;
import org.apache.ftpserver.ftplet.DefaultFtpReply;
import org.apache.ftpserver.ftplet.FtpReply;
import org.apache.ftpserver.ftplet.FtpRequest;
import org.apache.ftpserver.impl.FtpIoSession;
import org.apache.ftpserver.impl.FtpServerContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class FtpCommandHELP extends AbstractCommand {
private static Map<String, String> COMMAND_SPECIFIC_HELP;
private static int MAX_NUMBER_OF_COMMANDS_IN_A_ROW = 5;
private Set<String> availableCommands = new TreeSet<>();
static {
Map<String, String> commands = new HashMap<>();
commands.put("ABOR", "Syntax: ABOR");
commands.put("APPE", "Syntax: APPE <sp> <pathname>");
commands.put("AUTH", "Syntax: AUTH <sp> <security_mechanism>");
commands.put("CDUP", "Syntax: CDUP");
commands.put("CWD", "Syntax: CWD <sp> <pathname>");
commands.put("DELE", "Syntax: DELE <sp> <pathname>");
commands.put("EPRT", "Syntax: EPRT<space><d><net-prt><d><net-addr><d><tcp-port><d>");
commands.put("EPSV", "Syntax: EPSV");
commands.put("FEAT", "Syntax: FEAT");
commands.put("HELP", "Syntax: HELP [<sp> <string>]");
commands.put("LIST", "Syntax: LIST [<sp> <pathname>]");
commands.put("MDTM", "Syntax: MDTM <sp> <pathname>");
commands.put("MKD", "Syntax: MKD <sp> <pathname>");
commands.put("MLSD", "Syntax: MLSD [<sp> <pathname>]");
commands.put("MLST", "Syntax: MLST [<sp> <pathname>]");
commands.put("MODE", "Syntax: MODE <sp> <mode-code>");
commands.put("NLST", "Syntax: NLST [<sp> <pathname>]");
commands.put("NOOP", "Syntax: NOOP");
commands.put("OPTS", "Syntax: OPTS <sp> <options>");
commands.put("PASS", "Syntax: PASS <sp> <password>");
commands.put("PASV", "Syntax: PASV");
commands.put("PBSZ", "Syntax: PBSZ <sp> <buffer_size>");
commands.put("PORT", "Syntax: PORT <sp> <host-port>");
commands.put("PROT", "Syntax: PROT <sp> <protection_level>");
commands.put("PWD", "Syntax: PWD");
commands.put("QUIT", "Syntax: QUIT");
commands.put("REIN", "Syntax: REIN");
commands.put("REST", "Syntax: REST <sp> <marker>");
commands.put("RETR", "Syntax: RETR <sp> <pathname>");
commands.put("RMD", "Syntax: RMD <sp> <pathname>");
commands.put("RNFR", "Syntax: RNFR <sp> <pathname>");
commands.put("RNTO", "Syntax: RNTO <sp> <pathname>");
commands.put("SITE", "Syntax: SITE <sp> <string>");
commands.put("SIZE", "Syntax: SIZE <sp> <pathname>");
commands.put("STAT", "Syntax: STAT [<sp> <pathname>]");
commands.put("STOR", "Syntax: STOR <sp> <pathname>");
commands.put("STOU", "Syntax: STOU");
commands.put("SYST", "Syntax: SYST");
commands.put("TYPE", "Syntax: TYPE <sp> <type-code>");
commands.put("USER", "Syntax: USER <sp> <username>");
COMMAND_SPECIFIC_HELP = Collections.unmodifiableMap(commands);
}
public void addCommand(String command) {
if (!command.startsWith("SITE_")) { // Parameterized commands of SITE will not appear in the general help.
availableCommands.add(command);
}
}
public void execute(final FtpIoSession session,
final FtpServerContext context, final FtpRequest request) {
// reset state variables
session.resetState();
if (!request.hasArgument()) {
sendDefaultHelpMessage(session);
} else {
handleRequestWithArgument(session, request);
}
}
private void sendDefaultHelpMessage(FtpIoSession session) {
sendCustomHelpMessage(session, getDefaultHelpMessage());
}
private String getDefaultHelpMessage() {
StringBuffer helpMessage = new StringBuffer("The following commands are supported.\n");
int currentNumberOfCommandsInARow = 0;
Iterator<String> iterator = availableCommands.iterator();
while (iterator.hasNext()) {
String command = iterator.next();
if (currentNumberOfCommandsInARow == MAX_NUMBER_OF_COMMANDS_IN_A_ROW) {
helpMessage.append("\n");
currentNumberOfCommandsInARow = 0;
}
if (iterator.hasNext()) {
helpMessage.append(command + ", ");
} else {
helpMessage.append(command);
}
++currentNumberOfCommandsInARow;
}
helpMessage.append("\nEnd of help.");
return helpMessage.toString();
}
private void sendCustomHelpMessage(FtpIoSession session, String message) {
session.write(new DefaultFtpReply(FtpReply.REPLY_214_HELP_MESSAGE, message));
}
private void handleRequestWithArgument(FtpIoSession session, FtpRequest request) {
// Send command-specific help if available
String ftpCommand = request.getArgument().toUpperCase();
String commandSpecificHelp = null;
if (availableCommands.contains(ftpCommand)) {
commandSpecificHelp = COMMAND_SPECIFIC_HELP.get(ftpCommand);
}
if (commandSpecificHelp == null) {
sendDefaultHelpMessage(session);
} else {
sendCustomHelpMessage(session, commandSpecificHelp);
}
}
}

View File

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
import org.apache.ftpserver.command.AbstractCommand;
import org.apache.ftpserver.ftplet.DataConnection;
import org.apache.ftpserver.ftplet.DataConnectionFactory;
import org.apache.ftpserver.ftplet.DefaultFtpReply;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.FtpReply;
import org.apache.ftpserver.ftplet.FtpRequest;
import org.apache.ftpserver.impl.FtpIoSession;
import org.apache.ftpserver.impl.FtpServerContext;
import org.apache.ftpserver.impl.IODataConnectionFactory;
import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
import org.apache.ftpserver.impl.LocalizedFtpReply;
import org.apache.ftpserver.impl.ServerFtpStatistics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class FtpCommandSTOR extends AbstractCommand {
private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
private final AtomicReference<ProcessSessionFactory> sessionFactory;
private final CountDownLatch sessionFactorySetSignal;
private final Relationship relationshipSuccess;
public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal, Relationship relationshipSuccess) {
this.sessionFactory = sessionFactory;
this.sessionFactorySetSignal = sessionFactorySetSignal;
this.relationshipSuccess = relationshipSuccess;
}
public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
try {
executeCommand(ftpSession, context, request);
} catch (DetailedFtpCommandException ftpCommandException) {
ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
ftpCommandException.getFtpReturnCode(),
ftpCommandException.getSubId(),
ftpCommandException.getMessage(),
ftpCommandException.getFtpFile()));
} catch (FtpCommandException ftpCommandException) {
ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getMessage()));
} finally {
ftpSession.resetState();
ftpSession.getDataConnection().closeDataConnection();
}
}
private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
throws FtpCommandException {
final String fileName = getArgument(request);
checkDataConnection(ftpSession);
final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
checkWritePermission(ftpFile);
sendFileStatusOkay(ftpSession, context, request, ftpFile.getAbsolutePath());
final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
transferData(dataConnection, ftpSession, context, request, ftpFile);
}
private String getArgument(final FtpRequest request) throws FtpCommandException {
final String argument = request.getArgument();
if (argument == null) {
throw new DetailedFtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);
}
return argument;
}
private void checkDataConnection(final FtpIoSession ftpSession) throws FtpCommandException {
DataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
if (dataConnectionFactory instanceof IODataConnectionFactory) {
InetAddress address = ((IODataConnectionFactory) dataConnectionFactory)
.getInetAddress();
if (address == null) {
throw new FtpCommandException(FtpReply.REPLY_503_BAD_SEQUENCE_OF_COMMANDS, "PORT or PASV must be issued first");
}
}
}
private FtpFile getFtpFile(final FtpIoSession ftpSession, final String fileName) throws FtpCommandException {
FtpFile ftpFile = null;
try {
ftpFile = ftpSession.getFileSystemView().getFile(fileName);
} catch (FtpException e) {
LOG.error("Exception getting file object", e);
}
if (ftpFile == null) {
throw new DetailedFtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.invalid", fileName, ftpFile);
}
return ftpFile;
}
private void checkWritePermission(final FtpFile ftpFile) throws FtpCommandException {
if (!ftpFile.isWritable()) {
throw new DetailedFtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.permission", ftpFile.getAbsolutePath(), ftpFile);
}
}
private void sendFileStatusOkay(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request, final String fileAbsolutePath) {
ftpSession.write(LocalizedFtpReply.translate(ftpSession, request, context,
FtpReply.REPLY_150_FILE_STATUS_OKAY,
"STOR",
fileAbsolutePath)).awaitUninterruptibly(10000);
}
private DataConnection openDataConnection(final FtpIoSession ftpSession, final FtpFile ftpFile) throws FtpCommandException {
final DataConnection dataConnection;
try {
dataConnection = ftpSession.getDataConnection().openConnection();
} catch (Exception exception) {
LOG.error("Exception getting the input data stream", exception);
throw new DetailedFtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION,
"STOR",
ftpFile.getAbsolutePath(),
ftpFile);
}
return dataConnection;
}
private void transferData(final DataConnection dataConnection, final FtpIoSession ftpSession,
final FtpServerContext context, final FtpRequest request, final FtpFile ftpFile)
throws FtpCommandException {
final ProcessSession processSession;
try {
processSession = createProcessSession();
} catch (InterruptedException|TimeoutException exception) {
LOG.error("ProcessSession could not be acquired, command STOR aborted.", exception);
throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION, "File transfer failed.");
}
FlowFile flowFile = processSession.create();
long transferredBytes = 0L;
try (OutputStream flowFileOutputStream = processSession.write(flowFile)) {
transferredBytes = dataConnection.transferFromClient(ftpSession.getFtpletSession(), flowFileOutputStream);
LOG.info("File received {}", ftpFile.getAbsolutePath());
} catch (SocketException socketException) {
LOG.error("Socket exception during data transfer", socketException);
processSession.rollback();
throw new DetailedFtpCommandException(FtpReply.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
"STOR",
ftpFile.getAbsolutePath(),
ftpFile);
} catch (IOException ioException) {
LOG.error("IOException during data transfer", ioException);
processSession.rollback();
throw new DetailedFtpCommandException(FtpReply.REPLY_551_REQUESTED_ACTION_ABORTED_PAGE_TYPE_UNKNOWN,
"STOR",
ftpFile.getAbsolutePath(),
ftpFile);
}
try {
// notify the statistics component
ServerFtpStatistics ftpStat = (ServerFtpStatistics) context.getFtpStatistics();
ftpStat.setUpload(ftpSession, ftpFile, transferredBytes);
processSession.putAttribute(flowFile, CoreAttributes.FILENAME.key(), ftpFile.getName());
processSession.putAttribute(flowFile, CoreAttributes.PATH.key(), getPath(ftpFile));
processSession.getProvenanceReporter().modifyContent(flowFile);
processSession.transfer(flowFile, relationshipSuccess);
processSession.commit();
} catch (Exception exception) {
processSession.rollback();
LOG.error("Process session error. ", exception);
}
// if data transfer ok - send transfer complete message
ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
FtpReply.REPLY_226_CLOSING_DATA_CONNECTION, "STOR",
ftpFile.getAbsolutePath(), ftpFile, transferredBytes));
}
private String getPath(FtpFile ftpFile) {
String absolutePath = ftpFile.getAbsolutePath();
int endIndex = absolutePath.length() - ftpFile.getName().length();
return ftpFile.getAbsolutePath().substring(0, endIndex);
}
private ProcessSession createProcessSession() throws InterruptedException, TimeoutException {
ProcessSessionFactory processSessionFactory = getProcessSessionFactory();
return processSessionFactory.createSession();
}
private ProcessSessionFactory getProcessSessionFactory() throws InterruptedException, TimeoutException {
if (sessionFactorySetSignal.await(10000, TimeUnit.MILLISECONDS)) {
return sessionFactory.get();
} else {
throw new TimeoutException("Waiting period for sessionFactory is over.");
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.commands;
import org.apache.ftpserver.command.AbstractCommand;
import org.apache.ftpserver.ftplet.DefaultFtpReply;
import org.apache.ftpserver.ftplet.FtpReply;
import org.apache.ftpserver.ftplet.FtpRequest;
import org.apache.ftpserver.impl.FtpIoSession;
import org.apache.ftpserver.impl.FtpServerContext;
public class NotSupportedCommand extends AbstractCommand {
private String message;
public NotSupportedCommand(String message) {
this.message = message;
}
@Override
public void execute(FtpIoSession session, FtpServerContext context, FtpRequest request) {
// reset state variables
session.resetState();
session.write(new DefaultFtpReply(FtpReply.REPLY_502_COMMAND_NOT_IMPLEMENTED, message));
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class DefaultVirtualFileSystem implements VirtualFileSystem {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final List<VirtualPath> existingPaths = new ArrayList<>();
public DefaultVirtualFileSystem() {
existingPaths.add(ROOT);
}
@Override
public boolean mkdir(VirtualPath newFile) {
lock.writeLock().lock();
try {
if (existingPaths.contains(newFile)) {
return false;
} else {
if (existingPaths.contains(newFile.getParent())) {
existingPaths.add(newFile);
return true;
} else {
return false;
}
}
} finally {
lock.writeLock().unlock();
}
}
@Override
public boolean exists(VirtualPath virtualFile) {
lock.readLock().lock();
try {
return existingPaths.contains(virtualFile);
} finally {
lock.readLock().unlock();
}
}
@Override
public boolean delete(VirtualPath virtualFile) {
if (virtualFile.equals(ROOT)) { // Root cannot be deleted
return false;
}
lock.writeLock().lock();
try {
if (existingPaths.contains(virtualFile)) {
if (!hasSubDirectories(virtualFile)) {
return existingPaths.remove(virtualFile);
}
}
return false;
} finally {
lock.writeLock().unlock();
}
}
private boolean hasSubDirectories(VirtualPath directory) {
return existingPaths.stream().anyMatch(e -> isChildOf(directory, e));
}
private boolean isChildOf(VirtualPath parent, VirtualPath childCandidate) {
if (childCandidate.equals(ROOT)) {
return false;
}
return parent.equals(childCandidate.getParent());
}
@Override
public List<VirtualPath> listChildren(VirtualPath parent) {
List<VirtualPath> children;
lock.readLock().lock();
try {
if (parent.equals(ROOT)) {
children = existingPaths.stream()
.filter(existingPath -> (!existingPath.equals(ROOT) && (existingPath.getNameCount() == 1)))
.collect(Collectors.toList());
} else {
int parentNameCount = parent.getNameCount();
children = existingPaths.stream()
.filter(existingPath -> (((existingPath.getParent() != null) && existingPath.getParent().equals(parent))
&& (existingPath.getNameCount() == (parentNameCount + 1))))
.collect(Collectors.toList());
}
} finally {
lock.readLock().unlock();
}
return children;
}
@Override
public int getTotalNumberOfFiles() {
lock.readLock().lock();
try {
return existingPaths.size();
} finally {
lock.readLock().unlock();
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import java.util.List;
/**
* The interface for virtual file system implementations. Provides the methods for the basic file system functionality.
*/
public interface VirtualFileSystem {
/**
* The virtual root folder of the virtual file system regardless of the operating system and its native file system structure.
*/
VirtualPath ROOT = new VirtualPath("/");
/**
* Makes a new directory specified by the path received as a parameter.
*
* @param newFile The path that points to the directory that should be created.
* @return <code>true</code> if the new directory got created;
* <code>false</code> otherwise.
*/
boolean mkdir(VirtualPath newFile);
/**
* Checks if the path received as a parameter already exists in the file system.
*
* @param virtualFile The path that may or may not exist in the file system.
* @return <code>true</code> if the specified path already exists in the file system;
* <code>false</code> otherwise.
*/
boolean exists(VirtualPath virtualFile);
/**
* Deletes the file or directory specified by the path received as a parameter.
*
* @param virtualFile The path pointing to the file or directory that should be deleted.
* @return <code>true</code> if the file or directory got deleted;
* <code>false</code> otherwise.
*/
boolean delete(VirtualPath virtualFile);
/**
* Lists the files and directories that are directly under the directory specified by the path received as a parameter.
*
* @param parent The path specifying the directory the contents of which should be listed.
* @return The list of paths that point to the contents of the parent directory.
*/
List<VirtualPath> listChildren(VirtualPath parent);
/**
* Returns the number of all the files and folders in the file system.
*
* @return The number of files and folders in the file system.
*/
int getTotalNumberOfFiles();
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.User;
public class VirtualFileSystemFactory implements FileSystemFactory {
private VirtualFileSystem fileSystem;
public VirtualFileSystemFactory() {
fileSystem = new DefaultVirtualFileSystem();
}
public VirtualFileSystemFactory(VirtualFileSystem fileSystem) {
this.fileSystem = fileSystem;
}
@Override
public FileSystemView createFileSystemView(User user) {
return new VirtualFileSystemView(user, fileSystem);
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VirtualFileSystemView implements FileSystemView {
private static final Logger LOG = LoggerFactory.getLogger(VirtualFileSystemView.class);
private final VirtualFileSystem fileSystem;
private VirtualPath currentDirectory = VirtualFileSystem.ROOT;
public VirtualFileSystemView(User user, VirtualFileSystem fileSystem) throws IllegalArgumentException {
if (user == null || fileSystem == null) {
throw new IllegalArgumentException("User and filesystem cannot be null.");
} else {
LOG.info("Virtual filesystem view created for user \"{}\"", user.getName());
this.fileSystem = fileSystem;
}
}
@Override
public FtpFile getHomeDirectory() {
return new VirtualFtpFile(VirtualFileSystem.ROOT, fileSystem);
}
@Override
public FtpFile getWorkingDirectory() {
return new VirtualFtpFile(currentDirectory, fileSystem);
}
@Override
public boolean changeWorkingDirectory(String targetPath) {
VirtualPath targetDirectory = currentDirectory.resolve(targetPath);
if (fileSystem.exists(targetDirectory)) {
currentDirectory = targetDirectory;
return true;
} else {
return false;
}
}
@Override
public FtpFile getFile(String fileName) throws FtpException {
VirtualPath filePath = currentDirectory.resolve(fileName);
VirtualPath parent = filePath.getParent();
if ((parent != null) && !fileSystem.exists(filePath.getParent())) {
throw new FtpException(String.format("Parent directory does not exist for %s", filePath.toString()));
}
return new VirtualFtpFile(filePath, fileSystem);
}
@Override
public boolean isRandomAccessible() {
return false;
}
@Override
public void dispose() { }
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import org.apache.ftpserver.ftplet.FtpFile;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
public class VirtualFtpFile implements FtpFile {
private final VirtualPath path;
private final VirtualFileSystem fileSystem;
private long lastModified;
public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
if (path == null || fileSystem == null) {
throw new IllegalArgumentException("File path and fileSystem cannot be null");
}
this.path = path;
this.fileSystem = fileSystem;
this.lastModified = Calendar.getInstance().getTimeInMillis();
}
@Override
public String getAbsolutePath() {
return path.toString();
}
@Override
public String getName() {
return path.getFileName();
}
@Override
public boolean isHidden() {
return false;
}
@Override
public boolean isDirectory() {
return true; // Only directories are handled since files are converted into flowfiles immediately.
}
@Override
public boolean isFile() {
return false; // Only directories are handled since files are converted into flowfiles immediately.
}
@Override
public boolean doesExist() {
return fileSystem.exists(path);
}
@Override
public boolean isReadable() {
return true;
}
@Override
public boolean isWritable() {
return true;
}
@Override
public boolean isRemovable() {
return true; //Every virtual directory can be deleted
}
@Override
public String getOwnerName() {
return "user";
}
@Override
public String getGroupName() {
return "group";
}
@Override
public int getLinkCount() {
return 1;
}
@Override
public long getLastModified() {
return lastModified;
}
@Override
public boolean setLastModified(long l) throws UnsupportedOperationException {
throw new UnsupportedOperationException("VirtualFtpFile.setLastModified()");
}
@Override
public long getSize() {
return 0;
}
@Override
public Object getPhysicalFile() throws UnsupportedOperationException {
throw new UnsupportedOperationException("VirtualFtpFile.getPhysicalFile()");
}
@Override
public boolean mkdir() {
return fileSystem.mkdir(path);
}
@Override
public boolean delete() {
return fileSystem.delete(path);
}
@Override
public boolean move(FtpFile ftpFile) throws UnsupportedOperationException {
throw new UnsupportedOperationException("VirtualFtpFile.move()");
}
@Override
public List<? extends FtpFile> listFiles() {
List<VirtualPath> paths = fileSystem.listChildren(path);
List<VirtualFtpFile> files = new ArrayList<>();
for (VirtualPath path : paths) {
files.add(new VirtualFtpFile(path, fileSystem));
}
return files;
}
@Override
public OutputStream createOutputStream(long l) throws UnsupportedOperationException {
throw new UnsupportedOperationException("VirtualFtpFile.createOutputStream()");
}
@Override
public InputStream createInputStream(long l) throws UnsupportedOperationException {
throw new UnsupportedOperationException("VirtualFtpFile.createInputStream()");
}
@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof VirtualFtpFile)) {
return false;
}
VirtualFtpFile other = (VirtualFtpFile) o;
return fileSystem.equals(other.fileSystem) && path.equals(other.path);
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp.filesystem;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
public class VirtualPath {
private final Path path; // always normalized
public VirtualPath(String path) {
String absolutePath = File.separator + normalizeSeparator(path);
this.path = Paths.get(absolutePath).normalize();
}
private String normalizeSeparator(String path) {
String pathWithoutStartingSeparator = removeStartingSeparator(path);
String normalizedPath = pathWithoutStartingSeparator.replace(File.separatorChar, '/');
normalizedPath = normalizedPath.replace('\\', '/');
return normalizedPath;
}
private String removeStartingSeparator(String path) {
int indexOfFirstNonSeparator;
for (indexOfFirstNonSeparator = 0; indexOfFirstNonSeparator < path.length(); ++indexOfFirstNonSeparator) {
if (!(path.charAt(indexOfFirstNonSeparator) == File.separatorChar) && !(path.charAt(indexOfFirstNonSeparator) == '/')) {
break;
}
}
return path.substring(indexOfFirstNonSeparator);
}
public String getFileName() {
if (path.getFileName() == null) {
return File.separator;
} else {
return path.getFileName().toString();
}
}
public VirtualPath getParent() {
if (path.getParent() == null) {
return null;
} else {
return new VirtualPath(path.getParent().toString());
}
}
public boolean isAbsolute() {
return path.isAbsolute();
}
public VirtualPath resolve(String otherPath) {
return new VirtualPath(path.resolve(otherPath).normalize().toString());
}
public String toString() {
return path.toString();
}
public int getNameCount() {
return path.getNameCount();
}
@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof VirtualPath)) {
return false;
}
VirtualPath other = (VirtualPath) o;
return path.equals(other.path);
}
}

View File

@ -61,6 +61,7 @@ org.apache.nifi.processors.standard.IdentifyMimeType
org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.InvokeHTTP
org.apache.nifi.processors.standard.JoltTransformJSON org.apache.nifi.processors.standard.JoltTransformJSON
org.apache.nifi.processors.standard.ListDatabaseTables org.apache.nifi.processors.standard.ListDatabaseTables
org.apache.nifi.processors.standard.ListenFTP
org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenRELP org.apache.nifi.processors.standard.ListenRELP
org.apache.nifi.processors.standard.ListenSyslog org.apache.nifi.processors.standard.ListenSyslog

View File

@ -0,0 +1,82 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="UTF-8">
<title>ListenFTP</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Usage Description</h2>
<p>
By starting the processor, an FTP server is started that listens for incoming connections on the specified port.
Each file copied to this FTP server gets converted into a FlowFile and transferred to the next processor via
the ListenFTP processor's 'success' relationship.
</p>
<p>
Before starting the processor, the following properties can be set:
</p>
<p>
<ul>
<li><b>Bind Address:</b> if not set, the FTP server binds to all network interfaces of the host machine
(this is the default). If set to a valid address, the server is only available on that specific address.</li>
<li><b>Listening Port:</b> the port on which the server listens for incoming connections.
Root privileges are required on Linux to be able to use port numbers below 1024.</li>
<li><b>Username and Password:</b> Either both of them need to be set, or none of them. If set, the FTP server
only allows users to log in with the username-password pair specified in these properties.
If the Username and Password properties are left blank, the FTP server allows anonymous connections,
meaning that the client can connect to the FTP server by providing 'anonymous' as username, and leaving
the password field blank. Setting empty string as the value of these properties is not permitted, and doing so
results in the processor becoming invalid.</li>
<li><b>SSL Context Service:</b> a Controller Service can optionally be specified that provides the ability to
configure keystore and/or truststore properties. When not specified, the FTP server does not use encryption.
By specifying an SSL Context Service, the FTP server started by this processor is set to use
Transport Layer Security (TLS) over FTP (FTPS).<br>
If an SSL Context Service is selected, then a keystore file must also be specified in the SSL Context Service.
Without a keystore file, the processor cannot be started successfully.<br>
Specifying a truststore file is optional. If a truststore file is specified, client authentication is required
(the client needs to send a certificate to the server).<br>
Regardless of the selected TLS protocol, the highest available protocol is used for the connection.
For example if NiFi is running on Java 11 and TLSv1.2 is selected in the controller service as the preferred TLS protocol,
TLSv1.3 will be used (regardless of TLSv1.2 being selected) because Java 11 supports TLSv1.3.
</li>
</ul>
</p>
<p>
After starting the processor and connecting to the FTP server, an empty root directory is visible in the client application.
Folders can be created in and deleted from the root directory and any of its subdirectories.
Files can be uploaded to any directory. <b>Uploaded files do not show in the content list of directories</b>, since
files are not actually stored on this FTP server, but converted into FlowFiles and transferred to the next processor via the
'success' relationship. It is not possible to download or delete files like on a regular FTP server.<br>
All the folders (including the root directory) are virtual directories, meaning that they only exist in memory and do not get
created in the file system of the host machine. Also, these directories are not persisted: by restarting the processor all the
directories (except for the root directory) get removed. Uploaded files do not get removed by restarting the processor, since
they are not stored on the FTP server, but transferred to the next processor as FlowFiles.<br>
When a file named for example <i>text01.txt</i> is uploaded to the target folder <i>/MyDirectory/MySubdirectory</i>, a FlowFile gets
created. The content of the FlowFile is the same as the content of <i>text01.txt</i>, the 'filename' attribute of the FlowFile
contains the name of the original file (<i>text01.txt</i>) and the 'path' attribute of the flowfile contains the path where the file
was uploaded (<i>/MyDirectory/MySubdirectory/</i>).
</p>
<p>
The list of the FTP commands that are supported by the FTP server is available by starting the processor and
issuing the 'HELP' command to the server from an FTP client application.
</p>
</body>
</html>

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp;
import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestVirtualFileSystem {
private VirtualFileSystem fileSystem;
private static final List<VirtualPath> ORIGINAL_DIRECTORY_LIST = Arrays.asList(
new VirtualPath("/"),
new VirtualPath("/Directory1"),
new VirtualPath("/Directory1/SubDirectory1"),
new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"),
new VirtualPath("/Directory1/SubDirectory2"),
new VirtualPath("/Directory2"),
new VirtualPath("/Directory2/SubDirectory3"),
new VirtualPath("/Directory2/SubDirectory4")
);
@Before
public void setup() {
setupVirtualDirectoryStructure();
}
private void setupVirtualDirectoryStructure() {
fileSystem = new DefaultVirtualFileSystem();
for (VirtualPath directory : ORIGINAL_DIRECTORY_LIST) {
if (!directory.equals(VirtualFileSystem.ROOT)) {
fileSystem.mkdir(directory);
}
}
}
@Test
public void testTryToCreateDirectoryWithNonExistentParents() {
// GIVEN
VirtualPath newDirectory = new VirtualPath("/Directory3/SubDirectory5/SubSubDirectory");
// WHEN
boolean directoryCreated = fileSystem.mkdir(newDirectory);
// THEN
assertFalse(directoryCreated);
assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
}
@Test
public void testListContentsOfDirectory() {
// GIVEN
VirtualPath parent = new VirtualPath("/Directory1");
VirtualPath[] expectedSubDirectories = {
new VirtualPath("/Directory1/SubDirectory1"),
new VirtualPath("/Directory1/SubDirectory2")
};
// WHEN
List<VirtualPath> subDirectories = fileSystem.listChildren(parent);
// THEN
assertThat(subDirectories, containsInAnyOrder(expectedSubDirectories));
}
@Test
public void testListContentsOfRoot() {
// GIVEN
VirtualPath parent = new VirtualPath("/");
VirtualPath[] expectedSubDirectories = {
new VirtualPath("/Directory1"),
new VirtualPath("/Directory2")
};
// WHEN
List<VirtualPath> subDirectories = fileSystem.listChildren(parent);
// THEN
assertThat(subDirectories, containsInAnyOrder(expectedSubDirectories));
}
@Test
public void testListContentsOfEmptyDirectory() {
// GIVEN
VirtualPath parent = new VirtualPath("/Directory2/SubDirectory3");
// WHEN
List<VirtualPath> subDirectories = fileSystem.listChildren(parent);
// THEN
assertEquals(0, subDirectories.size());
}
@Test
public void testTryToDeleteNonEmptyDirectory() {
// WHEN
boolean success = fileSystem.delete(new VirtualPath("/Directory1"));
// THEN
assertFalse(success);
assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
}
@Test
public void testDeleteEmptyDirectory() {
// GIVEN
List<VirtualPath> expectedRemainingDirectories = Arrays.asList(
new VirtualPath("/"),
new VirtualPath("/Directory1"),
new VirtualPath("/Directory1/SubDirectory1"),
new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"),
new VirtualPath("/Directory1/SubDirectory2"),
new VirtualPath("/Directory2"),
new VirtualPath("/Directory2/SubDirectory4")
);
// WHEN
boolean success = fileSystem.delete(new VirtualPath("/Directory2/SubDirectory3"));
// THEN
assertTrue(success);
assertAllDirectoriesAre(expectedRemainingDirectories);
}
@Test
public void testDeleteRoot() {
// WHEN
boolean success = fileSystem.delete(VirtualFileSystem.ROOT);
// THEN
assertFalse(success);
assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
}
@Test
public void testDeleteNonExistentDirectory() {
// WHEN
boolean success = fileSystem.delete(new VirtualPath("/Directory3"));
// THEN
assertFalse(success);
assertAllDirectoriesAre(ORIGINAL_DIRECTORY_LIST);
}
private void assertAllDirectoriesAre(List<VirtualPath> expectedDirectories) {
if (expectedDirectories.size() != fileSystem.getTotalNumberOfFiles()) {
fail();
} else {
for (VirtualPath path : expectedDirectories) {
if (!fileSystem.exists(path)) {
fail();
}
}
}
}
}

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class TestVirtualFileSystemView {
private FileSystemView fileSystemView;
private static VirtualFileSystem fileSystem;
@BeforeClass
public static void setupVirtualFileSystem() {
fileSystem = new DefaultVirtualFileSystem();
fileSystem.mkdir(new VirtualPath("/Directory1"));
fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1"));
fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"));
fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory2"));
fileSystem.mkdir(new VirtualPath("/Directory2"));
fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory3"));
fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory4"));
}
@Before
public void setup() throws FtpException {
User user = createUser();
FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory(fileSystem);
fileSystemView = fileSystemFactory.createFileSystemView(user);
}
@Test
public void testInRootDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
// WHEN
// We do not change directories
// THEN
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testTryToMakeRootDirectory() {
// WHEN
boolean directoryCreated = fileSystem.mkdir(VirtualFileSystem.ROOT);
// THEN
assertFalse(directoryCreated);
}
@Test
public void testChangeToAnotherDirectory() throws FtpException {
// GIVEN
String expectedHomeDirectory = File.separator;
String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
// WHEN
fileSystemView.changeWorkingDirectory("/Directory1");
// THEN
assertHomeDirectoryEquals(expectedHomeDirectory);
assertCurrentDirectoryEquals(expectedCurrentDirectory);
}
@Test
public void testChangeToRootDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
// WHEN
fileSystemView.changeWorkingDirectory("/");
// THEN
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testChangeToUnspecifiedDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
// WHEN
fileSystemView.changeWorkingDirectory("");
// THEN
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testChangeToSameDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
// WHEN
fileSystemView.changeWorkingDirectory(".");
// THEN
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testChangeToSameDirectoryNonRoot() throws FtpException {
// GIVEN
String expectedHomeDirectory = File.separator;
String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
fileSystemView.changeWorkingDirectory("/Directory1");
// WHEN
fileSystemView.changeWorkingDirectory(".");
// THEN
assertHomeDirectoryEquals(expectedHomeDirectory);
assertCurrentDirectoryEquals(expectedCurrentDirectory);
}
@Test
public void testChangeToParentDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
fileSystemView.changeWorkingDirectory("/Directory1");
// WHEN
fileSystemView.changeWorkingDirectory("..");
// THEN
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testChangeToParentDirectoryNonRoot() throws FtpException {
// GIVEN
String expectedHomeDirectory = File.separator;
String expectedCurrentDirectory = "/Directory1".replace('/', File.separatorChar);
fileSystemView.changeWorkingDirectory("/Directory1");
fileSystemView.changeWorkingDirectory("SubDirectory1");
// WHEN
fileSystemView.changeWorkingDirectory("..");
// THEN
assertHomeDirectoryEquals(expectedHomeDirectory);
assertCurrentDirectoryEquals(expectedCurrentDirectory);
}
@Test
public void testChangeToNonExistentDirectory() throws FtpException {
// GIVEN
String expectedDirectory = File.separator;
// WHEN
boolean changeDirectoryResult = fileSystemView.changeWorkingDirectory("/Directory2/SubDirectory3/SubSubDirectory");
// THEN
assertFalse(changeDirectoryResult);
assertHomeDirectoryEquals(expectedDirectory);
assertCurrentDirectoryEquals(expectedDirectory);
}
@Test
public void testGetFileAbsolute() throws FtpException {
// GIVEN
String expectedDirectory = "/Directory2/SubDirectory3".replace('/', File.separatorChar);
fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
// WHEN
FtpFile file = fileSystemView.getFile("/Directory2/SubDirectory3");
// THEN
assertEquals(expectedDirectory, file.getAbsolutePath());
}
@Test
public void testGetFileNonAbsolute() throws FtpException {
// GIVEN
String expectedDirectory = "/Directory1/SubDirectory1/SubSubDirectory".replace('/', File.separatorChar);
fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
// WHEN
FtpFile file = fileSystemView.getFile("SubSubDirectory");
// THEN
assertEquals(expectedDirectory, file.getAbsolutePath());
}
private User createUser() {
BaseUser user = new BaseUser();
user.setName("Username");
user.setPassword("Password");
user.setHomeDirectory("/abc/def");
user.setAuthorities(Collections.singletonList(new WritePermission()));
return user;
}
private void assertHomeDirectoryEquals(String expectedHomeDirectory) throws FtpException {
FtpFile homeDirectory = fileSystemView.getHomeDirectory();
assertEquals(expectedHomeDirectory, homeDirectory.getAbsolutePath());
}
private void assertCurrentDirectoryEquals(String expectedCurrentDirectory) throws FtpException {
FtpFile currentDirectory = fileSystemView.getWorkingDirectory();
assertEquals(expectedCurrentDirectory, currentDirectory.getAbsolutePath());
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.ftp;
import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
import org.junit.Test;
import java.io.File;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
public class TestVirtualPath {
@Test
public void testCreatePathStartingWithSlash() {
// GIVEN
String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathStartingWithDoubleSlash() {
// GIVEN
String expectedPath = "/Directory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("//Directory1");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathEndingWithSlash() {
// GIVEN
String expectedPath = "/Directory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathEndingWithDoubleSlash() {
// GIVEN
String expectedPath = "/Directory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1//");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathNotStartingWithSlash() {
// GIVEN
String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("Directory1/Directory2");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatPathToRoot() {
// GIVEN
String expectedPath = File.separator;
VirtualPath objectUnderTest = new VirtualPath("/");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathToRootWithDoubleSlash() {
// GIVEN
String expectedPath = File.separator;
VirtualPath objectUnderTest = new VirtualPath("//");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathThatNeedsToBeResolved() {
// GIVEN
String expectedPath = "/Directory1/SubDirectory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("//Directory1/SubDirectory1/../SubDirectory1");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathWithWhitespace() {
// GIVEN
String expectedPath = "/Directory 1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory 1");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathWithBackslashes() {
// GIVEN
String expectedPath = "/Directory1/SubDirectory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("\\Directory1\\SubDirectory1");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testCreatePathWithSpecialCharacters() {
// GIVEN
String expectedPath = "/űáú▣☃/SubDirectory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/űáú▣☃/SubDirectory1");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testEmptyPathPointsToRoot() {
// GIVEN
String expectedPath = File.separator;
VirtualPath objectUnderTest = new VirtualPath("");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testPathIsNormalized() {
// GIVEN
String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1///Directory2\\\\Directory3/Directory4/../..");
// WHEN
String result = objectUnderTest.toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testGetFileNameForRoot() {
// GIVEN
String expectedPath = File.separator;
VirtualPath objectUnderTest = new VirtualPath("/");
// WHEN, THEN
assertEquals(expectedPath, objectUnderTest.getFileName());
}
@Test
public void testGetFileNameForNonRoot() {
// GIVEN
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2/file.txt");
// WHEN
String result = objectUnderTest.getFileName();
// THEN
assertEquals("file.txt", result);
}
@Test
public void testGetParentForRoot() {
// GIVEN
VirtualPath objectUnderTest = new VirtualPath("/");
// WHEN, THEN
assertNull(objectUnderTest.getParent());
}
@Test
public void testGetParentForNonRoot() {
// GIVEN
String expectedPath = "/Directory1/Directory2".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2/file.txt");
// WHEN
VirtualPath parent = objectUnderTest.getParent();
// THEN
assertEquals(expectedPath, parent.toString());
}
@Test
public void testResolveToARelativePath() {
// GIVEN
String expectedPath = "/Directory1/Directory2/Directory3/Directory4".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
// WHEN
String result = objectUnderTest.resolve("Directory3/Directory4").toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testResolveToParent() {
// GIVEN
String expectedPath = "/Directory1".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
// WHEN
String result = objectUnderTest.resolve("..").toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testResolveToAnAbsolutePath() {
// GIVEN
String expectedPath = "/Directory3/Directory4".replace('/', File.separatorChar);
VirtualPath objectUnderTest = new VirtualPath("/Directory1/Directory2");
// WHEN
String result = objectUnderTest.resolve("/Directory3/Directory4").toString();
// THEN
assertEquals(expectedPath, result);
}
@Test
public void testEquals() {
// GIVEN
VirtualPath path1 = new VirtualPath("/Directory1/Directory2");
VirtualPath path2 = new VirtualPath("/Directory1/Directory2");
// WHEN, THEN
assertEquals(path1, path2);
}
@Test
public void testDoesNotEqual() {
// GIVEN
VirtualPath path1 = new VirtualPath("/Directory1/Directory2");
VirtualPath path2 = new VirtualPath("/Directory1/Directory3");
// WHEN, THEN
assertNotEquals(path1, path2);
}
}