svn merge -c 1198903 from trunk for HDFS-2528.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1198905 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-11-07 20:06:51 +00:00
parent 4657411161
commit 240c74b632
8 changed files with 72 additions and 61 deletions

View File

@ -29,6 +29,9 @@ Release 0.23.1 - UNRELEASED
isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in
HdfsFileStatus JSON object. (szetszwo)
HDFS-2528. Webhdfs: set delegation kind to WEBHDFS and add a HDFS token
when http requests are redirected to datanode. (szetszwo)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -134,7 +134,7 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
public DelegationTokenRenewer(final Class<?> clazz) {
public DelegationTokenRenewer(final Class<T> clazz) {
super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
setDaemon(true);
}

View File

@ -58,13 +58,12 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
@ -546,9 +545,8 @@ public class JspHelper {
token.decodeFromUrlString(tokenString);
String serviceAddress = getNNServiceAddress(context, request);
if (serviceAddress != null) {
LOG.info("Setting service in token: "
+ new Text(serviceAddress));
token.setService(new Text(serviceAddress));
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
}
ByteArrayInputStream buf = new ByteArrayInputStream(token
.getIdentifier());

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.JsonUtil;
@ -58,7 +59,9 @@ import org.apache.hadoop.hdfs.web.ParamFilter;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
@ -69,7 +72,9 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.sun.jersey.spi.container.ResourceFilters;
@ -84,6 +89,29 @@ public class DatanodeWebHdfsMethods {
private @Context ServletContext context;
private @Context HttpServletResponse response;
private void init(final UserGroupInformation ugi, final DelegationParam delegation,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+ ", ugi=" + ugi + Param.toSortedString(", ", parameters));
}
//clear content type
response.setContentType(null);
if (UserGroupInformation.isSecurityEnabled()) {
//add a token for RPC.
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegation.getValue());
SecurityUtil.setTokenService(token, nnRpcAddr);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
ugi.addToken(token);
}
}
/** Handle HTTP PUT request for the root. */
@PUT
@Path("/")
@ -92,6 +120,8 @@ public class DatanodeWebHdfsMethods {
public Response putRoot(
final InputStream in,
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@ -105,7 +135,7 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
return put(in, ugi, ROOT, op, permission, overwrite, bufferSize,
return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
replication, blockSize);
}
@ -117,6 +147,8 @@ public class DatanodeWebHdfsMethods {
public Response put(
final InputStream in,
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op,
@ -132,14 +164,8 @@ public class DatanodeWebHdfsMethods {
final BlockSizeParam blockSize
) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
+ Param.toSortedString(", ", permission, overwrite, bufferSize,
replication, blockSize));
}
//clear content type
response.setContentType(null);
init(ugi, delegation, path, op, permission, overwrite, bufferSize,
replication, blockSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -193,12 +219,14 @@ public class DatanodeWebHdfsMethods {
public Response postRoot(
final InputStream in,
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return post(in, ugi, ROOT, op, bufferSize);
return post(in, ugi, delegation, ROOT, op, bufferSize);
}
/** Handle HTTP POST request. */
@ -209,6 +237,8 @@ public class DatanodeWebHdfsMethods {
public Response post(
final InputStream in,
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op,
@ -216,13 +246,7 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
+ Param.toSortedString(", ", bufferSize));
}
//clear content type
response.setContentType(null);
init(ugi, delegation, path, op, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -265,6 +289,8 @@ public class DatanodeWebHdfsMethods {
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response getRoot(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@ -274,7 +300,7 @@ public class DatanodeWebHdfsMethods {
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
return get(ugi, ROOT, op, offset, length, bufferSize);
return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
}
/** Handle HTTP GET request. */
@ -283,6 +309,8 @@ public class DatanodeWebHdfsMethods {
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response get(
@Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op,
@ -294,13 +322,7 @@ public class DatanodeWebHdfsMethods {
final BufferSizeParam bufferSize
) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) {
LOG.trace(op + ": " + path + ", ugi=" + ugi
+ Param.toSortedString(", ", offset, length, bufferSize));
}
//clear content type
response.setContentType(null);
init(ugi, delegation, path, op, offset, length, bufferSize);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override

View File

@ -154,7 +154,7 @@ public class NamenodeWebHdfsMethods {
namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
t.setKind(WebHdfsFileSystem.TOKEN_KIND);
SecurityUtil.setTokenService(t, namenode.getNameNodeAddress());
SecurityUtil.setTokenService(t, namenode.getHttpAddress());
return t;
}

View File

@ -114,17 +114,24 @@ public class WebHdfsFileSystem extends FileSystem
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
/** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */
public static final AbstractDelegationTokenSelector<DelegationTokenIdentifier> DT_SELECTOR
= new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(TOKEN_KIND) {};
private static final DelegationTokenRenewer<WebHdfsFileSystem> dtRenewer
= new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
static {
dtRenewer.start();
private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
if (DT_RENEWER == null) {
DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
DT_RENEWER.start();
}
DT_RENEWER.addRenewAction(webhdfs);
}
private final UserGroupInformation ugi;
private InetSocketAddress nnAddr;
private Token<?> delegationToken;
private Token<?> renewToken;
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
private Path workingDir;
@ -153,8 +160,7 @@ public class WebHdfsFileSystem extends FileSystem
protected void initDelegationToken() throws IOException {
// look for webhdfs token, then try hdfs
final Text serviceName = SecurityUtil.buildTokenService(nnAddr);
Token<?> token = webhdfspTokenSelector.selectToken(
serviceName, ugi.getTokens());
Token<?> token = DT_SELECTOR.selectToken(serviceName, ugi.getTokens());
if (token == null) {
token = DelegationTokenSelector.selectHdfsDelegationToken(
nnAddr, ugi, getConf());
@ -171,7 +177,7 @@ public class WebHdfsFileSystem extends FileSystem
if (token != null) {
setDelegationToken(token);
if (createdToken) {
dtRenewer.addRenewAction(this);
addRenewAction(this);
LOG.debug("Created new DT for " + token.getService());
} else {
LOG.debug("Found existing DT for " + token.getService());
@ -652,23 +658,14 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public Token<?> getRenewToken() {
return renewToken;
return delegationToken;
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(
final Token<T> token) {
synchronized(this) {
renewToken = token;
// emulate the 203 usage of the tokens
// by setting the kind and service as if they were hdfs tokens
delegationToken = new Token<T>(token);
// NOTE: the remote nn must be configured to use hdfs
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
// no need to change service because we aren't exactly sure what it
// should be. we can guess, but it might be wrong if the local conf
// value is incorrect. the service is a client side field, so the remote
// end does not care about the value
delegationToken = token;
}
}
@ -728,15 +725,6 @@ public class WebHdfsFileSystem extends FileSystem
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
}
private static final DtSelector webhdfspTokenSelector = new DtSelector();
private static class DtSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
private DtSelector() {
super(TOKEN_KIND);
}
}
/** Delegation token renewer. */
public static class DtRenewer extends TokenRenewer {
@Override

View File

@ -53,7 +53,7 @@ public class UserProvider
return JspHelper.getUGI(servletcontext, request, conf,
AuthenticationMethod.KERBEROS, false);
} catch (IOException e) {
throw new RuntimeException(e);
throw new SecurityException("Failed to obtain user group information.", e);
}
}

View File

@ -75,7 +75,7 @@ public class TestWebHdfsUrl {
+ "&token=" + tokenString, renewTokenUrl.getQuery());
Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(
token);
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
delegationToken.setKind(WebHdfsFileSystem.TOKEN_KIND);
Assert.assertEquals(
generateUrlQueryPrefix(PutOpParam.Op.CANCELDELEGATIONTOKEN,
ugi.getUserName())