diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ee96eee26be..194e2e38772 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -526,6 +526,9 @@ Release 2.8.0 - UNRELEASED HADOOP-10971. Add -C flag to make `hadoop fs -ls` print filenames only. (Kengo Seki via aajisaka) + HADOOP-5732. Add SFTP FileSystem. (Ramtin Boustani and Inigo Goiri via + cdouglas) + IMPROVEMENTS HADOOP-6842. "hadoop fs -text" does not give a useful text representation diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index c3a306b8aa7..6b1388a980d 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -238,6 +238,11 @@ jsr305 compile + + org.apache.sshd + sshd-core + test + org.apache.htrace diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPConnectionPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPConnectionPool.java new file mode 100644 index 00000000000..c7fae7bd5f6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPConnectionPool.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.sftp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; + +/** Concurrent/Multiple Connections. */ +class SFTPConnectionPool { + + public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class); + // Maximum number of allowed live connections. This doesn't mean we cannot + // have more live connections. It means that when we have more + // live connections than this threshold, any unused connection will be + // closed. + private int maxConnection; + private int liveConnectionCount = 0; + private HashMap> idleConnections = + new HashMap>(); + private HashMap con2infoMap = + new HashMap(); + + SFTPConnectionPool(int maxConnection) { + this.maxConnection = maxConnection; + } + + synchronized ChannelSftp getFromPool(ConnectionInfo info) throws IOException { + Set cons = idleConnections.get(info); + ChannelSftp channel; + + if (cons != null && cons.size() > 0) { + Iterator it = cons.iterator(); + if (it.hasNext()) { + channel = it.next(); + idleConnections.remove(info); + return channel; + } else { + throw new IOException("Connection pool error."); + } + } + return null; + } + + /** Add the channel into pool. + * @param channel + */ + synchronized void returnToPool(ChannelSftp channel) { + ConnectionInfo info = con2infoMap.get(channel); + HashSet cons = idleConnections.get(info); + if (cons == null) { + cons = new HashSet(); + idleConnections.put(info, cons); + } + cons.add(channel); + + } + + /** Shutdown the connection pool and close all open connections. */ + synchronized void shutdown() { + if (this.con2infoMap == null){ + return; // already shutdown in case it is called + } + LOG.info("Inside shutdown, con2infoMap size=" + con2infoMap.size()); + + this.maxConnection = 0; + Set cons = con2infoMap.keySet(); + if (cons != null && cons.size() > 0) { + // make a copy since we need to modify the underlying Map + Set copy = new HashSet(cons); + // Initiate disconnect from all outstanding connections + for (ChannelSftp con : copy) { + try { + disconnect(con); + } catch (IOException ioe) { + ConnectionInfo info = con2infoMap.get(con); + LOG.error( + "Error encountered while closing connection to " + info.getHost(), + ioe); + } + } + } + // make sure no further connections can be returned. + this.idleConnections = null; + this.con2infoMap = null; + } + + public synchronized int getMaxConnection() { + return maxConnection; + } + + public synchronized void setMaxConnection(int maxConn) { + this.maxConnection = maxConn; + } + + public ChannelSftp connect(String host, int port, String user, + String password, String keyFile) throws IOException { + // get connection from pool + ConnectionInfo info = new ConnectionInfo(host, port, user); + ChannelSftp channel = getFromPool(info); + + if (channel != null) { + if (channel.isConnected()) { + return channel; + } else { + channel = null; + synchronized (this) { + --liveConnectionCount; + con2infoMap.remove(channel); + } + } + } + + // create a new connection and add to pool + JSch jsch = new JSch(); + Session session = null; + try { + if (user == null || user.length() == 0) { + user = System.getProperty("user.name"); + } + + if (password == null) { + password = ""; + } + + if (keyFile != null && keyFile.length() > 0) { + jsch.addIdentity(keyFile); + } + + if (port <= 0) { + session = jsch.getSession(user, host); + } else { + session = jsch.getSession(user, host, port); + } + + session.setPassword(password); + + java.util.Properties config = new java.util.Properties(); + config.put("StrictHostKeyChecking", "no"); + session.setConfig(config); + + session.connect(); + channel = (ChannelSftp) session.openChannel("sftp"); + channel.connect(); + + synchronized (this) { + con2infoMap.put(channel, info); + liveConnectionCount++; + } + + return channel; + + } catch (JSchException e) { + throw new IOException(StringUtils.stringifyException(e)); + } + } + + void disconnect(ChannelSftp channel) throws IOException { + if (channel != null) { + // close connection if too many active connections + boolean closeConnection = false; + synchronized (this) { + if (liveConnectionCount > maxConnection) { + --liveConnectionCount; + con2infoMap.remove(channel); + closeConnection = true; + } + } + if (closeConnection) { + if (channel.isConnected()) { + try { + Session session = channel.getSession(); + channel.disconnect(); + session.disconnect(); + } catch (JSchException e) { + throw new IOException(StringUtils.stringifyException(e)); + } + } + + } else { + returnToPool(channel); + } + } + } + + public int getIdleCount() { + return this.idleConnections.size(); + } + + public int getLiveConnCount() { + return this.liveConnectionCount; + } + + public int getConnPoolSize() { + return this.con2infoMap.size(); + } + + /** + * Class to capture the minimal set of information that distinguish + * between different connections. + */ + static class ConnectionInfo { + private String host = ""; + private int port; + private String user = ""; + + ConnectionInfo(String hst, int prt, String usr) { + this.host = hst; + this.port = prt; + this.user = usr; + } + + public String getHost() { + return host; + } + + public void setHost(String hst) { + this.host = hst; + } + + public int getPort() { + return port; + } + + public void setPort(int prt) { + this.port = prt; + } + + public String getUser() { + return user; + } + + public void setUser(String usr) { + this.user = usr; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof ConnectionInfo) { + ConnectionInfo con = (ConnectionInfo) obj; + + boolean ret = true; + if (this.host == null || !this.host.equalsIgnoreCase(con.host)) { + ret = false; + } + if (this.port >= 0 && this.port != con.port) { + ret = false; + } + if (this.user == null || !this.user.equalsIgnoreCase(con.user)) { + ret = false; + } + return ret; + } else { + return false; + } + + } + + @Override + public int hashCode() { + int hashCode = 0; + if (host != null) { + hashCode += host.hashCode(); + } + hashCode += port; + if (user != null) { + hashCode += user.hashCode(); + } + return hashCode; + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java new file mode 100644 index 00000000000..8b6267a6a18 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -0,0 +1,671 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.sftp; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.ChannelSftp.LsEntry; +import com.jcraft.jsch.SftpATTRS; +import com.jcraft.jsch.SftpException; + +/** SFTP FileSystem. */ +public class SFTPFileSystem extends FileSystem { + + public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class); + + private SFTPConnectionPool connectionPool; + private URI uri; + + private static final int DEFAULT_SFTP_PORT = 22; + private static final int DEFAULT_MAX_CONNECTION = 5; + public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; + public static final int DEFAULT_BLOCK_SIZE = 4 * 1024; + public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user."; + public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password."; + public static final String FS_SFTP_HOST = "fs.sftp.host"; + public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port"; + public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile"; + public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max"; + public static final String E_SAME_DIRECTORY_ONLY = + "only same directory renames are supported"; + public static final String E_HOST_NULL = "Invalid host specified"; + public static final String E_USER_NULL = + "No user specified for sftp connection. Expand URI or credential file."; + public static final String E_PATH_DIR = "Path %s is a directory."; + public static final String E_FILE_STATUS = "Failed to get file status"; + public static final String E_FILE_NOTFOUND = "File %s does not exist."; + public static final String E_FILE_EXIST = "File already exists: %s"; + public static final String E_CREATE_DIR = + "create(): Mkdirs failed to create: %s"; + public static final String E_DIR_CREATE_FROMFILE = + "Can't make directory for path %s since it is a file."; + public static final String E_MAKE_DIR_FORPATH = + "Can't make directory for path \"%s\" under \"%s\"."; + public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty."; + public static final String E_FILE_CHECK_FAILED = "File check failed"; + public static final String E_NOT_SUPPORTED = "Not supported"; + public static final String E_SPATH_NOTEXIST = "Source path %s does not exist"; + public static final String E_DPATH_EXIST = + "Destination path %s already exist, cannot rename!"; + public static final String E_FAILED_GETHOME = "Failed to get home directory"; + public static final String E_FAILED_DISCONNECT = "Failed to disconnect"; + + /** + * Set configuration from UI. + * + * @param uri + * @param conf + * @throws IOException + */ + private void setConfigurationFromURI(URI uriInfo, Configuration conf) + throws IOException { + + // get host information from URI + String host = uriInfo.getHost(); + host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host; + if (host == null) { + throw new IOException(E_HOST_NULL); + } + conf.set(FS_SFTP_HOST, host); + + int port = uriInfo.getPort(); + port = (port == -1) + ? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT) + : port; + conf.setInt(FS_SFTP_HOST_PORT, port); + + // get user/password information from URI + String userAndPwdFromUri = uriInfo.getUserInfo(); + if (userAndPwdFromUri != null) { + String[] userPasswdInfo = userAndPwdFromUri.split(":"); + String user = userPasswdInfo[0]; + user = URLDecoder.decode(user, "UTF-8"); + conf.set(FS_SFTP_USER_PREFIX + host, user); + if (userPasswdInfo.length > 1) { + conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." + + user, userPasswdInfo[1]); + } + } + + String user = conf.get(FS_SFTP_USER_PREFIX + host); + if (user == null || user.equals("")) { + throw new IllegalStateException(E_USER_NULL); + } + + int connectionMax = + conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION); + connectionPool = new SFTPConnectionPool(connectionMax); + } + + /** + * Connecting by using configuration parameters. + * + * @return An FTPClient instance + * @throws IOException + */ + private ChannelSftp connect() throws IOException { + Configuration conf = getConf(); + + String host = conf.get(FS_SFTP_HOST, null); + int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT); + String user = conf.get(FS_SFTP_USER_PREFIX + host, null); + String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null); + String keyFile = conf.get(FS_SFTP_KEYFILE, null); + + ChannelSftp channel = + connectionPool.connect(host, port, user, pwd, keyFile); + + return channel; + } + + /** + * Logout and disconnect the given channel. + * + * @param client + * @throws IOException + */ + private void disconnect(ChannelSftp channel) throws IOException { + connectionPool.disconnect(channel); + } + + /** + * Resolve against given working directory. + * + * @param workDir + * @param path + * @return absolute path + */ + private Path makeAbsolute(Path workDir, Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workDir, path); + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + * @throws IOException + */ + private boolean exists(ChannelSftp channel, Path file) throws IOException { + try { + getFileStatus(channel, file); + return true; + } catch (FileNotFoundException fnfe) { + return false; + } catch (IOException ioe) { + throw new IOException(E_FILE_STATUS, ioe); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + @SuppressWarnings("unchecked") + private FileStatus getFileStatus(ChannelSftp client, Path file) + throws IOException { + FileStatus fileStat = null; + Path workDir; + try { + workDir = new Path(client.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, file); + Path parentPath = absolute.getParent(); + if (parentPath == null) { // root directory + long length = -1; // Length of root directory on server not known + boolean isDir = true; + int blockReplication = 1; + long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known. + long modTime = -1; // Modification time of root directory not known. + Path root = new Path("/"); + return new FileStatus(length, isDir, blockReplication, blockSize, + modTime, + root.makeQualified(this.getUri(), this.getWorkingDirectory())); + } + String pathName = parentPath.toUri().getPath(); + Vector sftpFiles; + try { + sftpFiles = (Vector) client.ls(pathName); + } catch (SftpException e) { + throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); + } + if (sftpFiles != null) { + for (LsEntry sftpFile : sftpFiles) { + if (sftpFile.getFilename().equals(file.getName())) { + // file found in directory + fileStat = getFileStatus(client, sftpFile, parentPath); + break; + } + } + if (fileStat == null) { + throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); + } + } else { + throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); + } + return fileStat; + } + + /** + * Convert the file information in LsEntry to a {@link FileStatus} object. * + * + * @param sftpFile + * @param parentPath + * @return file status + * @throws IOException + */ + private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile, + Path parentPath) throws IOException { + + SftpATTRS attr = sftpFile.getAttrs(); + long length = attr.getSize(); + boolean isDir = attr.isDir(); + boolean isLink = attr.isLink(); + if (isLink) { + String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename(); + try { + link = channel.realpath(link); + + Path linkParent = new Path("/", link); + + FileStatus fstat = getFileStatus(channel, linkParent); + isDir = fstat.isDirectory(); + length = fstat.getLen(); + } catch (Exception e) { + throw new IOException(e); + } + } + int blockReplication = 1; + // Using default block size since there is no way in SFTP channel to know of + // block sizes on server. The assumption could be less than ideal. + long blockSize = DEFAULT_BLOCK_SIZE; + long modTime = attr.getMTime() * 1000; // convert to milliseconds + long accessTime = 0; + FsPermission permission = getPermissions(sftpFile); + // not be able to get the real user group name, just use the user and group + // id + String user = Integer.toString(attr.getUId()); + String group = Integer.toString(attr.getGId()); + Path filePath = new Path(parentPath, sftpFile.getFilename()); + + return new FileStatus(length, isDir, blockReplication, blockSize, modTime, + accessTime, permission, user, group, filePath.makeQualified( + this.getUri(), this.getWorkingDirectory())); + } + + /** + * Return file permission. + * + * @param sftpFile + * @return file permission + */ + private FsPermission getPermissions(LsEntry sftpFile) { + return new FsPermission((short) sftpFile.getAttrs().getPermissions()); + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) + throws IOException { + boolean created = true; + Path workDir; + try { + workDir = new Path(client.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, file); + String pathName = absolute.getName(); + if (!exists(client, absolute)) { + Path parent = absolute.getParent(); + created = + (parent == null || mkdirs(client, parent, FsPermission.getDefault())); + if (created) { + String parentDir = parent.toUri().getPath(); + boolean succeeded = true; + try { + client.cd(parentDir); + client.mkdir(pathName); + } catch (SftpException e) { + throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName, + parentDir)); + } + created = created & succeeded; + } + } else if (isFile(client, absolute)) { + throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute)); + } + return created; + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + * @throws IOException + */ + private boolean isFile(ChannelSftp channel, Path file) throws IOException { + try { + return !getFileStatus(channel, file).isDirectory(); + } catch (FileNotFoundException e) { + return false; // file does not exist + } catch (IOException ioe) { + throw new IOException(E_FILE_CHECK_FAILED, ioe); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private boolean delete(ChannelSftp channel, Path file, boolean recursive) + throws IOException { + Path workDir; + try { + workDir = new Path(channel.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, file); + String pathName = absolute.toUri().getPath(); + FileStatus fileStat = null; + try { + fileStat = getFileStatus(channel, absolute); + } catch (FileNotFoundException e) { + // file not found, no need to delete, return true + return false; + } + if (!fileStat.isDirectory()) { + boolean status = true; + try { + channel.rm(pathName); + } catch (SftpException e) { + status = false; + } + return status; + } else { + boolean status = true; + FileStatus[] dirEntries = listStatus(channel, absolute); + if (dirEntries != null && dirEntries.length > 0) { + if (!recursive) { + throw new IOException(String.format(E_DIR_NOTEMPTY, file)); + } + for (int i = 0; i < dirEntries.length; ++i) { + delete(channel, new Path(absolute, dirEntries[i].getPath()), + recursive); + } + } + try { + channel.rmdir(pathName); + } catch (SftpException e) { + status = false; + } + return status; + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + @SuppressWarnings("unchecked") + private FileStatus[] listStatus(ChannelSftp client, Path file) + throws IOException { + Path workDir; + try { + workDir = new Path(client.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, file); + FileStatus fileStat = getFileStatus(client, absolute); + if (!fileStat.isDirectory()) { + return new FileStatus[] {fileStat}; + } + Vector sftpFiles; + try { + sftpFiles = (Vector) client.ls(absolute.toUri().getPath()); + } catch (SftpException e) { + throw new IOException(e); + } + ArrayList fileStats = new ArrayList(); + for (int i = 0; i < sftpFiles.size(); i++) { + LsEntry entry = sftpFiles.get(i); + String fname = entry.getFilename(); + // skip current and parent directory, ie. "." and ".." + if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) { + fileStats.add(getFileStatus(client, entry, absolute)); + } + } + return fileStats.toArray(new FileStatus[fileStats.size()]); + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + * + * @param channel + * @param src + * @param dst + * @return rename successful? + * @throws IOException + */ + private boolean rename(ChannelSftp channel, Path src, Path dst) + throws IOException { + Path workDir; + try { + workDir = new Path(channel.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absoluteSrc = makeAbsolute(workDir, src); + Path absoluteDst = makeAbsolute(workDir, dst); + + if (!exists(channel, absoluteSrc)) { + throw new IOException(String.format(E_SPATH_NOTEXIST, src)); + } + if (exists(channel, absoluteDst)) { + throw new IOException(String.format(E_DPATH_EXIST, dst)); + } + boolean renamed = true; + try { + channel.cd("/"); + channel.rename(src.toUri().getPath(), dst.toUri().getPath()); + } catch (SftpException e) { + renamed = false; + } + return renamed; + } + + @Override + public void initialize(URI uriInfo, Configuration conf) throws IOException { + super.initialize(uriInfo, conf); + + setConfigurationFromURI(uriInfo, conf); + setConf(conf); + this.uri = uriInfo; + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + ChannelSftp channel = connect(); + Path workDir; + try { + workDir = new Path(channel.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, f); + FileStatus fileStat = getFileStatus(channel, absolute); + if (fileStat.isDirectory()) { + disconnect(channel); + throw new IOException(String.format(E_PATH_DIR, f)); + } + InputStream is; + try { + // the path could be a symbolic link, so get the real path + absolute = new Path("/", channel.realpath(absolute.toUri().getPath())); + + is = channel.get(absolute.toUri().getPath()); + } catch (SftpException e) { + throw new IOException(e); + } + + FSDataInputStream fis = + new FSDataInputStream(new SFTPInputStream(is, channel, statistics)); + return fis; + } + + /** + * A stream obtained via this call must be closed before using other APIs of + * this class or else the invocation will block. + */ + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + final ChannelSftp client = connect(); + Path workDir; + try { + workDir = new Path(client.pwd()); + } catch (SftpException e) { + throw new IOException(e); + } + Path absolute = makeAbsolute(workDir, f); + if (exists(client, f)) { + if (overwrite) { + delete(client, f, false); + } else { + disconnect(client); + throw new IOException(String.format(E_FILE_EXIST, f)); + } + } + Path parent = absolute.getParent(); + if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) { + parent = (parent == null) ? new Path("/") : parent; + disconnect(client); + throw new IOException(String.format(E_CREATE_DIR, parent)); + } + OutputStream os; + try { + client.cd(parent.toUri().getPath()); + os = client.put(f.getName()); + } catch (SftpException e) { + throw new IOException(e); + } + FSDataOutputStream fos = new FSDataOutputStream(os, statistics) { + @Override + public void close() throws IOException { + super.close(); + disconnect(client); + } + }; + + return fos; + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) + throws IOException { + throw new IOException(E_NOT_SUPPORTED); + } + + /* + * The parent of source and destination can be different. It is suppose to + * work like 'move' + */ + @Override + public boolean rename(Path src, Path dst) throws IOException { + ChannelSftp channel = connect(); + try { + boolean success = rename(channel, src, dst); + return success; + } finally { + disconnect(channel); + } + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + ChannelSftp channel = connect(); + try { + boolean success = delete(channel, f, recursive); + return success; + } finally { + disconnect(channel); + } + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + ChannelSftp client = connect(); + try { + FileStatus[] stats = listStatus(client, f); + return stats; + } finally { + disconnect(client); + } + } + + @Override + public void setWorkingDirectory(Path newDir) { + // we do not maintain the working directory state + } + + @Override + public Path getWorkingDirectory() { + // Return home directory always since we do not maintain state. + return getHomeDirectory(); + } + + @Override + public Path getHomeDirectory() { + ChannelSftp channel = null; + try { + channel = connect(); + Path homeDir = new Path(channel.pwd()); + return homeDir; + } catch (Exception ioe) { + return null; + } finally { + try { + disconnect(channel); + } catch (IOException ioe) { + return null; + } + } + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + ChannelSftp client = connect(); + try { + boolean success = mkdirs(client, f, permission); + return success; + } finally { + disconnect(client); + } + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + ChannelSftp channel = connect(); + try { + FileStatus status = getFileStatus(channel, f); + return status; + } finally { + disconnect(channel); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java new file mode 100644 index 00000000000..ece2c1c980b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.sftp; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.StringUtils; + +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; + +/** SFTP FileSystem input stream. */ +class SFTPInputStream extends FSInputStream { + + public static final String E_SEEK_NOTSUPPORTED = "Seek not supported"; + public static final String E_CLIENT_NULL = + "SFTP client null or not connected"; + public static final String E_NULL_INPUTSTREAM = "Null InputStream"; + public static final String E_STREAM_CLOSED = "Stream closed"; + public static final String E_CLIENT_NOTCONNECTED = "Client not connected"; + + private InputStream wrappedStream; + private ChannelSftp channel; + private FileSystem.Statistics stats; + private boolean closed; + private long pos; + + SFTPInputStream(InputStream stream, ChannelSftp channel, + FileSystem.Statistics stats) { + + if (stream == null) { + throw new IllegalArgumentException(E_NULL_INPUTSTREAM); + } + if (channel == null || !channel.isConnected()) { + throw new IllegalArgumentException(E_CLIENT_NULL); + } + this.wrappedStream = stream; + this.channel = channel; + this.stats = stats; + + this.pos = 0; + this.closed = false; + } + + @Override + public void seek(long position) throws IOException { + throw new IOException(E_SEEK_NOTSUPPORTED); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + throw new IOException(E_SEEK_NOTSUPPORTED); + } + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public synchronized int read() throws IOException { + if (closed) { + throw new IOException(E_STREAM_CLOSED); + } + + int byteRead = wrappedStream.read(); + if (byteRead >= 0) { + pos++; + } + if (stats != null & byteRead >= 0) { + stats.incrementBytesRead(1); + } + return byteRead; + } + + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + if (closed) { + throw new IOException(E_STREAM_CLOSED); + } + + int result = wrappedStream.read(buf, off, len); + if (result > 0) { + pos += result; + } + if (stats != null & result > 0) { + stats.incrementBytesRead(result); + } + + return result; + } + + public synchronized void close() throws IOException { + if (closed) { + return; + } + super.close(); + closed = true; + if (!channel.isConnected()) { + throw new IOException(E_CLIENT_NOTCONNECTED); + } + + try { + Session session = channel.getSession(); + channel.disconnect(); + session.disconnect(); + } catch (JSchException e) { + throw new IOException(StringUtils.stringifyException(e)); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/package-info.java new file mode 100644 index 00000000000..1427e48a77c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** SFTP FileSystem package. */ +package org.apache.hadoop.fs.sftp; \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java new file mode 100644 index 00000000000..06d9bf04257 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.sftp; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; + +import org.apache.sshd.SshServer; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.PasswordAuthenticator; +import org.apache.sshd.server.UserAuth; +import org.apache.sshd.server.auth.UserAuthPassword; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.session.ServerSession; +import org.apache.sshd.server.sftp.SftpSubsystem; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; + +public class TestSFTPFileSystem { + + private static final String TEST_SFTP_DIR = "testsftp"; + private static final String TEST_ROOT_DIR = + System.getProperty("test.build.data", "build/test/data"); + + @Rule public TestName name = new TestName(); + + private static final String connection = "sftp://user:password@localhost"; + private static Path localDir = null; + private static FileSystem localFs = null; + private static FileSystem sftpFs = null; + private static SshServer sshd = null; + private static int port; + + private static void startSshdServer() throws IOException { + sshd = SshServer.setUpDefaultServer(); + // ask OS to assign a port + sshd.setPort(0); + sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); + + List> userAuthFactories = + new ArrayList>(); + userAuthFactories.add(new UserAuthPassword.Factory()); + + sshd.setUserAuthFactories(userAuthFactories); + + sshd.setPasswordAuthenticator(new PasswordAuthenticator() { + @Override + public boolean authenticate(String username, String password, + ServerSession session) { + if (username.equals("user") && password.equals("password")) { + return true; + } + return false; + } + }); + + sshd.setSubsystemFactories( + Arrays.>asList(new SftpSubsystem.Factory())); + + sshd.start(); + port = sshd.getPort(); + } + + @BeforeClass + public static void setUp() throws Exception { + // skip all tests if running on Windows + assumeTrue(!Shell.WINDOWS); + + startSshdServer(); + + Configuration conf = new Configuration(); + conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class); + conf.setInt("fs.sftp.host.port", port); + conf.setBoolean("fs.sftp.impl.disable.cache", true); + + localFs = FileSystem.getLocal(conf); + localDir = localFs.makeQualified(new Path(TEST_ROOT_DIR, TEST_SFTP_DIR)); + if (localFs.exists(localDir)) { + localFs.delete(localDir, true); + } + localFs.mkdirs(localDir); + + sftpFs = FileSystem.get(URI.create(connection), conf); + } + + @AfterClass + public static void tearDown() { + if (localFs != null) { + try { + localFs.delete(localDir, true); + localFs.close(); + } catch (IOException e) { + // ignore + } + } + if (sftpFs != null) { + try { + sftpFs.close(); + } catch (IOException e) { + // ignore + } + } + if (sshd != null) { + try { + sshd.stop(true); + } catch (InterruptedException e) { + // ignore + } + } + } + + private static final Path touch(FileSystem fs, String filename) + throws IOException { + return touch(fs, filename, null); + } + + private static final Path touch(FileSystem fs, String filename, byte[] data) + throws IOException { + Path lPath = new Path(localDir.toUri().getPath(), filename); + FSDataOutputStream out = null; + try { + out = fs.create(lPath); + if (data != null) { + out.write(data); + } + } finally { + if (out != null) { + out.close(); + } + } + return lPath; + } + + /** + * Creates a file and deletes it. + * + * @throws Exception + */ + @Test + public void testCreateFile() throws Exception { + Path file = touch(sftpFs, name.getMethodName().toLowerCase()); + assertTrue(localFs.exists(file)); + assertTrue(sftpFs.delete(file, false)); + assertFalse(localFs.exists(file)); + } + + /** + * Checks if a new created file exists. + * + * @throws Exception + */ + @Test + public void testFileExists() throws Exception { + Path file = touch(localFs, name.getMethodName().toLowerCase()); + assertTrue(sftpFs.exists(file)); + assertTrue(localFs.exists(file)); + assertTrue(sftpFs.delete(file, false)); + assertFalse(sftpFs.exists(file)); + assertFalse(localFs.exists(file)); + } + + /** + * Test writing to a file and reading its value. + * + * @throws Exception + */ + @Test + public void testReadFile() throws Exception { + byte[] data = "yaks".getBytes(); + Path file = touch(localFs, name.getMethodName().toLowerCase(), data); + FSDataInputStream is = null; + try { + is = sftpFs.open(file); + byte[] b = new byte[data.length]; + is.read(b); + assertArrayEquals(data, b); + } finally { + if (is != null) { + is.close(); + } + } + assertTrue(sftpFs.delete(file, false)); + } + + /** + * Test getting the status of a file. + * + * @throws Exception + */ + @Test + public void testStatFile() throws Exception { + byte[] data = "yaks".getBytes(); + Path file = touch(localFs, name.getMethodName().toLowerCase(), data); + + FileStatus lstat = localFs.getFileStatus(file); + FileStatus sstat = sftpFs.getFileStatus(file); + assertNotNull(sstat); + + assertEquals(lstat.getPath().toUri().getPath(), + sstat.getPath().toUri().getPath()); + assertEquals(data.length, sstat.getLen()); + assertEquals(lstat.getLen(), sstat.getLen()); + assertTrue(sftpFs.delete(file, false)); + } + + /** + * Test deleting a non empty directory. + * + * @throws Exception + */ + @Test(expected=java.io.IOException.class) + public void testDeleteNonEmptyDir() throws Exception { + Path file = touch(localFs, name.getMethodName().toLowerCase()); + sftpFs.delete(localDir, false); + } + + /** + * Test deleting a file that does not exist. + * + * @throws Exception + */ + @Test + public void testDeleteNonExistFile() throws Exception { + Path file = new Path(localDir, name.getMethodName().toLowerCase()); + assertFalse(sftpFs.delete(file, false)); + } + + /** + * Test renaming a file. + * + * @throws Exception + */ + @Test + public void testRenameFile() throws Exception { + byte[] data = "dingos".getBytes(); + Path file1 = touch(localFs, name.getMethodName().toLowerCase() + "1"); + Path file2 = new Path(localDir, name.getMethodName().toLowerCase() + "2"); + + assertTrue(sftpFs.rename(file1, file2)); + + assertTrue(sftpFs.exists(file2)); + assertFalse(sftpFs.exists(file1)); + + assertTrue(localFs.exists(file2)); + assertFalse(localFs.exists(file1)); + + assertTrue(sftpFs.delete(file2, false)); + } + + /** + * Test renaming a file that does not exist. + * + * @throws Exception + */ + @Test(expected=java.io.IOException.class) + public void testRenameNonExistFile() throws Exception { + Path file1 = new Path(localDir, name.getMethodName().toLowerCase() + "1"); + Path file2 = new Path(localDir, name.getMethodName().toLowerCase() + "2"); + sftpFs.rename(file1, file2); + } + + /** + * Test renaming a file onto an existing file. + * + * @throws Exception + */ + @Test(expected=java.io.IOException.class) + public void testRenamingFileOntoExistingFile() throws Exception { + Path file1 = touch(localFs, name.getMethodName().toLowerCase() + "1"); + Path file2 = touch(localFs, name.getMethodName().toLowerCase() + "2"); + sftpFs.rename(file1, file2); + } + +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 16d20580a65..e010de1e4f8 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -665,6 +665,11 @@ mina-core 2.0.0-M5 + + org.apache.sshd + sshd-core + 0.14.0 + org.apache.ftpserver ftplet-api