HADOOP-13953. Make FTPFileSystem's data connection mode and transfer mode configurable. Contributed by Xiao Chen.

This commit is contained in:
Wei-Chiu Chuang 2017-01-09 15:18:26 -08:00
parent 91bf504440
commit 0a212a40fc
4 changed files with 143 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.net.ConnectException;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem {
public static final String FS_FTP_HOST = "fs.ftp.host";
public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
public static final String FS_FTP_DATA_CONNECTION_MODE =
"fs.ftp.data.connection.mode";
public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode";
public static final String E_SAME_DIRECTORY_ONLY =
"only same directory renames are supported";
@ -143,9 +147,10 @@ public class FTPFileSystem extends FileSystem {
NetUtils.UNKNOWN_HOST, 0,
new ConnectException("Server response " + reply));
} else if (client.login(user, password)) {
client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
client.setFileTransferMode(getTransferMode(conf));
client.setFileType(FTP.BINARY_FILE_TYPE);
client.setBufferSize(DEFAULT_BUFFER_SIZE);
setDataConnectionMode(client, conf);
} else {
throw new IOException("Login failed on server - " + host + ", port - "
+ port + " as user '" + user + "'");
@ -154,6 +159,69 @@ public class FTPFileSystem extends FileSystem {
return client;
}
/**
* Set FTP's transfer mode based on configuration. Valid values are
* STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
* <p/>
* Defaults to BLOCK_TRANSFER_MODE.
*
* @param conf
* @return
*/
@VisibleForTesting
int getTransferMode(Configuration conf) {
final String mode = conf.get(FS_FTP_TRANSFER_MODE);
// FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is
// FTP.BLOCK_TRANSFER_MODE historically.
int ret = FTP.BLOCK_TRANSFER_MODE;
if (mode == null) {
return ret;
}
final String upper = mode.toUpperCase();
if (upper.equals("STREAM_TRANSFER_MODE")) {
ret = FTP.STREAM_TRANSFER_MODE;
} else if (upper.equals("COMPRESSED_TRANSFER_MODE")) {
ret = FTP.COMPRESSED_TRANSFER_MODE;
} else {
if (!upper.equals("BLOCK_TRANSFER_MODE")) {
LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": "
+ mode + ". Using default.");
}
}
return ret;
}
/**
* Set the FTPClient's data connection mode based on configuration. Valid
* values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
* PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
* <p/>
* Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE.
*
* @param client
* @param conf
* @throws IOException
*/
@VisibleForTesting
void setDataConnectionMode(FTPClient client, Configuration conf)
throws IOException {
final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE);
if (mode == null) {
return;
}
final String upper = mode.toUpperCase();
if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) {
client.enterLocalPassiveMode();
} else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) {
client.enterRemotePassiveMode();
} else {
if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) {
LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE
+ ": " + mode + ". Using default.");
}
}
}
/**
* Logout and disconnect the given FTPClient. *
*

View File

@ -782,6 +782,24 @@
</description>
</property>
<property>
<name>fs.ftp.data.connection.mode</name>
<value>ACTIVE_LOCAL_DATA_CONNECTION_MODE</value>
<description>Set the FTPClient's data connection mode based on configuration.
Valid values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
</description>
</property>
<property>
<name>fs.ftp.transfer.mode</name>
<value>BLOCK_TRANSFER_MODE</value>
<description>
Set FTP's transfer mode based on configuration. Valid values are
STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
</description>
</property>
<property>
<name>fs.df.interval</name>
<value>60000</value>

View File

@ -89,6 +89,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
// Lots of properties not in the above classes
xmlPropsToSkipCompare.add("fs.ftp.password.localhost");
xmlPropsToSkipCompare.add("fs.ftp.user.localhost");
xmlPropsToSkipCompare.add("fs.ftp.data.connection.mode");
xmlPropsToSkipCompare.add("fs.ftp.transfer.mode");
xmlPropsToSkipCompare.add("hadoop.tmp.dir");
xmlPropsToSkipCompare.add("nfs3.mountd.port");
xmlPropsToSkipCompare.add("nfs3.server.port");

View File

@ -19,15 +19,67 @@ package org.apache.hadoop.fs.ftp;
import org.apache.commons.net.ftp.FTP;
import org.junit.Assert;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.conf.Configuration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.junit.Assert.assertEquals;
/**
* Test basic @{link FTPFileSystem} class methods. Contract tests are in
* TestFTPContractXXXX.
*/
public class TestFTPFileSystem {
@Rule
public Timeout testTimeout = new Timeout(180000);
@Test
public void testFTPDefaultPort() throws Exception {
FTPFileSystem ftp = new FTPFileSystem();
Assert.assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
}
@Test
public void testFTPTransferMode() throws Exception {
Configuration conf = new Configuration();
FTPFileSystem ftp = new FTPFileSystem();
assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE");
assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf));
conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE");
assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf));
conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid");
assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
}
@Test
public void testFTPDataConnectionMode() throws Exception {
Configuration conf = new Configuration();
FTPClient client = new FTPClient();
FTPFileSystem ftp = new FTPFileSystem();
assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
client.getDataConnectionMode());
ftp.setDataConnectionMode(client, conf);
assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
client.getDataConnectionMode());
conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid");
ftp.setDataConnectionMode(client, conf);
assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
client.getDataConnectionMode());
conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE,
"PASSIVE_LOCAL_DATA_CONNECTION_MODE");
ftp.setDataConnectionMode(client, conf);
assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE,
client.getDataConnectionMode());
}
}