HDFS-5436. Merging change r1536921 from trunk to branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1536922 13f79535-47bb-0310-9956-ffa450edef68
Author: Arpit Agarwal
Date:   2013-10-29 22:50:24 +00:00
Commit: 57bdb564c7 (parent 330ada83eb)

21 changed files with 139 additions and 118 deletions


@@ -93,6 +93,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5363. Refactor WebHdfsFileSystem: move SPENGO-authenticated connection
creation to URLConnectionFactory. (Haohui Mai via jing9)
+HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hadoop.hdfs.web
+(Haohui Mai via Arpit Agarwal)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
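Note on downstream impact: for code outside the HDFS project the visible effect of HDFS-5436 is an import change. A minimal, hypothetical sketch follows (the host name and port are placeholders; the hftp scheme still resolves to the same class, only its package moved):

// Before this commit: import org.apache.hadoop.hdfs.HftpFileSystem;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.HftpFileSystem;  // new location after HDFS-5436

public class HftpImportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The hftp scheme is resolved through the FileSystem service registration,
    // so callers only need to update the import, not the URI.
    FileSystem fs = FileSystem.get(URI.create("hftp://namenode:50070/"), conf);
    System.out.println(fs instanceof HftpFileSystem);
    fs.listStatus(new Path("/"));
  }
}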


@@ -38,7 +38,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
/** Redirect queries about the hosted filesystem to an appropriate datanode.
-* @see org.apache.hadoop.hdfs.HftpFileSystem
+* @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class FileDataServlet extends DfsServlet {


@@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.VersionInfo;
import org.znerd.xmlenc.*;
import java.io.IOException;
@@ -39,13 +38,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import java.util.regex.Pattern;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Obtain meta-information about a filesystem.
-* @see org.apache.hadoop.hdfs.HftpFileSystem
+* @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class ListPathsServlet extends DfsServlet {


@@ -41,12 +41,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;


@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import java.io.IOException;
import java.io.InputStream;
@@ -34,28 +34,28 @@ import com.google.common.net.HttpHeaders;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
* connections from the client. Whenever seek() is called, a new connection
* is made on the successive read(). The normal input stream functions are
* connected to the currently active input stream.
*/
public abstract class ByteRangeInputStream extends FSInputStream {
/**
* This class wraps a URL and provides method to open connection.
* It can be overridden to change how a connection is opened.
*/
public static abstract class URLOpener {
protected URL url;
public URLOpener(URL u) {
url = u;
}
public void setURL(URL u) {
url = u;
}
public URL getURL() {
return url;
}
@@ -78,7 +78,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
StreamStatus status = StreamStatus.SEEK;
/**
* Create with the specified URLOpeners. Original url is used to open the
* stream for the first time. Resolved url is used in subsequent requests.
* @param o Original url
* @param r Resolved url
@@ -87,7 +87,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
this.originalURL = o;
this.resolvedURL = r;
}
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
@@ -108,12 +108,12 @@ public abstract class ByteRangeInputStream extends FSInputStream {
}
return in;
}
@VisibleForTesting
protected InputStream openInputStream() throws IOException {
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null;
final URLOpener opener = resolved? resolvedURL: originalURL;
final HttpURLConnection connection = opener.connect(startPos, resolved);
@@ -141,7 +141,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
return in;
}
private static boolean isChunkedTransferEncoding(
final Map<String, List<String>> headers) {
return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
@@ -186,7 +186,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
public int read(byte b[], int off, int len) throws IOException {
return update(getInputStream().read(b, off, len));
}
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
@@ -219,7 +219,7 @@ public abstract class ByteRangeInputStream extends FSInputStream {
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public void close() throws IOException {
if (in != null) {
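The class javadoc above describes the read pattern: a plain HTTP connection cannot be repositioned, so every seek() invalidates the current stream and the following read() opens a fresh connection at the new offset. Below is a small, standalone sketch of that pattern, not the Hadoop class itself; the class and field names are illustrative, and the Range header and response-code checks mirror what this commit shows for the HFTP opener.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

class SeekableHttpReader {
  private final URL url;
  private long pos;          // desired file offset
  private InputStream in;    // current connection's stream, null after seek()

  SeekableHttpReader(URL url) { this.url = url; }

  void seek(long newPos) throws IOException {
    if (in != null) { in.close(); in = null; }  // drop the old connection
    pos = newPos;
  }

  int read() throws IOException {
    if (in == null) {        // lazily reconnect on the read after a seek
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      if (pos != 0L) {
        conn.setRequestProperty("Range", "bytes=" + pos + "-");
      }
      conn.connect();
      int code = conn.getResponseCode();
      // Partial content is expected for a non-zero offset, plain OK otherwise.
      if (pos != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
        throw new IOException("HTTP_PARTIAL expected, received " + code);
      } else if (pos == 0L && code != HttpURLConnection.HTTP_OK) {
        throw new IOException("HTTP_OK expected, received " + code);
      }
      in = conn.getInputStream();
    }
    int b = in.read();
    if (b >= 0) { pos++; }
    return b;
  }
}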


@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -47,11 +47,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@@ -161,7 +163,7 @@ public class HftpFileSystem extends FileSystem
public String getCanonicalServiceName() {
return SecurityUtil.buildTokenService(nnUri).toString();
}
@Override
protected URI canonicalizeUri(URI uri) {
return NetUtils.getCanonicalUri(uri, getDefaultPort());
@@ -183,7 +185,7 @@ public class HftpFileSystem extends FileSystem
throws IOException {
super.initialize(name, conf);
setConf(conf);
this.ugi = UserGroupInformation.getCurrentUser();
this.nnUri = getNamenodeUri(name);
try {
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
@@ -224,7 +226,7 @@ public class HftpFileSystem extends FileSystem
UserGroupInformation ugi) {
return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
}
@Override
public Token<?> getRenewToken() {
@@ -315,7 +317,7 @@ public class HftpFileSystem extends FileSystem
/**
* Get encoded UGI parameter string for a URL.
*
* @return user_shortname,group1,group2...
*/
private String getEncodedUgiParameter() {
@@ -359,7 +361,7 @@ public class HftpFileSystem extends FileSystem
static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
RangeHeaderUrlOpener(final URL url) {
super(url);
}
@@ -379,7 +381,7 @@ public class HftpFileSystem extends FileSystem
}
conn.connect();
//Expects HTTP_OK or HTTP_PARTIAL response codes.
final int code = conn.getResponseCode();
if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
throw new IOException("HTTP_PARTIAL expected, received " + code);
@@ -387,7 +389,7 @@ public class HftpFileSystem extends FileSystem
throw new IOException("HTTP_OK expected, received " + code);
}
return conn;
}
}
static class RangeHeaderInputStream extends ByteRangeInputStream {
@@ -410,7 +412,7 @@ public class HftpFileSystem extends FileSystem
f = f.makeQualified(getUri(), getWorkingDirectory());
String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
URL u = getNamenodeURL(path, query);
return new FSDataInputStream(new RangeHeaderInputStream(u));
}
@@ -533,7 +535,7 @@ public class HftpFileSystem extends FileSystem
private FileChecksum getFileChecksum(String f) throws IOException {
final HttpURLConnection connection = openConnection(
"/fileChecksum" + ServletUtil.encodePath(f),
"ugi=" + getEncodedUgiParameter());
try {
final XMLReader xr = XMLReaderFactory.createXMLReader();
@@ -585,11 +587,11 @@ public class HftpFileSystem extends FileSystem
throw new IOException("Not supported");
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
throw new IOException("Not supported");
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
throw new IOException("Not supported");
@@ -615,18 +617,18 @@ public class HftpFileSystem extends FileSystem
}
/**
* Connect to the name node and get content summary.
* @param path The path
* @return The content summary for the path.
* @throws IOException
*/
private ContentSummary getContentSummary(String path) throws IOException {
final HttpURLConnection connection = openConnection(
"/contentSummary" + ServletUtil.encodePath(path),
"ugi=" + getEncodedUgiParameter());
InputStream in = null;
try {
in = connection.getInputStream();
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
@@ -713,12 +715,12 @@ public class HftpFileSystem extends FileSystem
@SuppressWarnings("unchecked")
@Override
public long renew(Token<?> token,
Configuration conf) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
return
DelegationTokenFetcher.renewDelegationToken
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token);
@@ -726,7 +728,7 @@ public class HftpFileSystem extends FileSystem
@SuppressWarnings("unchecked")
@Override
public void cancel(Token<?> token,
Configuration conf) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
@@ -734,9 +736,9 @@ public class HftpFileSystem extends FileSystem
DelegationTokenFetcher.cancelDelegationToken
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token);
}
}
private static class HftpDelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
private static final DelegationTokenSelector hdfsTokenSelector =
@@ -745,14 +747,14 @@ public class HftpFileSystem extends FileSystem
public HftpDelegationTokenSelector() {
super(TOKEN_KIND);
}
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
Collection<Token<?>> tokens, Configuration conf) {
Token<DelegationTokenIdentifier> token =
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
if (token == null) {
// try to get a HDFS token
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
}
return token;
}
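The HftpDelegationTokenSelector shown above prefers a token issued for the HFTP service and falls back to an HDFS delegation token for the same namenode when none is found. A schematic, dependency-free restatement of that fallback follows; the Tok type and the kind strings are hypothetical stand-ins for Hadoop's Token and token kinds, and the real selector additionally maps the URI to an RPC service address through the configuration.

import java.util.Collection;

class TokenFallbackSketch {
  static final class Tok {
    final String kind;     // e.g. an HFTP kind vs. an HDFS delegation kind
    final String service;  // e.g. "127.0.0.1:50070"
    Tok(String kind, String service) { this.kind = kind; this.service = service; }
  }

  static Tok select(String service, String preferredKind, String fallbackKind,
                    Collection<Tok> tokens) {
    Tok fallback = null;
    for (Tok t : tokens) {
      if (!service.equals(t.service)) continue;
      if (preferredKind.equals(t.kind)) return t;    // exact HFTP match wins
      if (fallbackKind.equals(t.kind)) fallback = t; // remember an HDFS token
    }
    return fallback;  // may be null, as in the real selector
  }
}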


@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import java.io.FileInputStream;
import java.io.IOException;
@@ -40,13 +40,15 @@ import javax.net.ssl.X509TrustManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.Time;
/**
* An implementation of a protocol for accessing filesystems over HTTPS. The
* following implementation provides a limited, read-only interface to a
* filesystem over HTTPS.
*
* @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
* @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
*/
@@ -85,7 +87,7 @@ public class HsftpFileSystem extends HftpFileSystem {
/**
* Set up SSL resources
*
* @throws IOException
*/
private static void setupSsl(Configuration conf) throws IOException {


@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;


@@ -14,6 +14,6 @@
# limitations under the License.
org.apache.hadoop.hdfs.DistributedFileSystem
-org.apache.hadoop.hdfs.HftpFileSystem
-org.apache.hadoop.hdfs.HsftpFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
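This services file is what lets FileSystem map the hftp and hsftp URI schemes to their implementation classes, so only the listed class names change here. A small sketch of how such a registration is consumed, assuming the standard java.util.ServiceLoader mechanism that Hadoop's FileSystem uses for discovery:

import java.util.ServiceLoader;
import org.apache.hadoop.fs.FileSystem;

public class ListRegisteredFileSystems {
  public static void main(String[] args) {
    // Each entry in META-INF/services/org.apache.hadoop.fs.FileSystem becomes
    // one instance here; after this commit the hftp/hsftp entries come from
    // the org.apache.hadoop.hdfs.web package.
    for (FileSystem fs : ServiceLoader.load(FileSystem.class)) {
      System.out.println(fs.getClass().getName());
    }
  }
}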


@@ -13,5 +13,5 @@
#
org.apache.hadoop.hdfs.DFSClient$Renewer
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
-org.apache.hadoop.hdfs.HftpFileSystem$TokenManager
+org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer


@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;


@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DataChecksum;


@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;


@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Assert;


@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.AccessControlException;


@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -34,6 +34,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.junit.Test;
public class TestByteRangeInputStream {
@@ -41,24 +42,24 @@ public static class MockHttpURLConnection extends HttpURLConnection {
public MockHttpURLConnection(URL u) {
super(u);
}
@Override
public boolean usingProxy(){
return false;
}
@Override
public void disconnect() {
}
@Override
public void connect() {
}
@Override
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream("asdf".getBytes());
}
@Override
public URL getURL() {
@@ -70,7 +71,7 @@ public static class MockHttpURLConnection extends HttpURLConnection {
}
return u;
}
@Override
public int getResponseCode() {
if (responseCode != -1) {
@@ -87,13 +88,13 @@ public static class MockHttpURLConnection extends HttpURLConnection {
public void setResponseCode(int resCode) {
responseCode = resCode;
}
@Override
public String getHeaderField(String field) {
return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
}
}
@Test
public void testByteRange() throws IOException {
HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
@@ -149,7 +150,7 @@ public static class MockHttpURLConnection extends HttpURLConnection {
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
is.seek(500);
try {
is.read();
fail("Exception should be thrown when 200 response is given "
@@ -171,31 +172,31 @@ public static class MockHttpURLConnection extends HttpURLConnection {
"HTTP_OK expected, received 206", e.getMessage());
}
}
@Test
public void testPropagatedClose() throws IOException {
ByteRangeInputStream brs = spy(
new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
InputStream mockStream = mock(InputStream.class);
doReturn(mockStream).when(brs).openInputStream();
int brisOpens = 0;
int brisCloses = 0;
int isCloses = 0;
// first open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(++brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// stream is open, shouldn't close underlying stream
brs.getInputStream();
verify(brs, times(brisOpens)).openInputStream();
verify(brs, times(brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// seek forces a reopen, should close underlying stream
brs.seek(1);
brs.getInputStream();
@@ -221,12 +222,12 @@ public static class MockHttpURLConnection extends HttpURLConnection {
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(++isCloses)).close();
// it's already closed, underlying stream should not close
brs.close();
verify(brs, times(++brisCloses)).close();
verify(mockStream, times(isCloses)).close();
// it's closed, don't reopen it
boolean errored = false;
try {


@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.ServerSocket;
@@ -29,7 +30,10 @@ import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation;
@@ -46,11 +50,11 @@ public class TestHftpDelegationToken {
final Configuration conf = new Configuration();
conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation user =
UserGroupInformation.createUserForTesting("oom",
new String[]{"memory"});
Token<?> token = new Token<TokenIdentifier>
(new byte[0], new byte[0],
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
new Text("127.0.0.1:8020"));
user.addToken(token);
@@ -58,7 +62,7 @@ public class TestHftpDelegationToken {
(null, null, new Text("other token"), new Text("127.0.0.1:8021"));
user.addToken(token2);
assertEquals("wrong tokens in user", 2, user.getTokens().size());
FileSystem fs =
user.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
@@ -78,13 +82,13 @@ public class TestHftpDelegationToken {
Configuration conf = new Configuration();
conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
int httpPort = 80;
int httpsPort = 443;
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
// test with implicit default port
URI fsUri = URI.create("hftp://localhost");
MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort());
@@ -96,14 +100,14 @@ public class TestHftpDelegationToken {
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort, conf);
// test with non-default port
// Make sure it uses the port from the hftp URI.
fsUri = URI.create("hftp://localhost:"+(httpPort+1));
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpPort + 1, conf);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
}
@@ -119,27 +123,27 @@ public class TestHftpDelegationToken {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
// test with implicit default port
URI fsUri = URI.create("hsftp://localhost");
MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with explicit default port
fsUri = URI.create("hsftp://localhost:"+httpsPort);
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf);
// test with non-default port
fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort+1, conf);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
}
@Test
public void testInsecureRemoteCluster() throws Exception {
@@ -186,7 +190,7 @@ public class TestHftpDelegationToken {
t.interrupt();
}
}
private void checkTokenSelection(HftpFileSystem fs,
int port,
Configuration conf) throws IOException {
@@ -216,12 +220,12 @@ public class TestHftpDelegationToken {
token = fs.selectDelegationToken(ugi);
assertNotNull(token);
assertEquals(hftpToken, token);
// switch to using host-based tokens, no token should match
SecurityUtilTestHelper.setTokenServiceUseIp(false);
token = fs.selectDelegationToken(ugi);
assertNull(token);
// test fallback to hdfs token
hdfsToken = new Token<TokenIdentifier>(
new byte[0], new byte[0],
@@ -241,7 +245,7 @@ public class TestHftpDelegationToken {
assertNotNull(token);
assertEquals(hftpToken, token);
}
static class MyHftpFileSystem extends HftpFileSystem {
@Override
public URI getCanonicalUri() {
@@ -255,7 +259,7 @@ public class TestHftpDelegationToken {
@Override
protected void initDelegationToken() throws IOException {}
}
static class MyHsftpFileSystem extends HsftpFileSystem {
@Override
public URI getCanonicalUri() {


@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -38,16 +38,21 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.util.ServletUtil;
import org.apache.log4j.Level;
import org.junit.*;
public class TestHftpFileSystem {
private static final Random RAN = new Random();
private static Configuration config = null;
private static MiniDFSCluster cluster = null;
private static String blockPoolId = null;
@@ -94,17 +99,17 @@ public class TestHftpFileSystem {
config = new Configuration();
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
blockPoolId = cluster.getNamesystem().getBlockPoolId();
hftpUri =
"hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
}
@AfterClass
public static void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
@Before
public void initFileSystems() throws IOException {
hdfs = cluster.getFileSystem();
@@ -119,9 +124,9 @@ public class TestHftpFileSystem {
public void resetFileSystems() throws IOException {
FileSystem.closeAll();
}
/**
* Test file creation and access with file names that need encoding.
*/
@Test
public void testFileNameEncoding() throws IOException, URISyntaxException {
@@ -159,13 +164,13 @@ public class TestHftpFileSystem {
// Get the path's block location so we can determine
// if we were redirected to the right DN.
BlockLocation[] locations =
hdfs.getFileBlockLocations(path, 0, 10);
String xferAddr = locations[0].getNames()[0];
// Connect to the NN to get redirected
URL u = hftpFs.getNamenodeURL(
"/data" + ServletUtil.encodePath(path.toUri().getPath()),
"ugi=userx,groupy");
HttpURLConnection conn = (HttpURLConnection)u.openConnection();
HttpURLConnection.setFollowRedirects(true);
@@ -176,7 +181,7 @@ public class TestHftpFileSystem {
// Find the datanode that has the block according to locations
// and check that the URL was redirected to this DN's info port
for (DataNode node : cluster.getDataNodes()) {
DatanodeRegistration dnR =
DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId);
if (dnR.getXferAddr().equals(xferAddr)) {
checked = true;
@@ -207,25 +212,25 @@ public class TestHftpFileSystem {
FSDataOutputStream out = hdfs.create(testFile, true);
out.writeBytes("0123456789");
out.close();
FSDataInputStream in = hftpFs.open(testFile);
// Test read().
for (int i = 0; i < 5; ++i) {
assertEquals(i, in.getPos());
in.read();
}
// Test read(b, off, len).
assertEquals(5, in.getPos());
byte[] buffer = new byte[10];
assertEquals(2, in.read(buffer, 0, 2));
assertEquals(7, in.getPos());
// Test read(b).
int bytesRead = in.read(buffer);
assertEquals(7 + bytesRead, in.getPos());
// Test EOF.
for (int i = 0; i < 100; ++i) {
in.read();
@@ -261,21 +266,21 @@ public class TestHftpFileSystem {
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// force the stream to connect and then close it
in = hftpFs.open(testFile);
int ch = in.read();
assertEquals('0', ch);
in.close();
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
// make sure seeking doesn't automagically reopen the stream
in.seek(4);
checkClosedStream(in);
checkClosedStream(in.getWrappedStream());
}
private void checkClosedStream(InputStream is) {
IOException ioe = null;
try {
@@ -286,7 +291,7 @@ public class TestHftpFileSystem {
assertNotNull("No exception on closed read", ioe);
assertEquals("Stream closed", ioe.getMessage());
}
@Test
public void testHftpDefaultPorts() throws IOException {
Configuration conf = new Configuration();
@@ -304,7 +309,7 @@ public class TestHftpFileSystem {
fs.getCanonicalServiceName()
);
}
@Test
public void testHftpCustomDefaultPorts() throws IOException {
Configuration conf = new Configuration();
@@ -314,7 +319,7 @@ public class TestHftpFileSystem {
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort());
assertEquals(uri, fs.getUri());
// HFTP uses http to get the token so canonical service name should
@@ -349,8 +354,8 @@ public class TestHftpFileSystem {
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:789",
fs.getCanonicalServiceName()
@@ -384,7 +389,7 @@ public class TestHftpFileSystem {
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(456, fs.getDefaultPort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:456",


@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -33,6 +33,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.junit.Test;
@@ -83,7 +85,7 @@ public class TestHftpURLTimeouts {
HsftpFileSystem fs = (HsftpFileSystem)FileSystem.get(uri, conf);
fs.connectionFactory = new URLConnectionFactory(5);
try {
HttpURLConnection conn = null;
timedout = false;
@@ -104,7 +106,7 @@ public class TestHftpURLTimeouts {
fs.close();
}
}
private boolean checkConnectTimeout(HftpFileSystem fs, boolean ignoreReadTimeout)
throws IOException {
boolean timedout = false;


@@ -34,9 +34,9 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;


@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;