HDFS-6100. DataNodeWebHdfsMethods does not failover in HA mode. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579301 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 2014-03-19 17:29:23 +00:00
parent fd1c424548
commit 7817245d88
9 changed files with 185 additions and 193 deletions
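In outline, the patch stops threading a concrete NameNode RPC address through WebHDFS redirects. The NameNode now advertises its token service name (the nameservice ID when HA is enabled, otherwise its rpc host:port), and DatanodeWebHdfsMethods builds its DFSClient from an hdfs:// URI formed from that value, so the ordinary client-side failover machinery chooses the active NameNode. The sketch below is illustrative only and loosely mirrors the new test code; the nameservice "mycluster", the host names, and the class name are invented for this example and are not part of the patch.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsHaSketch {
  public static void main(String[] args) throws Exception {
    // Typical HA client settings for a made-up nameservice "mycluster".
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2.example.com:8020");
    conf.set("dfs.namenode.http-address.mycluster.nn1", "nn1.example.com:50070");
    conf.set("dfs.namenode.http-address.mycluster.nn2", "nn2.example.com:50070");
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // Clients address the cluster by its logical name. With this patch the
    // DataNode side of a WebHDFS redirect also resolves the logical name
    // instead of staying pinned to whichever NameNode issued the redirect.
    try (FileSystem fs = FileSystem.get(URI.create("webhdfs://mycluster"), conf)) {
      fs.mkdirs(new Path("/tmp/ha-smoke-test"));
    }
  }
}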

CHANGES.txt (hadoop-hdfs)

@@ -649,6 +649,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth)

+    HDFS-6100. DataNodeWebHdfsMethods does not failover in HA mode. (Haohui Mai
+    via jing9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS

     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

DatanodeWebHdfsMethods.java

@@ -19,13 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
@@ -40,6 +40,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,12 +49,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.hdfs.web.ParamFilter;
+import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
 import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
@@ -61,7 +64,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -86,18 +90,19 @@ public class DatanodeWebHdfsMethods {
   private static final UriFsPathParam ROOT = new UriFsPathParam("");

   private @Context ServletContext context;
+  private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;

   private void init(final UserGroupInformation ugi,
-      final DelegationParam delegation, final InetSocketAddress nnRpcAddr,
+      final DelegationParam delegation, final String nnId,
       final UriFsPathParam path, final HttpOpParam<?> op,
       final Param<?, ?>... parameters) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + Param.toSortedString(", ", parameters));
     }

-    if (nnRpcAddr == null) {
-      throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME
+    if (nnId == null) {
+      throw new IllegalArgumentException(NamenodeAddressParam.NAME
           + " is not specified.");
     }
@@ -106,15 +111,32 @@ public class DatanodeWebHdfsMethods {
     if (UserGroupInformation.isSecurityEnabled()) {
       //add a token for RPC.
-      final Token<DelegationTokenIdentifier> token =
-          new Token<DelegationTokenIdentifier>();
-      token.decodeFromUrlString(delegation.getValue());
-      SecurityUtil.setTokenService(token, nnRpcAddr);
-      token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+      final Token<DelegationTokenIdentifier> token = deserializeToken
+              (delegation.getValue(), nnId);
       ugi.addToken(token);
     }
   }

+  @VisibleForTesting
+  Token<DelegationTokenIdentifier> deserializeToken
+          (String delegation,String nnId) throws IOException {
+    final DataNode datanode = (DataNode) context.getAttribute("datanode");
+    final Configuration conf = datanode.getConf();
+    final Token<DelegationTokenIdentifier> token = new
+            Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(delegation);
+    URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME +
+            "://" + nnId);
+    boolean isHA = HAUtil.isLogicalUri(conf, nnUri);
+    if (isHA) {
+      token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri));
+    } else {
+      token.setService(SecurityUtil.buildTokenService(nnUri));
+    }
+    token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    return token;
+  }
+
   /** Handle HTTP PUT request for the root. */
   @PUT
   @Path("/")
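The new deserializeToken helper above chooses the token's service field according to whether the supplied name is a logical (HA) URI. A rough illustration of the two branches follows; it assumes the 2.4-era HAUtil and SecurityUtil signatures used in the diff, and the names "mycluster" and "nn1.example.com:8020" are made up for the example.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;

public class TokenServiceSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "mycluster");   // declare the logical nameservice

    URI logical = URI.create("hdfs://mycluster");              // nameservice id (HA)
    URI physical = URI.create("hdfs://nn1.example.com:8020");  // concrete rpc address

    if (HAUtil.isLogicalUri(conf, logical)) {
      // HA: the service encodes the logical name, so the token remains usable
      // across a failover (roughly "ha-hdfs:mycluster").
      Text haService = HAUtil.buildTokenServiceForLogicalUri(logical);
      System.out.println(haService);
    }

    // Non-HA: the service is simply the host:port of the single NameNode.
    Text plainService = SecurityUtil.buildTokenService(physical);
    System.out.println(plainService); // nn1.example.com:8020
  }
}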
@@ -125,9 +147,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
       @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@@ -141,7 +163,7 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
-    return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission,
+    return put(in, ugi, delegation, namenode, ROOT, op, permission,
         overwrite, bufferSize, replication, blockSize);
   }
@@ -155,9 +177,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
           final PutOpParam op,
@@ -173,14 +195,14 @@ public class DatanodeWebHdfsMethods {
           final BlockSizeParam blockSize
       ) throws IOException, InterruptedException {
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, permission,
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, permission,
         overwrite, bufferSize, replication, blockSize);

     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        return put(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
+        return put(in, nnId, path.getAbsolutePath(), op,
             permission, overwrite, bufferSize, replication, blockSize);
       }
     });
@@ -188,9 +210,7 @@ public class DatanodeWebHdfsMethods {
   private Response put(
       final InputStream in,
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final PutOpParam op,
       final PermissionParam permission,
@@ -208,7 +228,7 @@ public class DatanodeWebHdfsMethods {
       conf.set(FsPermission.UMASK_LABEL, "000");

       final int b = bufferSize.getValue(conf);
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       FSDataOutputStream out = null;
       try {
         out = new FSDataOutputStream(dfsclient.create(
@@ -225,9 +245,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, out);
         IOUtils.cleanup(LOG, dfsclient);
       }
-      final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf);
-      final URI uri = new URI(WebHdfsFileSystem.SCHEME, null,
-          nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null);
+      final String scheme = "http".equals(request.getScheme()) ?
+        WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME;
+      final URI uri = URI.create(String.format("%s://%s/%s", scheme,
+          nnId, fullpath));
       return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
@@ -245,15 +266,15 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize);
+    return post(in, ugi, delegation, namenode, ROOT, op, bufferSize);
   }

   /** Handle HTTP POST request. */
@@ -266,9 +287,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
           final PostOpParam op,
@@ -276,13 +297,13 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, bufferSize);
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, bufferSize);

     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        return post(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
+        return post(in, nnId, path.getAbsolutePath(), op,
             bufferSize);
       }
     });
@@ -290,9 +311,7 @@ public class DatanodeWebHdfsMethods {
   private Response post(
       final InputStream in,
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final PostOpParam op,
       final BufferSizeParam bufferSize
@@ -304,7 +323,7 @@ public class DatanodeWebHdfsMethods {
     {
       final Configuration conf = new Configuration(datanode.getConf());
       final int b = bufferSize.getValue(conf);
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       FSDataOutputStream out = null;
       try {
         out = dfsclient.append(fullpath, b, null, null);
@@ -332,9 +351,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
       @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@@ -344,7 +363,7 @@ public class DatanodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length,
+    return get(ugi, delegation, namenode, ROOT, op, offset, length,
         bufferSize);
   }
@@ -356,9 +375,9 @@ public class DatanodeWebHdfsMethods {
       @Context final UserGroupInformation ugi,
       @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
           final DelegationParam delegation,
-      @QueryParam(NamenodeRpcAddressParam.NAME)
-      @DefaultValue(NamenodeRpcAddressParam.DEFAULT)
-          final NamenodeRpcAddressParam namenodeRpcAddress,
+      @QueryParam(NamenodeAddressParam.NAME)
+      @DefaultValue(NamenodeAddressParam.DEFAULT)
+          final NamenodeAddressParam namenode,
       @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
       @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
           final GetOpParam op,
@@ -370,22 +389,20 @@ public class DatanodeWebHdfsMethods {
           final BufferSizeParam bufferSize
       ) throws IOException, InterruptedException {
-    final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue();
-    init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize);
+    final String nnId = namenode.getValue();
+    init(ugi, delegation, nnId, path, op, offset, length, bufferSize);

     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        return get(ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op,
-            offset, length, bufferSize);
+        return get(nnId, path.getAbsolutePath(), op, offset,
+            length, bufferSize);
       }
     });
   }

   private Response get(
-      final UserGroupInformation ugi,
-      final DelegationParam delegation,
-      final InetSocketAddress nnRpcAddr,
+      final String nnId,
       final String fullpath,
       final GetOpParam op,
       final OffsetParam offset,
@@ -399,7 +416,7 @@ public class DatanodeWebHdfsMethods {
     case OPEN:
     {
       final int b = bufferSize.getValue(conf);
-      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      final DFSClient dfsclient = newDfsClient(nnId, conf);
       HdfsDataInputStream in = null;
       try {
         in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true));
@@ -426,7 +443,7 @@ public class DatanodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       MD5MD5CRC32FileChecksum checksum = null;
-      DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
+      DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
         checksum = dfsclient.getFileChecksum(fullpath);
         dfsclient.close();
@@ -441,4 +458,10 @@ public class DatanodeWebHdfsMethods {
       throw new UnsupportedOperationException(op + " is not supported");
     }
   }
+
+  private static DFSClient newDfsClient(String nnId,
+      Configuration conf) throws IOException {
+    URI uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId);
+    return new DFSClient(uri, conf);
+  }
 }
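newDfsClient above is the crux of the DataNode-side fix: constructing the DFSClient from an hdfs:// URI lets the normal client plumbing treat a logical authority as a nameservice and consult the configured failover proxy provider, whereas the old InetSocketAddress-based constructor always dialed one fixed NameNode. A hedged before/after comparison; the host and nameservice names and the class name are invented for illustration.

import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;

public class DfsClientConstructionSketch {
  static DFSClient oldStyle(Configuration conf) throws Exception {
    // Pre-patch pattern: bound to a single NameNode address, so requests keep
    // going to that node even after it becomes standby.
    return new DFSClient(new InetSocketAddress("nn1.example.com", 8020), conf);
  }

  static DFSClient newStyle(Configuration conf) throws Exception {
    // Post-patch pattern: "mycluster" may be a nameservice id, in which case
    // the configured failover proxy provider picks the active NameNode.
    return new DFSClient(URI.create("hdfs://mycluster"), conf);
  }
}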

NameNode.java

@@ -270,6 +270,11 @@ public class NameNode implements NameNodeStatusMXBean {
   private JvmPauseMonitor pauseMonitor;
   private ObjectName nameNodeStatusBeanName;
+  /**
+   * The service name of the delegation token issued by the namenode. It is
+   * the name service id in HA mode, or the rpc address in non-HA mode.
+   */
+  private String tokenServiceName;

   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
@@ -307,6 +312,13 @@ public class NameNode implements NameNodeStatusMXBean {
     return startupProgress;
   }

+  /**
+   * Return the service name of the issued delegation token.
+   *
+   * @return The name service id in HA-mode, or the rpc address in non-HA mode
+   */
+  public String getTokenServiceName() { return tokenServiceName; }
+
   public static InetSocketAddress getAddress(String address) {
     return NetUtils.createSocketAddr(address, DEFAULT_PORT);
   }
@@ -500,6 +512,9 @@ public class NameNode implements NameNodeStatusMXBean {
     loadNamesystem(conf);
     rpcServer = createRpcServer(conf);
+
+    final String nsId = getNameServiceId(conf);
+    tokenServiceName = HAUtil.isHAEnabled(conf, nsId) ? nsId : NetUtils
+        .getHostPortString(rpcServer.getRpcAddress());
     if (NamenodeRole.NAMENODE == role) {
       httpServer.setNameNodeAddress(getNameNodeAddress());
       httpServer.setFSImage(getFSImage());

NamenodeWebHdfsMethods.java

@@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.web.resources.GroupParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.hdfs.web.resources.OwnerParam;
@@ -275,7 +275,7 @@ public class NamenodeWebHdfsMethods {
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
     final String query = op.toQueryString() + delegationQuery
-        + "&" + new NamenodeRpcAddressParam(namenode)
+        + "&" + new NamenodeAddressParam(namenode)
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
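With the rename, the redirect URL built here keeps the existing namenoderpcaddress query key (NamenodeAddressParam.NAME is unchanged, see below), but its value is now the NameNode's token service name, i.e. the nameservice ID under HA. An illustrative snippet only; "mycluster" and the host name are invented, and the printed form assumes Param#toString renders name=value, as the string concatenation in the code above implies.

import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;

public class RedirectQuerySketch {
  public static void main(String[] args) {
    // Under HA the parameter carries the logical nameservice id...
    System.out.println(new NamenodeAddressParam("mycluster"));
    // ...and in a non-HA deployment it carries the rpc address.
    System.out.println(new NamenodeAddressParam("nn1.example.com:8020"));
    // Expected output (approximately):
    //   namenoderpcaddress=mycluster
    //   namenoderpcaddress=nn1.example.com:8020
  }
}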

InetSocketAddressParam.java (deleted)

@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.web.resources;
-
-import java.net.InetSocketAddress;
-
-/** InetSocketAddressParam parameter. */
-abstract class InetSocketAddressParam
-    extends Param<InetSocketAddress, InetSocketAddressParam.Domain> {
-  InetSocketAddressParam(final Domain domain, final InetSocketAddress value) {
-    super(domain, value);
-  }
-
-  @Override
-  public String toString() {
-    return getName() + "=" + Domain.toString(getValue());
-  }
-
-  /** @return the parameter value as a string */
-  @Override
-  public String getValueString() {
-    return Domain.toString(getValue());
-  }
-
-  /** The domain of the parameter. */
-  static final class Domain extends Param.Domain<InetSocketAddress> {
-    Domain(final String paramName) {
-      super(paramName);
-    }
-
-    @Override
-    public String getDomain() {
-      return "<HOST:PORT>";
-    }
-
-    @Override
-    InetSocketAddress parse(final String str) {
-      if (str == null) {
-        throw new IllegalArgumentException("The input string is null: expect "
-            + getDomain());
-      }
-      final int i = str.indexOf(':');
-      if (i < 0) {
-        throw new IllegalArgumentException("Failed to parse \"" + str
-            + "\" as " + getDomain() + ": the ':' character not found.");
-      } else if (i == 0) {
-        throw new IllegalArgumentException("Failed to parse \"" + str
-            + "\" as " + getDomain() + ": HOST is empty.");
-      } else if (i == str.length() - 1) {
-        throw new IllegalArgumentException("Failed to parse \"" + str
-            + "\" as " + getDomain() + ": PORT is empty.");
-      }
-
-      final String host = str.substring(0, i);
-      final int port;
-      try {
-        port = Integer.parseInt(str.substring(i + 1));
-      } catch(NumberFormatException e) {
-        throw new IllegalArgumentException("Failed to parse \"" + str
-            + "\" as " + getDomain() + ": the ':' position is " + i
-            + " but failed to parse PORT.", e);
-      }
-
-      try {
-        return new InetSocketAddress(host, port);
-      } catch(Exception e) {
-        throw new IllegalArgumentException("Failed to parse \"" + str
-            + "\": cannot create InetSocketAddress(host=" + host
-            + ", port=" + port + ")", e);
-      }
-    }
-
-    /** Convert an InetSocketAddress to a HOST:PORT String. */
-    static String toString(final InetSocketAddress addr) {
-      return addr.getHostName() + ":" + addr.getPort();
-    }
-  }
-}

NamenodeRpcAddressParam.java (renamed to NamenodeAddressParam.java)

@@ -20,27 +20,27 @@ package org.apache.hadoop.hdfs.web.resources;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;

 /** Namenode RPC address parameter. */
-public class NamenodeRpcAddressParam extends InetSocketAddressParam {
+public class NamenodeAddressParam extends StringParam {
   /** Parameter name. */
   public static final String NAME = "namenoderpcaddress";

   /** Default parameter value. */
   public static final String DEFAULT = "";

-  private static final Domain DOMAIN = new Domain(NAME);
+  private static final Domain DOMAIN = new Domain(NAME, null);

   /**
    * Constructor.
    * @param str a string representation of the parameter value.
    */
-  public NamenodeRpcAddressParam(final String str) {
+  public NamenodeAddressParam(final String str) {
     super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
   }

   /**
    * Construct an object using the RPC address of the given namenode.
    */
-  public NamenodeRpcAddressParam(final NameNode namenode) {
-    super(DOMAIN, namenode.getNameNodeAddress());
+  public NamenodeAddressParam(final NameNode namenode) {
+    super(DOMAIN, namenode.getTokenServiceName());
   }

   @Override
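A side effect worth noting: the deleted InetSocketAddressParam insisted on a HOST:PORT shape, so a bare nameservice ID such as "mycluster" could never have been carried by the old NamenodeRpcAddressParam, while the StringParam-based replacement accepts it unchanged. Illustrative only; the value and class name are made up.

import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;

public class LogicalNameParamSketch {
  public static void main(String[] args) {
    // Accepted as-is by the new StringParam-based parameter.
    NamenodeAddressParam p = new NamenodeAddressParam("mycluster");
    System.out.println(p.getValue()); // mycluster

    // The removed InetSocketAddressParam.Domain.parse() would have rejected
    // "mycluster" with "the ':' character not found." (see the deleted file above).
  }
}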

DFSTestUtil.java

@@ -53,8 +53,11 @@ import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha
+    .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.web.TestWebHDFSForHA;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
@@ -137,6 +140,21 @@ public class DFSTestUtil {
     NameNode.format(conf);
   }

+  /**
+   * Create a new HA-enabled configuration.
+   */
+  public static Configuration newHAConfiguration(final String logicalName) {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
+    conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
+        logicalName), "nn1,nn2");
+    conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" +
+        "." + logicalName,
+        ConfiguredFailoverProxyProvider.class.getName());
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    return conf;
+  }
+
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
    */
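For orientation, a hedged sketch of how the new helper composes with MiniDFSCluster and HATestUtil; it mirrors the tests in TestWebHDFSForHA below. The helper itself does not set the per-NameNode addresses; HATestUtil.setFailoverConfigurations fills those in from the running mini-cluster. The logical name "minidfs" and the class name are only examples.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;

public class NewHaConfigurationSketch {
  public static void main(String[] args) throws Exception {
    // dfs.nameservices, dfs.ha.namenodes.minidfs and the failover proxy
    // provider come from the helper added above.
    Configuration conf = DFSTestUtil.newHAConfiguration("minidfs");
    MiniDFSNNTopology topo = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf("minidfs")
            .addNN(new MiniDFSNNTopology.NNConf("nn1"))
            .addNN(new MiniDFSNNTopology.NNConf("nn2")));
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(topo).numDataNodes(1).build();
    try {
      HATestUtil.setFailoverConfigurations(cluster, conf, "minidfs");
      cluster.waitActive();
      cluster.transitionToActive(0);
      FileSystem fs = FileSystem.get(URI.create("webhdfs://minidfs"), conf);
      fs.close();
    } finally {
      cluster.shutdown();
    }
  }
}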

TestWebHDFSForHA.java

@@ -18,23 +18,25 @@
 package org.apache.hadoop.hdfs.web;

-import java.io.IOException;
-import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;

-/** Test whether WebHDFS can connect to an HA cluster */
+import java.io.IOException;
+import java.net.URI;
+
 public class TestWebHDFSForHA {
   private static final String LOGICAL_NAME = "minidfs";
+  private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME +
+          "://" + LOGICAL_NAME);
   private static final MiniDFSNNTopology topo = new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
           new MiniDFSNNTopology.NNConf("nn1")).addNN(
@@ -42,8 +44,7 @@ public class TestWebHDFSForHA {
   @Test
   public void testHA() throws IOException {
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
     MiniDFSCluster cluster = null;
     FileSystem fs = null;
     try {
@@ -54,8 +55,7 @@ public class TestWebHDFSForHA {
       cluster.waitActive();

-      final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
-      fs = FileSystem.get(URI.create(uri), conf);
+      fs = FileSystem.get(WEBHDFS_URI, conf);
       cluster.transitionToActive(0);

       final Path dir = new Path("/test");
@@ -67,9 +67,7 @@ public class TestWebHDFSForHA {
       final Path dir2 = new Path("/test2");
       Assert.assertTrue(fs.mkdirs(dir2));
     } finally {
-      if (fs != null) {
-        fs.close();
-      }
+      IOUtils.cleanup(null, fs);
       if (cluster != null) {
         cluster.shutdown();
       }
@@ -78,10 +76,9 @@ public class TestWebHDFSForHA {
   @Test
   public void testSecureHA() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
         true);
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);

     MiniDFSCluster cluster = null;
     WebHdfsFileSystem fs = null;
@@ -92,8 +89,7 @@ public class TestWebHDFSForHA {
       HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
       cluster.waitActive();

-      final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME;
-      fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf);
+      fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
       cluster.transitionToActive(0);

       Token<?> token = fs.getDelegationToken(null);
@@ -104,9 +100,44 @@ public class TestWebHDFSForHA {
       fs.renewDelegationToken(token);
       fs.cancelDelegationToken(token);
     } finally {
-      if (fs != null) {
-        fs.close();
-      }
+      IOUtils.cleanup(null, fs);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testFailoverAfterOpen() throws IOException {
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    final Path p = new Path("/test");
+    final byte[] data = "Hello".getBytes();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo)
+          .numDataNodes(1).build();
+
+      HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
+      cluster.waitActive();
+
+      fs = FileSystem.get(WEBHDFS_URI, conf);
+      cluster.transitionToActive(1);
+
+      FSDataOutputStream out = fs.create(p);
+      cluster.shutdownNameNode(1);
+      cluster.transitionToActive(0);
+      out.write(data);
+      out.close();
+
+      FSDataInputStream in = fs.open(p);
+      byte[] buf = new byte[data.length];
+      IOUtils.readFully(in, buf, 0, buf.length);
+      Assert.assertArrayEquals(data, buf);
+    } finally {
+      IOUtils.cleanup(null, fs);
       if (cluster != null) {
         cluster.shutdown();
       }

TestWebHdfsFileSystemContract.java

@@ -43,13 +43,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -465,7 +460,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
       AppendTestUtil.testAppend(fs, new Path(dir, "append"));
     }

-    {//test NamenodeRpcAddressParam not set.
+    {//test NamenodeAddressParam not set.
       final HttpOpParam.Op op = PutOpParam.Op.CREATE;
       final URL url = webhdfs.toUrl(op, dir);
       HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -476,9 +471,9 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
       final String redirect = conn.getHeaderField("Location");
       conn.disconnect();

-      //remove NamenodeRpcAddressParam
+      //remove NamenodeAddressParam
       WebHdfsFileSystem.LOG.info("redirect = " + redirect);
-      final int i = redirect.indexOf(NamenodeRpcAddressParam.NAME);
+      final int i = redirect.indexOf(NamenodeAddressParam.NAME);
       final int j = redirect.indexOf("&", i);
       String modified = redirect.substring(0, i - 1) + redirect.substring(j);
       WebHdfsFileSystem.LOG.info("modified = " + modified);